source: fedd/federation/experiment_control.py @ 95be336

compt_changesinfo-ops
Last change on this file since 95be336 was 95be336, checked in by Ted Faber <faber@…>, 12 years ago

Use -set_serial instead of fighting the serial directory format #33

  • Property mode set to 100644
File size: 87.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import os.path
17
18import traceback
19# For parsing visualization output and splitter output
20import xml.parsers.expat
21
22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
24from string import join
25
26from urlparse import urlparse
27from urllib2 import urlopen
28
29from util import *
30from fedid import fedid, generate_fedid
31from remote_service import xmlrpc_handler, soap_handler, service_caller
32from service_error import service_error
33from synch_store import synch_store
34from experiment_partition import experiment_partition
35from experiment_control_legacy import experiment_control_legacy
36from authorizer import abac_authorizer
37from thread_pool import thread_pool, pooled_thread
38
39import topdl
40import list_log
41from ip_allocator import ip_allocator
42from ip_addr import ip_addr
43
44
45class nullHandler(logging.Handler):
46    def emit(self, record): pass
47
48fl = logging.getLogger("fedd.experiment_control")
49fl.addHandler(nullHandler())
50
51
52# Right now, no support for composition.
53class federated_service:
54    def __init__(self, name, exporter=None, importers=None, params=None, 
55            reqs=None, portal=None):
56        self.name=name
57        self.exporter=exporter
58        if importers is None: self.importers = []
59        else: self.importers=importers
60        if params is None: self.params = { }
61        else: self.params = params
62        if reqs is None: self.reqs = []
63        else: self.reqs = reqs
64
65        if portal is not None:
66            self.portal = portal
67        else:
68            self.portal = (name in federated_service.needs_portal)
69
70    def __str__(self):
71        return "name %s export %s import %s params %s reqs %s" % \
72                (self.name, self.exporter, self.importers, self.params,
73                        [ (r['name'], r['visibility']) for r in self.reqs] )
74
75    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
76
77class experiment_control_local(experiment_control_legacy):
78    """
79    Control of experiments that this system can directly access.
80
81    Includes experiment creation, termination and information dissemination.
82    Thred safe.
83    """
84
85    class ssh_cmd_timeout(RuntimeError): pass
86   
87    call_RequestAccess = service_caller('RequestAccess')
88    call_ReleaseAccess = service_caller('ReleaseAccess')
89    call_StartSegment = service_caller('StartSegment')
90    call_TerminateSegment = service_caller('TerminateSegment')
91    call_Ns2Topdl = service_caller('Ns2Topdl')
92
93    def __init__(self, config=None, auth=None):
94        """
95        Intialize the various attributes, most from the config object
96        """
97
98        def parse_tarfile_list(tf):
99            """
100            Parse a tarfile list from the configuration.  This is a set of
101            paths and tarfiles separated by spaces.
102            """
103            rv = [ ]
104            if tf is not None:
105                tl = tf.split()
106                while len(tl) > 1:
107                    p, t = tl[0:2]
108                    del tl[0:2]
109                    rv.append((p, t))
110            return rv
111
112        self.list_log = list_log.list_log
113
114        self.cert_file = config.get("experiment_control", "cert_file")
115        if self.cert_file:
116            self.cert_pwd = config.get("experiment_control", "cert_pwd")
117        else:
118            self.cert_file = config.get("globals", "cert_file")
119            self.cert_pwd = config.get("globals", "cert_pwd")
120
121        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
122                or config.get("globals", "trusted_certs")
123
124        self.repodir = config.get("experiment_control", "repodir")
125        self.repo_url = config.get("experiment_control", "repo_url", 
126                "https://users.isi.deterlab.net:23235");
127
128        self.exp_stem = "fed-stem"
129        self.log = logging.getLogger("fedd.experiment_control")
130        set_log_level(config, "experiment_control", self.log)
131        self.muxmax = 2
132        self.nthreads = 10
133        self.randomize_experiments = False
134
135        self.splitter = None
136        self.ssh_keygen = "/usr/bin/ssh-keygen"
137        self.ssh_identity_file = None
138
139
140        self.debug = config.getboolean("experiment_control", "create_debug")
141        self.cleanup = not config.getboolean("experiment_control", 
142                "leave_tmpfiles")
143        self.state_filename = config.get("experiment_control", 
144                "experiment_state")
145        self.store_filename = config.get("experiment_control", 
146                "synch_store")
147        self.store_url = config.get("experiment_control", "store_url")
148        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
149        self.fedkit = parse_tarfile_list(\
150                config.get("experiment_control", "fedkit"))
151        self.gatewaykit = parse_tarfile_list(\
152                config.get("experiment_control", "gatewaykit"))
153        accessdb_file = config.get("experiment_control", "accessdb")
154
155        dt = config.get("experiment_control", "direct_transit")
156        self.auth_type = config.get('experiment_control', 'auth_type') \
157                or 'legacy'
158        self.auth_dir = config.get('experiment_control', 'auth_dir')
159        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
160        else: self.direct_transit = [ ]
161        # NB for internal master/slave ops, not experiment setup
162        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
163
164        self.overrides = set([])
165        ovr = config.get('experiment_control', 'overrides')
166        if ovr:
167            for o in ovr.split(","):
168                o = o.strip()
169                if o.startswith('fedid:'): o = o[len('fedid:'):]
170                self.overrides.add(fedid(hexstr=o))
171
172        self.state = { }
173        self.state_lock = Lock()
174        self.tclsh = "/usr/local/bin/otclsh"
175        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
176                config.get("experiment_control", "tcl_splitter",
177                        "/usr/testbed/lib/ns2ir/parse.tcl")
178        mapdb_file = config.get("experiment_control", "mapdb")
179        self.trace_file = sys.stderr
180
181        self.def_expstart = \
182                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
183                "/tmp/federate";
184        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
185                "FEDDIR/hosts";
186        self.def_gwstart = \
187                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
188                "/tmp/bridge.log";
189        self.def_mgwstart = \
190                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
191                "/tmp/bridge.log";
192        self.def_gwimage = "FBSD61-TUNNEL2";
193        self.def_gwtype = "pc";
194        self.local_access = { }
195
196        if self.auth_type == 'legacy':
197            if auth:
198                self.auth = auth
199            else:
200                self.log.error( "[access]: No authorizer initialized, " +\
201                        "creating local one.")
202                auth = authorizer()
203            self.get_access = self.legacy_get_access
204        elif self.auth_type == 'abac':
205            self.auth = abac_authorizer(load=self.auth_dir)
206        else:
207            raise service_error(service_error.internal, 
208                    "Unknown auth_type: %s" % self.auth_type)
209
210        if mapdb_file:
211            self.read_mapdb(mapdb_file)
212        else:
213            self.log.warn("[experiment_control] No testbed map, using defaults")
214            self.tbmap = { 
215                    'deter':'https://users.isi.deterlab.net:23235',
216                    'emulab':'https://users.isi.deterlab.net:23236',
217                    'ucb':'https://users.isi.deterlab.net:23237',
218                    }
219
220        if accessdb_file:
221                self.read_accessdb(accessdb_file)
222        else:
223            raise service_error(service_error.internal,
224                    "No accessdb specified in config")
225
226        # Grab saved state.  OK to do this w/o locking because it's read only
227        # and only one thread should be in existence that can see self.state at
228        # this point.
229        if self.state_filename:
230            self.read_state()
231
232        if self.store_filename:
233            self.read_store()
234        else:
235            self.log.warning("No saved synch store")
236            self.synch_store = synch_store
237
238        # Dispatch tables
239        self.soap_services = {\
240                'New': soap_handler('New', self.new_experiment),
241                'Create': soap_handler('Create', self.create_experiment),
242                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
243                'Vis': soap_handler('Vis', self.get_vis),
244                'Info': soap_handler('Info', self.get_info),
245                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
246                'Terminate': soap_handler('Terminate', 
247                    self.terminate_experiment),
248                'GetValue': soap_handler('GetValue', self.GetValue),
249                'SetValue': soap_handler('SetValue', self.SetValue),
250        }
251
252        self.xmlrpc_services = {\
253                'New': xmlrpc_handler('New', self.new_experiment),
254                'Create': xmlrpc_handler('Create', self.create_experiment),
255                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
256                'Vis': xmlrpc_handler('Vis', self.get_vis),
257                'Info': xmlrpc_handler('Info', self.get_info),
258                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
259                'Terminate': xmlrpc_handler('Terminate',
260                    self.terminate_experiment),
261                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
262                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
263        }
264
265    # Call while holding self.state_lock
266    def write_state(self):
267        """
268        Write a new copy of experiment state after copying the existing state
269        to a backup.
270
271        State format is a simple pickling of the state dictionary.
272        """
273        if os.access(self.state_filename, os.W_OK):
274            copy_file(self.state_filename, \
275                    "%s.bak" % self.state_filename)
276        try:
277            f = open(self.state_filename, 'w')
278            pickle.dump(self.state, f)
279        except EnvironmentError, e:
280            self.log.error("Can't write file %s: %s" % \
281                    (self.state_filename, e))
282        except pickle.PicklingError, e:
283            self.log.error("Pickling problem: %s" % e)
284        except TypeError, e:
285            self.log.error("Pickling problem (TypeError): %s" % e)
286
287    @staticmethod
288    def get_alloc_ids(state):
289        """
290        Pull the fedids of the identifiers of each allocation from the
291        state.  Again, a dict dive that's best isolated.
292
293        Used by read_store and read state
294        """
295
296        return [ f['allocID']['fedid'] 
297                for f in state.get('federant',[]) \
298                    if f.has_key('allocID') and \
299                        f['allocID'].has_key('fedid')]
300
301    # Call while holding self.state_lock
302    def read_state(self):
303        """
304        Read a new copy of experiment state.  Old state is overwritten.
305
306        State format is a simple pickling of the state dictionary.
307        """
308       
309        def get_experiment_id(state):
310            """
311            Pull the fedid experimentID out of the saved state.  This is kind
312            of a gross walk through the dict.
313            """
314
315            if state.has_key('experimentID'):
316                for e in state['experimentID']:
317                    if e.has_key('fedid'):
318                        return e['fedid']
319                else:
320                    return None
321            else:
322                return None
323
324        try:
325            f = open(self.state_filename, "r")
326            self.state = pickle.load(f)
327            self.log.debug("[read_state]: Read state from %s" % \
328                    self.state_filename)
329        except EnvironmentError, e:
330            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
331                    % (self.state_filename, e))
332        except pickle.UnpicklingError, e:
333            self.log.warning(("[read_state]: No saved state: " + \
334                    "Unpickling failed: %s") % e)
335       
336        for s in self.state.values():
337            try:
338
339                eid = get_experiment_id(s)
340                if eid : 
341                    if self.auth_type == 'legacy':
342                        # XXX: legacy
343                        # Give the owner rights to the experiment
344                        self.auth.set_attribute(s['owner'], eid)
345                        # And holders of the eid as well
346                        self.auth.set_attribute(eid, eid)
347                        # allow overrides to control experiments as well
348                        for o in self.overrides:
349                            self.auth.set_attribute(o, eid)
350                        # Set permissions to allow reading of the software
351                        # repo, if any, as well.
352                        for a in self.get_alloc_ids(s):
353                            self.auth.set_attribute(a, 'repo/%s' % eid)
354                else:
355                    raise KeyError("No experiment id")
356            except KeyError, e:
357                self.log.warning("[read_state]: State ownership or identity " +\
358                        "misformatted in %s: %s" % (self.state_filename, e))
359
360
361    def read_accessdb(self, accessdb_file):
362        """
363        Read the mapping from fedids that can create experiments to their name
364        in the 3-level access namespace.  All will be asserted from this
365        testbed and can include the local username and porject that will be
366        asserted on their behalf by this fedd.  Each fedid is also added to the
367        authorization system with the "create" attribute.
368        """
369        self.accessdb = {}
370        # These are the regexps for parsing the db
371        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
372        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
373                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
374        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
375                "\s*->\s*(" + name_expr + ")\s*$")
376        lineno = 0
377
378        # Parse the mappings and store in self.authdb, a dict of
379        # fedid -> (proj, user)
380        try:
381            f = open(accessdb_file, "r")
382            for line in f:
383                lineno += 1
384                line = line.strip()
385                if len(line) == 0 or line.startswith('#'):
386                    continue
387                m = project_line.match(line)
388                if m:
389                    fid = fedid(hexstr=m.group(1))
390                    project, user = m.group(2,3)
391                    if not self.accessdb.has_key(fid):
392                        self.accessdb[fid] = []
393                    self.accessdb[fid].append((project, user))
394                    continue
395
396                m = user_line.match(line)
397                if m:
398                    fid = fedid(hexstr=m.group(1))
399                    project = None
400                    user = m.group(2)
401                    if not self.accessdb.has_key(fid):
402                        self.accessdb[fid] = []
403                    self.accessdb[fid].append((project, user))
404                    continue
405                self.log.warn("[experiment_control] Error parsing access " +\
406                        "db %s at line %d" %  (accessdb_file, lineno))
407        except EnvironmentError:
408            raise service_error(service_error.internal,
409                    ("Error opening/reading %s as experiment " +\
410                            "control accessdb") %  accessdb_file)
411        f.close()
412
413        # Initialize the authorization attributes
414        # XXX: legacy
415        if self.auth_type == 'legacy':
416            for fid in self.accessdb.keys():
417                self.auth.set_attribute(fid, 'create')
418                self.auth.set_attribute(fid, 'new')
419
420    def read_mapdb(self, file):
421        """
422        Read a simple colon separated list of mappings for the
423        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
424        """
425
426        self.tbmap = { }
427        lineno =0
428        try:
429            f = open(file, "r")
430            for line in f:
431                lineno += 1
432                line = line.strip()
433                if line.startswith('#') or len(line) == 0:
434                    continue
435                try:
436                    label, url = line.split(':', 1)
437                    self.tbmap[label] = url
438                except ValueError, e:
439                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
440                            "map db: %s %s" % (lineno, line, e))
441        except EnvironmentError, e:
442            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
443                    "open %s: %s" % (file, e))
444        else:
445            f.close()
446
447    def read_store(self):
448        try:
449            self.synch_store = synch_store()
450            self.synch_store.load(self.store_filename)
451            self.log.debug("[read_store]: Read store from %s" % \
452                    self.store_filename)
453        except EnvironmentError, e:
454            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
455                    % (self.state_filename, e))
456            self.synch_store = synch_store()
457
458        # Set the initial permissions on data in the store.  XXX: This ad hoc
459        # authorization attribute initialization is getting out of hand.
460        # XXX: legacy
461        if self.auth_type == 'legacy':
462            for k in self.synch_store.all_keys():
463                try:
464                    if k.startswith('fedid:'):
465                        fid = fedid(hexstr=k[6:46])
466                        if self.state.has_key(fid):
467                            for a in self.get_alloc_ids(self.state[fid]):
468                                self.auth.set_attribute(a, k)
469                except ValueError, e:
470                    self.log.warn("Cannot deduce permissions for %s" % k)
471
472
473    def write_store(self):
474        """
475        Write a new copy of synch_store after writing current state
476        to a backup.  We use the internal synch_store pickle method to avoid
477        incinsistent data.
478
479        State format is a simple pickling of the store.
480        """
481        if os.access(self.store_filename, os.W_OK):
482            copy_file(self.store_filename, \
483                    "%s.bak" % self.store_filename)
484        try:
485            self.synch_store.save(self.store_filename)
486        except EnvironmentError, e:
487            self.log.error("Can't write file %s: %s" % \
488                    (self.store_filename, e))
489        except TypeError, e:
490            self.log.error("Pickling problem (TypeError): %s" % e)
491
492
493    def remove_dirs(self, dir):
494        """
495        Remove the directory tree and all files rooted at dir.  Log any errors,
496        but continue.
497        """
498        self.log.debug("[removedirs]: removing %s" % dir)
499        try:
500            for path, dirs, files in os.walk(dir, topdown=False):
501                for f in files:
502                    os.remove(os.path.join(path, f))
503                for d in dirs:
504                    os.rmdir(os.path.join(path, d))
505            os.rmdir(dir)
506        except EnvironmentError, e:
507            self.log.error("Error deleting directory tree in %s" % e);
508
509    @staticmethod
510    def make_temp_certfile(expcert, tmpdir):
511        """
512        make a protected copy of the access certificate so the experiment
513        controller can act as the experiment principal.  mkstemp is the most
514        secure way to do that. The directory should be created by
515        mkdtemp.  Return the filename.
516        """
517        if expcert and tmpdir:
518            try:
519                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
520                f = os.fdopen(certf, 'w')
521                print >> f, expcert
522                f.close()
523            except EnvironmentError, e:
524                raise service_error(service_error.internal, 
525                        "Cannot create temp cert file?")
526            return certfn
527        else:
528            return None
529
530       
531    def generate_ssh_keys(self, dest, type="rsa" ):
532        """
533        Generate a set of keys for the gateways to use to talk.
534
535        Keys are of type type and are stored in the required dest file.
536        """
537        valid_types = ("rsa", "dsa")
538        t = type.lower();
539        if t not in valid_types: raise ValueError
540        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
541
542        try:
543            trace = open("/dev/null", "w")
544        except EnvironmentError:
545            raise service_error(service_error.internal,
546                    "Cannot open /dev/null??");
547
548        # May raise CalledProcessError
549        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
550        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
551        if rv != 0:
552            raise service_error(service_error.internal, 
553                    "Cannot generate nonce ssh keys.  %s return code %d" \
554                            % (self.ssh_keygen, rv))
555
556    def generate_seer_certs(self, destdir):
557        '''
558        Create a SEER ca cert and a node cert in destdir/ca.pem and
559        destdir/node.pem respectively.  These will be distributed throughout
560        the federated experiment.  This routine reports errors via
561        service_errors.
562        '''
563        openssl = '/usr/bin/openssl'
564        # All the filenames and parameters we need for openssl calls below
565        ca_key =os.path.join(destdir, 'ca.key') 
566        ca_pem = os.path.join(destdir, 'ca.pem')
567        node_key =os.path.join(destdir, 'node.key') 
568        node_pem = os.path.join(destdir, 'node.pem')
569        node_req = os.path.join(destdir, 'node.req')
570        node_signed = os.path.join(destdir, 'node.signed')
571        days = '%s' % (365 * 10)
572        serial = '%s' % random.randint(0, 1<<16)
573
574        try:
575            # Sequence of calls to create a CA key, create a ca cert, create a
576            # node key, node signing request, and finally a signed node
577            # certificate.
578            sequence = (
579                    (openssl, 'genrsa', '-out', ca_key, '1024'),
580                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
581                        ca_pem, '-days', days, '-subj', 
582                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
583                    (openssl, 'genrsa', '-out', node_key, '1024'),
584                    (openssl, 'req', '-new', '-key', node_key, '-out', 
585                        node_req, '-days', days, '-subj', 
586                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
587                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
588                        '-set_serial', serial, '-req', '-in', node_req, 
589                        '-out', node_signed, '-days', days),
590                )
591            # Do all that stuff; bail if there's an error, and push all the
592            # output to dev/null.
593            for cmd in sequence:
594                trace = open("/dev/null", "w")
595                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
596                if rv != 0:
597                    raise service_error(service_error.internal, 
598                            "Cannot generate SEER certs.  %s return code %d" \
599                                    % (' '.join(cmd), rv))
600            # Concatinate the node key and signed certificate into node.pem
601            f = open(node_pem, 'w')
602            for comp in (node_signed, node_key):
603                g = open(comp, 'r')
604                f.write(g.read())
605                g.close()
606            f.close()
607
608            # Throw out intermediaries.
609            for fn in (ca_key, node_key, node_req, node_signed):
610                os.unlink(fn)
611
612        except EnvironmentError, e:
613            # Any difficulties with the file system wind up here
614            raise service_error(service_error.internal,
615                    "File error on  %s while creating SEER certs: %s" % \
616                            (e.filename, e.strerror))
617
618
619
620    def gentopo(self, str):
621        """
622        Generate the topology data structure from the splitter's XML
623        representation of it.
624
625        The topology XML looks like:
626            <experiment>
627                <nodes>
628                    <node><vname></vname><ips>ip1:ip2</ips></node>
629                </nodes>
630                <lans>
631                    <lan>
632                        <vname></vname><vnode></vnode><ip></ip>
633                        <bandwidth></bandwidth><member>node:port</member>
634                    </lan>
635                </lans>
636        """
637        class topo_parse:
638            """
639            Parse the topology XML and create the dats structure.
640            """
641            def __init__(self):
642                # Typing of the subelements for data conversion
643                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
644                self.int_subelements = ( 'bandwidth',)
645                self.float_subelements = ( 'delay',)
646                # The final data structure
647                self.nodes = [ ]
648                self.lans =  [ ]
649                self.topo = { \
650                        'node': self.nodes,\
651                        'lan' : self.lans,\
652                    }
653                self.element = { }  # Current element being created
654                self.chars = ""     # Last text seen
655
656            def end_element(self, name):
657                # After each sub element the contents is added to the current
658                # element or to the appropriate list.
659                if name == 'node':
660                    self.nodes.append(self.element)
661                    self.element = { }
662                elif name == 'lan':
663                    self.lans.append(self.element)
664                    self.element = { }
665                elif name in self.str_subelements:
666                    self.element[name] = self.chars
667                    self.chars = ""
668                elif name in self.int_subelements:
669                    self.element[name] = int(self.chars)
670                    self.chars = ""
671                elif name in self.float_subelements:
672                    self.element[name] = float(self.chars)
673                    self.chars = ""
674
675            def found_chars(self, data):
676                self.chars += data.rstrip()
677
678
679        tp = topo_parse();
680        parser = xml.parsers.expat.ParserCreate()
681        parser.EndElementHandler = tp.end_element
682        parser.CharacterDataHandler = tp.found_chars
683
684        parser.Parse(str)
685
686        return tp.topo
687       
688
689    def genviz(self, topo):
690        """
691        Generate the visualization the virtual topology
692        """
693
694        neato = "/usr/local/bin/neato"
695        # These are used to parse neato output and to create the visualization
696        # file.
697        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
698        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
699                "%s</type></node>"
700
701        try:
702            # Node names
703            nodes = [ n['vname'] for n in topo['node'] ]
704            topo_lans = topo['lan']
705        except KeyError, e:
706            raise service_error(service_error.internal, "Bad topology: %s" %e)
707
708        lans = { }
709        links = { }
710
711        # Walk through the virtual topology, organizing the connections into
712        # 2-node connections (links) and more-than-2-node connections (lans).
713        # When a lan is created, it's added to the list of nodes (there's a
714        # node in the visualization for the lan).
715        for l in topo_lans:
716            if links.has_key(l['vname']):
717                if len(links[l['vname']]) < 2:
718                    links[l['vname']].append(l['vnode'])
719                else:
720                    nodes.append(l['vname'])
721                    lans[l['vname']] = links[l['vname']]
722                    del links[l['vname']]
723                    lans[l['vname']].append(l['vnode'])
724            elif lans.has_key(l['vname']):
725                lans[l['vname']].append(l['vnode'])
726            else:
727                links[l['vname']] = [ l['vnode'] ]
728
729
730        # Open up a temporary file for dot to turn into a visualization
731        try:
732            df, dotname = tempfile.mkstemp()
733            dotfile = os.fdopen(df, 'w')
734        except EnvironmentError:
735            raise service_error(service_error.internal,
736                    "Failed to open file in genviz")
737
738        try:
739            dnull = open('/dev/null', 'w')
740        except EnvironmentError:
741            service_error(service_error.internal,
742                    "Failed to open /dev/null in genviz")
743
744        # Generate a dot/neato input file from the links, nodes and lans
745        try:
746            print >>dotfile, "graph G {"
747            for n in nodes:
748                print >>dotfile, '\t"%s"' % n
749            for l in links.keys():
750                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
751            for l in lans.keys():
752                for n in lans[l]:
753                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
754            print >>dotfile, "}"
755            dotfile.close()
756        except TypeError:
757            raise service_error(service_error.internal,
758                    "Single endpoint link in vtopo")
759        except EnvironmentError:
760            raise service_error(service_error.internal, "Cannot write dot file")
761
762        # Use dot to create a visualization
763        try:
764            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
765                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
766                stderr=dnull, close_fds=True)
767        except EnvironmentError:
768            raise service_error(service_error.internal, 
769                    "Cannot generate visualization: is graphviz available?")
770        dnull.close()
771
772        # Translate dot to vis format
773        vis_nodes = [ ]
774        vis = { 'node': vis_nodes }
775        for line in dot.stdout:
776            m = vis_re.match(line)
777            if m:
778                vn = m.group(1)
779                vis_node = {'name': vn, \
780                        'x': float(m.group(2)),\
781                        'y' : float(m.group(3)),\
782                    }
783                if vn in links.keys() or vn in lans.keys():
784                    vis_node['type'] = 'lan'
785                else:
786                    vis_node['type'] = 'node'
787                vis_nodes.append(vis_node)
788        rv = dot.wait()
789
790        os.remove(dotname)
791        if rv == 0 : return vis
792        else: return None
793
794
795    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
796            cert_pwd=None):
797        """
798        Release access to testbed through fedd
799        """
800
801        if not uri and tbmap:
802            uri = tbmap.get(tb, None)
803        if not uri:
804            raise service_error(service_error.server_config, 
805                    "Unknown testbed: %s" % tb)
806
807        if self.local_access.has_key(uri):
808            resp = self.local_access[uri].ReleaseAccess(\
809                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
810                    fedid(file=cert_file))
811            resp = { 'ReleaseAccessResponseBody': resp } 
812        else:
813            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
814                    cert_file, cert_pwd, self.trusted_certs)
815
816        # better error coding
817
818    def remote_ns2topdl(self, uri, desc):
819
820        req = {
821                'description' : { 'ns2description': desc },
822            }
823
824        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
825                self.trusted_certs)
826
827        if r.has_key('Ns2TopdlResponseBody'):
828            r = r['Ns2TopdlResponseBody']
829            ed = r.get('experimentdescription', None)
830            if ed.has_key('topdldescription'):
831                return topdl.Topology(**ed['topdldescription'])
832            else:
833                raise service_error(service_error.protocol, 
834                        "Bad splitter response (no output)")
835        else:
836            raise service_error(service_error.protocol, "Bad splitter response")
837
838    class start_segment:
839        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
840                cert_pwd=None, trusted_certs=None, caller=None,
841                log_collector=None):
842            self.log = log
843            self.debug = debug
844            self.cert_file = cert_file
845            self.cert_pwd = cert_pwd
846            self.trusted_certs = None
847            self.caller = caller
848            self.testbed = testbed
849            self.log_collector = log_collector
850            self.response = None
851            self.node = { }
852            self.proof = None
853
854        def make_map(self, resp):
855            for e in resp.get('embedding', []):
856                if 'toponame' in e and 'physname' in e:
857                    self.node[e['toponame']] = e['physname'][0]
858
859        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
860            req = {
861                    'allocID': { 'fedid' : aid }, 
862                    'segmentdescription': { 
863                        'topdldescription': topo.to_dict(),
864                    },
865                }
866
867            if connInfo:
868                req['connection'] = connInfo
869
870            import_svcs = [ s for m in masters.values() \
871                    for s in m if self.testbed in s.importers]
872
873            if import_svcs or self.testbed in masters:
874                req['service'] = []
875
876            for s in import_svcs:
877                for r in s.reqs:
878                    sr = copy.deepcopy(r)
879                    sr['visibility'] = 'import';
880                    req['service'].append(sr)
881
882            for s in masters.get(self.testbed, []):
883                for r in s.reqs:
884                    sr = copy.deepcopy(r)
885                    sr['visibility'] = 'export';
886                    req['service'].append(sr)
887
888            if attrs:
889                req['fedAttr'] = attrs
890
891            try:
892                self.log.debug("Calling StartSegment at %s " % uri)
893                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
894                        self.trusted_certs)
895                if r.has_key('StartSegmentResponseBody'):
896                    lval = r['StartSegmentResponseBody'].get('allocationLog',
897                            None)
898                    if lval and self.log_collector:
899                        for line in  lval.splitlines(True):
900                            self.log_collector.write(line)
901                    self.make_map(r['StartSegmentResponseBody'])
902                    if 'proof' in r: self.proof = r['proof']
903                    self.response = r
904                else:
905                    raise service_error(service_error.internal, 
906                            "Bad response!?: %s" %r)
907                return True
908            except service_error, e:
909                self.log.error("Start segment failed on %s: %s" % \
910                        (self.testbed, e))
911                return False
912
913
914
915    class terminate_segment:
916        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
917                cert_pwd=None, trusted_certs=None, caller=None):
918            self.log = log
919            self.debug = debug
920            self.cert_file = cert_file
921            self.cert_pwd = cert_pwd
922            self.trusted_certs = None
923            self.caller = caller
924            self.testbed = testbed
925
926        def __call__(self, uri, aid ):
927            req = {
928                    'allocID': aid , 
929                }
930            try:
931                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
932                        self.trusted_certs)
933                return True
934            except service_error, e:
935                self.log.error("Terminate segment failed on %s: %s" % \
936                        (self.testbed, e))
937                return False
938
939    def allocate_resources(self, allocated, masters, eid, expid, 
940            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
941            attrs=None, connInfo={}, tbmap=None, expcert=None):
942
943        started = { }           # Testbeds where a sub-experiment started
944                                # successfully
945
946        # XXX
947        fail_soft = False
948
949        if tbmap is None: tbmap = { }
950
951        log = alloc_log or self.log
952
953        tp = thread_pool(self.nthreads)
954        threads = [ ]
955        starters = [ ]
956
957        if expcert:
958            cert = expcert
959            pw = None
960        else:
961            cert = self.cert_file
962            pw = self.cert_pwd
963
964        for tb in allocated.keys():
965            # Create and start a thread to start the segment, and save it
966            # to get the return value later
967            tb_attrs = copy.copy(attrs)
968            tp.wait_for_slot()
969            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
970            base, suffix = split_testbed(tb)
971            if suffix:
972                tb_attrs.append({'attribute': 'experiment_name', 
973                    'value': "%s-%s" % (eid, suffix)})
974            else:
975                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
976            if not uri:
977                raise service_error(service_error.internal, 
978                        "Unknown testbed %s !?" % tb)
979
980            if tbparams[tb].has_key('allocID') and \
981                    tbparams[tb]['allocID'].has_key('fedid'):
982                aid = tbparams[tb]['allocID']['fedid']
983            else:
984                raise service_error(service_error.internal, 
985                        "No alloc id for testbed %s !?" % tb)
986
987            s = self.start_segment(log=log, debug=self.debug,
988                    testbed=tb, cert_file=cert,
989                    cert_pwd=pw, trusted_certs=self.trusted_certs,
990                    caller=self.call_StartSegment,
991                    log_collector=log_collector)
992            starters.append(s)
993            t  = pooled_thread(\
994                    target=s, name=tb,
995                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
996                    pdata=tp, trace_file=self.trace_file)
997            threads.append(t)
998            t.start()
999
1000        # Wait until all finish (keep pinging the log, though)
1001        mins = 0
1002        revoked = False
1003        while not tp.wait_for_all_done(60.0):
1004            mins += 1
1005            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1006                    % mins)
1007            if not revoked and \
1008                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1009                # a testbed has failed.  Revoke this experiment's
1010                # synchronizarion values so that sub experiments will not
1011                # deadlock waiting for synchronization that will never happen
1012                self.log.info("A subexperiment has failed to swap in, " + \
1013                        "revoking synch keys")
1014                var_key = "fedid:%s" % expid
1015                for k in self.synch_store.all_keys():
1016                    if len(k) > 45 and k[0:46] == var_key:
1017                        self.synch_store.revoke_key(k)
1018                revoked = True
1019
1020        failed = [ t.getName() for t in threads if not t.rv ]
1021        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1022
1023        # If one failed clean up, unless fail_soft is set
1024        if failed:
1025            if not fail_soft:
1026                tp.clear()
1027                for tb in succeeded:
1028                    # Create and start a thread to stop the segment
1029                    tp.wait_for_slot()
1030                    uri = tbparams[tb]['uri']
1031                    t  = pooled_thread(\
1032                            target=self.terminate_segment(log=log,
1033                                testbed=tb,
1034                                cert_file=cert, 
1035                                cert_pwd=pw,
1036                                trusted_certs=self.trusted_certs,
1037                                caller=self.call_TerminateSegment),
1038                            args=(uri, tbparams[tb]['federant']['allocID']),
1039                            name=tb,
1040                            pdata=tp, trace_file=self.trace_file)
1041                    t.start()
1042                # Wait until all finish (if any are being stopped)
1043                if succeeded:
1044                    tp.wait_for_all_done()
1045
1046                # release the allocations
1047                for tb in tbparams.keys():
1048                    try:
1049                        self.release_access(tb, tbparams[tb]['allocID'], 
1050                                tbmap=tbmap, uri=tbparams[tb].get('uri', None),
1051                                cert_file=cert, cert_pwd=pw)
1052                    except service_error, e:
1053                        self.log.warn("Error releasing access: %s" % e.desc)
1054                # Remove the placeholder
1055                self.state_lock.acquire()
1056                self.state[eid]['experimentStatus'] = 'failed'
1057                if self.state_filename: self.write_state()
1058                self.state_lock.release()
1059                # Remove the repo dir
1060                self.remove_dirs("%s/%s" %(self.repodir, expid))
1061                # Walk up tmpdir, deleting as we go
1062                if self.cleanup:
1063                    self.remove_dirs(tmpdir)
1064                else:
1065                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1066
1067
1068                log.error("Swap in failed on %s" % ",".join(failed))
1069                return
1070        else:
1071            # Walk through the successes and gather the virtual to physical
1072            # mapping.
1073            embedding = [ ]
1074            proofs = { }
1075            for s in starters:
1076                for k, v in s.node.items():
1077                    embedding.append({
1078                        'toponame': k, 
1079                        'physname': [ v],
1080                        'testbed': s.testbed
1081                        })
1082                if s.proof:
1083                    proofs[s.testbed] = s.proof
1084            log.info("[start_segment]: Experiment %s active" % eid)
1085
1086
1087        # Walk up tmpdir, deleting as we go
1088        if self.cleanup:
1089            self.remove_dirs(tmpdir)
1090        else:
1091            log.debug("[start_experiment]: not removing %s" % tmpdir)
1092
1093        # Insert the experiment into our state and update the disk copy.
1094        self.state_lock.acquire()
1095        self.state[expid]['experimentStatus'] = 'active'
1096        self.state[eid] = self.state[expid]
1097        self.state[eid]['experimentdescription']['topdldescription'] = \
1098                top.to_dict()
1099        self.state[eid]['embedding'] = embedding
1100        # Append startup proofs
1101        for f in self.state[eid]['federant']:
1102            if 'name' in f and 'localname' in f['name']:
1103                if f['name']['localname'] in proofs:
1104                    f['proof'].append(proofs[f['name']['localname']])
1105
1106        if self.state_filename: self.write_state()
1107        self.state_lock.release()
1108        return
1109
1110
1111    def add_kit(self, e, kit):
1112        """
1113        Add a Software object created from the list of (install, location)
1114        tuples passed as kit  to the software attribute of an object e.  We
1115        do this enough to break out the code, but it's kind of a hack to
1116        avoid changing the old tuple rep.
1117        """
1118
1119        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1120
1121        if isinstance(e.software, list): e.software.extend(s)
1122        else: e.software = s
1123
1124    def append_experiment_authorization(self, expid, attrs, 
1125            need_state_lock=True):
1126        """
1127        Append the authorization information to system state
1128        """
1129
1130        for p, a in attrs:
1131            self.auth.set_attribute(p, a)
1132        self.auth.save()
1133
1134        if need_state_lock: self.state_lock.acquire()
1135        self.state[expid]['auth'].update(attrs)
1136        if self.state_filename: self.write_state()
1137        if need_state_lock: self.state_lock.release()
1138
1139    def clear_experiment_authorization(self, expid, need_state_lock=True):
1140        """
1141        Attrs is a set of attribute principal pairs that need to be removed
1142        from the authenticator.  Remove them and save the authenticator.
1143        """
1144
1145        if need_state_lock: self.state_lock.acquire()
1146        if expid in self.state and 'auth' in self.state[expid]:
1147            for p, a in self.state[expid]['auth']:
1148                self.auth.unset_attribute(p, a)
1149            self.state[expid]['auth'] = set()
1150        if self.state_filename: self.write_state()
1151        if need_state_lock: self.state_lock.release()
1152        self.auth.save()
1153
1154
1155    def create_experiment_state(self, fid, req, expid, expcert,
1156            state='starting'):
1157        """
1158        Create the initial entry in the experiment's state.  The expid and
1159        expcert are the experiment's fedid and certifacte that represents that
1160        ID, which are installed in the experiment state.  If the request
1161        includes a suggested local name that is used if possible.  If the local
1162        name is already taken by an experiment owned by this user that has
1163        failed, it is overwritten.  Otherwise new letters are added until a
1164        valid localname is found.  The generated local name is returned.
1165        """
1166
1167        if req.has_key('experimentID') and \
1168                req['experimentID'].has_key('localname'):
1169            overwrite = False
1170            eid = req['experimentID']['localname']
1171            # If there's an old failed experiment here with the same local name
1172            # and accessible by this user, we'll overwrite it, otherwise we'll
1173            # fall through and do the collision avoidance.
1174            old_expid = self.get_experiment_fedid(eid)
1175            if old_expid:
1176                users_experiment = True
1177                try:
1178                    self.check_experiment_access(fid, old_expid)
1179                except service_error, e:
1180                    if e.code == service_error.access: users_experiment = False
1181                    else: raise e
1182                if users_experiment:
1183                    self.state_lock.acquire()
1184                    status = self.state[eid].get('experimentStatus', None)
1185                    if status and status == 'failed':
1186                        # remove the old access attributes
1187                        self.clear_experiment_authorization(eid,
1188                                need_state_lock=False)
1189                        overwrite = True
1190                        del self.state[eid]
1191                        del self.state[old_expid]
1192                    self.state_lock.release()
1193                else:
1194                    self.log.info('Experiment %s exists, ' % eid + \
1195                            'but this user cannot access it')
1196            self.state_lock.acquire()
1197            while (self.state.has_key(eid) and not overwrite):
1198                eid += random.choice(string.ascii_letters)
1199            # Initial state
1200            self.state[eid] = {
1201                    'experimentID' : \
1202                            [ { 'localname' : eid }, {'fedid': expid } ],
1203                    'experimentStatus': state,
1204                    'experimentAccess': { 'X509' : expcert },
1205                    'owner': fid,
1206                    'log' : [],
1207                    'auth': set(),
1208                }
1209            self.state[expid] = self.state[eid]
1210            if self.state_filename: self.write_state()
1211            self.state_lock.release()
1212        else:
1213            eid = self.exp_stem
1214            for i in range(0,5):
1215                eid += random.choice(string.ascii_letters)
1216            self.state_lock.acquire()
1217            while (self.state.has_key(eid)):
1218                eid = self.exp_stem
1219                for i in range(0,5):
1220                    eid += random.choice(string.ascii_letters)
1221            # Initial state
1222            self.state[eid] = {
1223                    'experimentID' : \
1224                            [ { 'localname' : eid }, {'fedid': expid } ],
1225                    'experimentStatus': state,
1226                    'experimentAccess': { 'X509' : expcert },
1227                    'owner': fid,
1228                    'log' : [],
1229                    'auth': set(),
1230                }
1231            self.state[expid] = self.state[eid]
1232            if self.state_filename: self.write_state()
1233            self.state_lock.release()
1234
1235        # Let users touch the state.  Authorize this fid and the expid itself
1236        # to touch the experiment, as well as allowing th eoverrides.
1237        self.append_experiment_authorization(eid, 
1238                set([(fid, expid), (expid,expid)] + \
1239                        [ (o, expid) for o in self.overrides]))
1240
1241        return eid
1242
1243
1244    def allocate_ips_to_topo(self, top):
1245        """
1246        Add an ip4_address attribute to all the hosts in the topology, based on
1247        the shared substrates on which they sit.  An /etc/hosts file is also
1248        created and returned as a list of hostfiles entries.  We also return
1249        the allocator, because we may need to allocate IPs to portals
1250        (specifically DRAGON portals).
1251        """
1252        subs = sorted(top.substrates, 
1253                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1254                reverse=True)
1255        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1256        ifs = { }
1257        hosts = [ ]
1258
1259        for idx, s in enumerate(subs):
1260            net_size = len(s.interfaces)+2
1261
1262            a = ips.allocate(net_size)
1263            if a :
1264                base, num = a
1265                if num < net_size: 
1266                    raise service_error(service_error.internal,
1267                            "Allocator returned wrong number of IPs??")
1268            else:
1269                raise service_error(service_error.req, 
1270                        "Cannot allocate IP addresses")
1271            mask = ips.min_alloc
1272            while mask < net_size:
1273                mask *= 2
1274
1275            netmask = ((2**32-1) ^ (mask-1))
1276
1277            base += 1
1278            for i in s.interfaces:
1279                i.attribute.append(
1280                        topdl.Attribute('ip4_address', 
1281                            "%s" % ip_addr(base)))
1282                i.attribute.append(
1283                        topdl.Attribute('ip4_netmask', 
1284                            "%s" % ip_addr(int(netmask))))
1285
1286                hname = i.element.name
1287                if ifs.has_key(hname):
1288                    hosts.append("%s\t%s-%s %s-%d" % \
1289                            (ip_addr(base), hname, s.name, hname,
1290                                ifs[hname]))
1291                else:
1292                    ifs[hname] = 0
1293                    hosts.append("%s\t%s-%s %s-%d %s" % \
1294                            (ip_addr(base), hname, s.name, hname,
1295                                ifs[hname], hname))
1296
1297                ifs[hname] += 1
1298                base += 1
1299        return hosts, ips
1300
1301    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1302            tbparam, masters, tbmap, expid=None, expcert=None):
1303        for tb in testbeds:
1304            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1305                    expcert)
1306            allocated[tb] = 1
1307
1308    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1309            expcert=None):
1310        """
1311        Get access to testbed through fedd and set the parameters for that tb
1312        """
1313        def get_export_project(svcs):
1314            """
1315            Look through for the list of federated_service for this testbed
1316            objects for a project_export service, and extract the project
1317            parameter.
1318            """
1319
1320            pe = [s for s in svcs if s.name=='project_export']
1321            if len(pe) == 1:
1322                return pe[0].params.get('project', None)
1323            elif len(pe) == 0:
1324                return None
1325            else:
1326                raise service_error(service_error.req,
1327                        "More than one project export is not supported")
1328
1329        def add_services(svcs, type, slist, keys):
1330            """
1331            Add the given services to slist.  type is import or export.  Also
1332            add a mapping entry from the assigned id to the original service
1333            record.
1334            """
1335            for i, s in enumerate(svcs):
1336                idx = '%s%d' % (type, i)
1337                keys[idx] = s
1338                sr = {'id': idx, 'name': s.name, 'visibility': type }
1339                if s.params:
1340                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1341                            for k, v in s.params.items()]
1342                slist.append(sr)
1343
1344        uri = tbmap.get(testbed_base(tb), None)
1345        if not uri:
1346            raise service_error(service_error.server_config, 
1347                    "Unknown testbed: %s" % tb)
1348
1349        export_svcs = masters.get(tb,[])
1350        import_svcs = [ s for m in masters.values() \
1351                for s in m \
1352                    if tb in s.importers ]
1353
1354        export_project = get_export_project(export_svcs)
1355        # Compose the credential list so that IDs come before attributes
1356        creds = set()
1357        keys = set()
1358        certs = self.auth.get_creds_for_principal(fid)
1359        if expid:
1360            certs.update(self.auth.get_creds_for_principal(expid))
1361        for c in certs:
1362            keys.add(c.issuer_cert())
1363            creds.add(c.attribute_cert())
1364        creds = list(keys) + list(creds)
1365
1366        if expcert: cert, pw = expcert, None
1367        else: cert, pw = self.cert_file, self.cert_pw
1368
1369        # Request credentials
1370        req = {
1371                'abac_credential': creds,
1372            }
1373        # Make the service request from the services we're importing and
1374        # exporting.  Keep track of the export request ids so we can
1375        # collect the resulting info from the access response.
1376        e_keys = { }
1377        if import_svcs or export_svcs:
1378            slist = []
1379            add_services(import_svcs, 'import', slist, e_keys)
1380            add_services(export_svcs, 'export', slist, e_keys)
1381            req['service'] = slist
1382
1383        if self.local_access.has_key(uri):
1384            # Local access call
1385            req = { 'RequestAccessRequestBody' : req }
1386            r = self.local_access[uri].RequestAccess(req, 
1387                    fedid(file=self.cert_file))
1388            r = { 'RequestAccessResponseBody' : r }
1389        else:
1390            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1391
1392        if r.has_key('RequestAccessResponseBody'):
1393            # Through to here we have a valid response, not a fault.
1394            # Access denied is a fault, so something better or worse than
1395            # access denied has happened.
1396            r = r['RequestAccessResponseBody']
1397            self.log.debug("[get_access] Access granted")
1398        else:
1399            raise service_error(service_error.protocol,
1400                        "Bad proxy response")
1401        if 'proof' not in r:
1402            raise service_error(service_error.protocol,
1403                        "Bad access response (no access proof)")
1404       
1405        tbparam[tb] = { 
1406                "allocID" : r['allocID'],
1407                "uri": uri,
1408                "proof": [ r['proof'] ],
1409                }
1410
1411        # Collect the responses corresponding to the services this testbed
1412        # exports.  These will be the service requests that we will include in
1413        # the start segment requests (with appropriate visibility values) to
1414        # import and export the segments.
1415        for s in r.get('service', []):
1416            id = s.get('id', None)
1417            if id and id in e_keys:
1418                e_keys[id].reqs.append(s)
1419
1420        # Add attributes to parameter space.  We don't allow attributes to
1421        # overlay any parameters already installed.
1422        for a in r.get('fedAttr', []):
1423            try:
1424                if a['attribute'] and \
1425                        isinstance(a['attribute'], basestring)\
1426                        and not tbparam[tb].has_key(a['attribute'].lower()):
1427                    tbparam[tb][a['attribute'].lower()] = a['value']
1428            except KeyError:
1429                self.log.error("Bad attribute in response: %s" % a)
1430
1431
1432    def split_topology(self, top, topo, testbeds):
1433        """
1434        Create the sub-topologies that are needed for experiment instantiation.
1435        """
1436        for tb in testbeds:
1437            topo[tb] = top.clone()
1438            # copy in for loop allows deletions from the original
1439            for e in [ e for e in topo[tb].elements]:
1440                etb = e.get_attribute('testbed')
1441                # NB: elements without a testbed attribute won't appear in any
1442                # sub topologies. 
1443                if not etb or etb != tb:
1444                    for i in e.interface:
1445                        for s in i.subs:
1446                            try:
1447                                s.interfaces.remove(i)
1448                            except ValueError:
1449                                raise service_error(service_error.internal,
1450                                        "Can't remove interface??")
1451                    topo[tb].elements.remove(e)
1452            topo[tb].make_indices()
1453
1454    def confirm_software(self, top):
1455        """
1456        Make sure that the software to be loaded in the topo is all available
1457        before we begin making access requests, etc.  This is a subset of
1458        wrangle_software.
1459        """
1460        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1461        pkgs.update([x.location for e in top.elements for x in e.software])
1462
1463        for pkg in pkgs:
1464            loc = pkg
1465
1466            scheme, host, path = urlparse(loc)[0:3]
1467            dest = os.path.basename(path)
1468            if not scheme:
1469                if not loc.startswith('/'):
1470                    loc = "/%s" % loc
1471                loc = "file://%s" %loc
1472            # NB: if scheme was found, loc == pkg
1473            try:
1474                u = urlopen(loc)
1475                u.close()
1476            except Exception, e:
1477                raise service_error(service_error.req, 
1478                        "Cannot open %s: %s" % (loc, e))
1479        return True
1480
1481    def wrangle_software(self, expid, top, topo, tbparams):
1482        """
1483        Copy software out to the repository directory, allocate permissions and
1484        rewrite the segment topologies to look for the software in local
1485        places.
1486        """
1487
1488        # Copy the rpms and tarfiles to a distribution directory from
1489        # which the federants can retrieve them
1490        linkpath = "%s/software" %  expid
1491        softdir ="%s/%s" % ( self.repodir, linkpath)
1492        softmap = { }
1493
1494        # self.fedkit and self.gateway kit are lists of tuples of
1495        # (install_location, download_location) this extracts the download
1496        # locations.
1497        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1498        pkgs.update([x.location for e in top.elements for x in e.software])
1499        try:
1500            os.makedirs(softdir)
1501        except EnvironmentError, e:
1502            raise service_error(
1503                    "Cannot create software directory: %s" % e)
1504        # The actual copying.  Everything's converted into a url for copying.
1505        auth_attrs = set()
1506        for pkg in pkgs:
1507            loc = pkg
1508
1509            scheme, host, path = urlparse(loc)[0:3]
1510            dest = os.path.basename(path)
1511            if not scheme:
1512                if not loc.startswith('/'):
1513                    loc = "/%s" % loc
1514                loc = "file://%s" %loc
1515            # NB: if scheme was found, loc == pkg
1516            try:
1517                u = urlopen(loc)
1518            except Exception, e:
1519                raise service_error(service_error.req, 
1520                        "Cannot open %s: %s" % (loc, e))
1521            try:
1522                f = open("%s/%s" % (softdir, dest) , "w")
1523                self.log.debug("Writing %s/%s" % (softdir,dest) )
1524                data = u.read(4096)
1525                while data:
1526                    f.write(data)
1527                    data = u.read(4096)
1528                f.close()
1529                u.close()
1530            except Exception, e:
1531                raise service_error(service_error.internal,
1532                        "Could not copy %s: %s" % (loc, e))
1533            path = re.sub("/tmp", "", linkpath)
1534            # XXX
1535            softmap[pkg] = \
1536                    "%s/%s/%s" %\
1537                    ( self.repo_url, path, dest)
1538
1539            # Allow the individual segments to access the software by assigning
1540            # an attribute to each testbed allocation that encodes the data to
1541            # be released.  This expression collects the data for each run of
1542            # the loop.
1543            auth_attrs.update([
1544                (tbparams[tb]['allocID']['fedid'], "/%s/%s" % ( path, dest)) \
1545                        for tb in tbparams.keys()])
1546
1547        self.append_experiment_authorization(expid, auth_attrs)
1548
1549        # Convert the software locations in the segments into the local
1550        # copies on this host
1551        for soft in [ s for tb in topo.values() \
1552                for e in tb.elements \
1553                    if getattr(e, 'software', False) \
1554                        for s in e.software ]:
1555            if softmap.has_key(soft.location):
1556                soft.location = softmap[soft.location]
1557
1558
1559    def new_experiment(self, req, fid):
1560        """
1561        The external interface to empty initial experiment creation called from
1562        the dispatcher.
1563
1564        Creates a working directory, splits the incoming description using the
1565        splitter script and parses out the avrious subsections using the
1566        lcasses above.  Once each sub-experiment is created, use pooled threads
1567        to instantiate them and start it all up.
1568        """
1569        req = req.get('NewRequestBody', None)
1570        if not req:
1571            raise service_error(service_error.req,
1572                    "Bad request format (no NewRequestBody)")
1573
1574        if self.auth.import_credentials(data_list=req.get('credential', [])):
1575            self.auth.save()
1576
1577        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1578                with_proof=True)
1579
1580        if not access_ok:
1581            raise service_error(service_error.access, "New access denied",
1582                    proof=[proof])
1583
1584        try:
1585            tmpdir = tempfile.mkdtemp(prefix="split-")
1586        except EnvironmentError:
1587            raise service_error(service_error.internal, "Cannot create tmp dir")
1588
1589        try:
1590            access_user = self.accessdb[fid]
1591        except KeyError:
1592            raise service_error(service_error.internal,
1593                    "Access map and authorizer out of sync in " + \
1594                            "new_experiment for fedid %s"  % fid)
1595
1596        # Generate an ID for the experiment (slice) and a certificate that the
1597        # allocator can use to prove they own it.  We'll ship it back through
1598        # the encrypted connection.  If the requester supplied one, use it.
1599        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1600            expcert = req['experimentAccess']['X509']
1601            expid = fedid(certstr=expcert)
1602            self.state_lock.acquire()
1603            if expid in self.state:
1604                self.state_lock.release()
1605                raise service_error(service_error.req, 
1606                        'fedid %s identifies an existing experiment' % expid)
1607            self.state_lock.release()
1608        else:
1609            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1610
1611        #now we're done with the tmpdir, and it should be empty
1612        if self.cleanup:
1613            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1614            os.rmdir(tmpdir)
1615        else:
1616            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1617
1618        eid = self.create_experiment_state(fid, req, expid, expcert, 
1619                state='empty')
1620
1621        rv = {
1622                'experimentID': [
1623                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1624                ],
1625                'experimentStatus': 'empty',
1626                'experimentAccess': { 'X509' : expcert },
1627                'proof': proof.to_dict(),
1628            }
1629
1630        return rv
1631
1632    # create_experiment sub-functions
1633
1634    @staticmethod
1635    def get_experiment_key(req, field='experimentID'):
1636        """
1637        Parse the experiment identifiers out of the request (the request body
1638        tag has been removed).  Specifically this pulls either the fedid or the
1639        localname out of the experimentID field.  A fedid is preferred.  If
1640        neither is present or the request does not contain the fields,
1641        service_errors are raised.
1642        """
1643        # Get the experiment access
1644        exp = req.get(field, None)
1645        if exp:
1646            if exp.has_key('fedid'):
1647                key = exp['fedid']
1648            elif exp.has_key('localname'):
1649                key = exp['localname']
1650            else:
1651                raise service_error(service_error.req, "Unknown lookup type")
1652        else:
1653            raise service_error(service_error.req, "No request?")
1654
1655        return key
1656
1657    def get_experiment_ids_and_start(self, key, tmpdir):
1658        """
1659        Get the experiment name, id and access certificate from the state, and
1660        set the experiment state to 'starting'.  returns a triple (fedid,
1661        localname, access_cert_file). The access_cert_file is a copy of the
1662        contents of the access certificate, created in the tempdir with
1663        restricted permissions.  If things are confused, raise an exception.
1664        """
1665
1666        expid = eid = None
1667        self.state_lock.acquire()
1668        if self.state.has_key(key):
1669            self.state[key]['experimentStatus'] = "starting"
1670            for e in self.state[key].get('experimentID',[]):
1671                if not expid and e.has_key('fedid'):
1672                    expid = e['fedid']
1673                elif not eid and e.has_key('localname'):
1674                    eid = e['localname']
1675            if 'experimentAccess' in self.state[key] and \
1676                    'X509' in self.state[key]['experimentAccess']:
1677                expcert = self.state[key]['experimentAccess']['X509']
1678            else:
1679                expcert = None
1680        self.state_lock.release()
1681
1682        # make a protected copy of the access certificate so the experiment
1683        # controller can act as the experiment principal.
1684        if expcert:
1685            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1686            if not expcert_file:
1687                raise service_error(service_error.internal, 
1688                        "Cannot create temp cert file?")
1689        else:
1690            expcert_file = None
1691
1692        return (eid, expid, expcert_file)
1693
1694    def get_topology(self, req, tmpdir):
1695        """
1696        Get the ns2 content and put it into a file for parsing.  Call the local
1697        or remote parser and return the topdl.Topology.  Errors result in
1698        exceptions.  req is the request and tmpdir is a work directory.
1699        """
1700
1701        # The tcl parser needs to read a file so put the content into that file
1702        descr=req.get('experimentdescription', None)
1703        if descr:
1704            if 'ns2description' in descr:
1705                file_content=descr['ns2description']
1706            elif 'topdldescription' in descr:
1707                return topdl.Topology(**descr['topdldescription'])
1708            else:
1709                raise service_error(service_error.req, 
1710                        'Unknown experiment description type')
1711        else:
1712            raise service_error(service_error.req, "No experiment description")
1713
1714
1715        if self.splitter_url:
1716            self.log.debug("Calling remote topdl translator at %s" % \
1717                    self.splitter_url)
1718            top = self.remote_ns2topdl(self.splitter_url, file_content)
1719        else:
1720            tclfile = os.path.join(tmpdir, "experiment.tcl")
1721            if file_content:
1722                try:
1723                    f = open(tclfile, 'w')
1724                    f.write(file_content)
1725                    f.close()
1726                except EnvironmentError:
1727                    raise service_error(service_error.internal,
1728                            "Cannot write temp experiment description")
1729            else:
1730                raise service_error(service_error.req, 
1731                        "Only ns2descriptions supported")
1732            pid = "dummy"
1733            gid = "dummy"
1734            eid = "dummy"
1735
1736            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1737                str(self.muxmax), '-m', 'dummy']
1738
1739            tclcmd.extend([pid, gid, eid, tclfile])
1740
1741            self.log.debug("running local splitter %s", " ".join(tclcmd))
1742            # This is just fantastic.  As a side effect the parser copies
1743            # tb_compat.tcl into the current directory, so that directory
1744            # must be writable by the fedd user.  Doing this in the
1745            # temporary subdir ensures this is the case.
1746            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1747                    cwd=tmpdir)
1748            split_data = tclparser.stdout
1749
1750            top = topdl.topology_from_xml(file=split_data, top="experiment")
1751            os.remove(tclfile)
1752
1753        return top
1754
1755    def get_testbed_services(self, req, testbeds):
1756        """
1757        Parse the services section of the request into into two dicts mapping
1758        testbed to lists of federated_service objects.  The first lists all
1759        exporters of services, and the second all exporters of services that
1760        need control portals in the experiment.
1761        """
1762        masters = { }
1763        pmasters = { }
1764        for s in req.get('service', []):
1765            # If this is a service request with the importall field
1766            # set, fill it out.
1767
1768            if s.get('importall', False):
1769                s['import'] = [ tb for tb in testbeds \
1770                        if tb not in s.get('export',[])]
1771                del s['importall']
1772
1773            # Add the service to masters
1774            for tb in s.get('export', []):
1775                if s.get('name', None):
1776
1777                    params = { }
1778                    for a in s.get('fedAttr', []):
1779                        params[a.get('attribute', '')] = a.get('value','')
1780
1781                    fser = federated_service(name=s['name'],
1782                            exporter=tb, importers=s.get('import',[]),
1783                            params=params)
1784                    if fser.name == 'hide_hosts' \
1785                            and 'hosts' not in fser.params:
1786                        fser.params['hosts'] = \
1787                                ",".join(tb_hosts.get(fser.exporter, []))
1788                    if tb in masters: masters[tb].append(fser)
1789                    else: masters[tb] = [fser]
1790
1791                    if fser.portal:
1792                        if tb not in pmasters: pmasters[tb] = [ fser ]
1793                        else: pmasters[tb].append(fser)
1794                else:
1795                    self.log.error('Testbed service does not have name " + \
1796                            "and importers')
1797        return masters, pmasters
1798
1799    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1800        """
1801        Create the ssh keys necessary for interconnecting the portal nodes and
1802        the global hosts file for letting each segment know about the IP
1803        addresses in play.  Save these into the repo.  Add attributes to the
1804        autorizer allowing access controllers to download them and return a set
1805        of attributes that inform the segments where to find this stuff.  May
1806        raise service_errors in if there are problems.
1807        """
1808        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1809        gw_secretkey_base = "fed.%s" % self.ssh_type
1810        keydir = os.path.join(tmpdir, 'keys')
1811        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1812        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1813
1814        try:
1815            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1816        except ValueError:
1817            raise service_error(service_error.server_config, 
1818                    "Bad key type (%s)" % self.ssh_type)
1819
1820        self.generate_seer_certs(keydir)
1821
1822        # Copy configuration files into the remote file store
1823        # The config urlpath
1824        configpath = "/%s/config" % expid
1825        # The config file system location
1826        configdir ="%s%s" % ( self.repodir, configpath)
1827        try:
1828            os.makedirs(configdir)
1829        except EnvironmentError, e:
1830            raise service_error(service_error.internal,
1831                    "Cannot create config directory: %s" % e)
1832        try:
1833            f = open("%s/hosts" % configdir, "w")
1834            print >> f, string.join(hosts, '\n')
1835            f.close()
1836        except EnvironmentError, e:
1837            raise service_error(service_error.internal, 
1838                    "Cannot write hosts file: %s" % e)
1839        try:
1840            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1841            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1842            copy_file(os.path.join(keydir, 'ca.pem'), 
1843                    os.path.join(configdir, 'ca.pem'))
1844            copy_file(os.path.join(keydir, 'node.pem'), 
1845                    os.path.join(configdir, 'node.pem'))
1846        except EnvironmentError, e:
1847            raise service_error(service_error.internal, 
1848                    "Cannot copy keyfiles: %s" % e)
1849
1850        # Allow the individual testbeds to access the configuration files,
1851        # again by setting an attribute for the relevant pathnames on each
1852        # allocation principal.  Yeah, that's a long list comprehension.
1853        self.append_experiment_authorization(expid, set([
1854            (tbparams[tb]['allocID']['fedid'], "%s/%s" % (configpath, f)) \
1855                    for tb in tbparams.keys() \
1856                        for f in ("hosts", 'ca.pem', 'node.pem', 
1857                            gw_secretkey_base, gw_pubkey_base)]))
1858
1859        attrs = [ 
1860                {
1861                    'attribute': 'ssh_pubkey', 
1862                    'value': '%s/%s/config/%s' % \
1863                            (self.repo_url, expid, gw_pubkey_base)
1864                },
1865                {
1866                    'attribute': 'ssh_secretkey', 
1867                    'value': '%s/%s/config/%s' % \
1868                            (self.repo_url, expid, gw_secretkey_base)
1869                },
1870                {
1871                    'attribute': 'hosts', 
1872                    'value': '%s/%s/config/hosts' % \
1873                            (self.repo_url, expid)
1874                },
1875                {
1876                    'attribute': 'seer_ca_pem', 
1877                    'value': '%s/%s/config/%s' % \
1878                            (self.repo_url, expid, 'ca.pem')
1879                },
1880                {
1881                    'attribute': 'seer_node_pem', 
1882                    'value': '%s/%s/config/%s' % \
1883                            (self.repo_url, expid, 'node.pem')
1884                },
1885            ]
1886        return attrs
1887
1888
1889    def get_vtopo(self, req, fid):
1890        """
1891        Return the stored virtual topology for this experiment
1892        """
1893        rv = None
1894        state = None
1895
1896        req = req.get('VtopoRequestBody', None)
1897        if not req:
1898            raise service_error(service_error.req,
1899                    "Bad request format (no VtopoRequestBody)")
1900        exp = req.get('experiment', None)
1901        if exp:
1902            if exp.has_key('fedid'):
1903                key = exp['fedid']
1904                keytype = "fedid"
1905            elif exp.has_key('localname'):
1906                key = exp['localname']
1907                keytype = "localname"
1908            else:
1909                raise service_error(service_error.req, "Unknown lookup type")
1910        else:
1911            raise service_error(service_error.req, "No request?")
1912
1913        proof = self.check_experiment_access(fid, key)
1914
1915        self.state_lock.acquire()
1916        if self.state.has_key(key):
1917            if self.state[key].has_key('vtopo'):
1918                rv = { 'experiment' : {keytype: key },
1919                        'vtopo': self.state[key]['vtopo'],
1920                        'proof': proof.to_dict(), 
1921                    }
1922            else:
1923                state = self.state[key]['experimentStatus']
1924        self.state_lock.release()
1925
1926        if rv: return rv
1927        else: 
1928            if state:
1929                raise service_error(service_error.partial, 
1930                        "Not ready: %s" % state)
1931            else:
1932                raise service_error(service_error.req, "No such experiment")
1933
1934    def get_vis(self, req, fid):
1935        """
1936        Return the stored visualization for this experiment
1937        """
1938        rv = None
1939        state = None
1940
1941        req = req.get('VisRequestBody', None)
1942        if not req:
1943            raise service_error(service_error.req,
1944                    "Bad request format (no VisRequestBody)")
1945        exp = req.get('experiment', None)
1946        if exp:
1947            if exp.has_key('fedid'):
1948                key = exp['fedid']
1949                keytype = "fedid"
1950            elif exp.has_key('localname'):
1951                key = exp['localname']
1952                keytype = "localname"
1953            else:
1954                raise service_error(service_error.req, "Unknown lookup type")
1955        else:
1956            raise service_error(service_error.req, "No request?")
1957
1958        proof = self.check_experiment_access(fid, key)
1959
1960        self.state_lock.acquire()
1961        if self.state.has_key(key):
1962            if self.state[key].has_key('vis'):
1963                rv =  { 'experiment' : {keytype: key },
1964                        'vis': self.state[key]['vis'],
1965                        'proof': proof.to_dict(), 
1966                        }
1967            else:
1968                state = self.state[key]['experimentStatus']
1969        self.state_lock.release()
1970
1971        if rv: return rv
1972        else:
1973            if state:
1974                raise service_error(service_error.partial, 
1975                        "Not ready: %s" % state)
1976            else:
1977                raise service_error(service_error.req, "No such experiment")
1978
1979   
1980    def save_federant_information(self, allocated, tbparams, eid, vtopo, vis,
1981            top):
1982        """
1983        Store the various data that have changed in the experiment state
1984        between when it was started and the beginning of resource allocation.
1985        This is basically the information about each local allocation.  This
1986        fills in the values of the placeholder allocation in the state.  It
1987        also collects the access proofs and returns them as dicts for a
1988        response message.
1989        """
1990        # save federant information
1991        for k in allocated.keys():
1992            tbparams[k]['federant'] = {
1993                    'name': [ { 'localname' : eid} ],
1994                    'allocID' : tbparams[k]['allocID'],
1995                    'uri': tbparams[k]['uri'],
1996                    'proof': tbparams[k]['proof'],
1997                }
1998
1999        self.state_lock.acquire()
2000        self.state[eid]['vtopo'] = vtopo
2001        self.state[eid]['vis'] = vis
2002        self.state[eid]['experimentdescription'] = \
2003                { 'topdldescription': top.to_dict() }
2004        self.state[eid]['federant'] = \
2005                [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2006                    if tbparams[tb].has_key('federant') ]
2007        # Access proofs for the response message
2008        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2009                    for p in tbparams[k]['federant']['proof']]
2010        if self.state_filename: 
2011            self.write_state()
2012        self.state_lock.release()
2013        return proofs
2014
2015    def clear_placeholder(self, eid, expid, tmpdir):
2016        """
2017        Clear the placeholder and remove any allocated temporary dir.
2018        """
2019
2020        self.state_lock.acquire()
2021        del self.state[eid]
2022        del self.state[expid]
2023        if self.state_filename: self.write_state()
2024        self.state_lock.release()
2025        if tmpdir and self.cleanup:
2026            self.remove_dirs(tmpdir)
2027
2028    # end of create_experiment sub-functions
2029
2030    def create_experiment(self, req, fid):
2031        """
2032        The external interface to experiment creation called from the
2033        dispatcher.
2034
2035        Creates a working directory, splits the incoming description using the
2036        splitter script and parses out the various subsections using the
2037        classes above.  Once each sub-experiment is created, use pooled threads
2038        to instantiate them and start it all up.
2039        """
2040
2041        req = req.get('CreateRequestBody', None)
2042        if req:
2043            key = self.get_experiment_key(req)
2044        else:
2045            raise service_error(service_error.req,
2046                    "Bad request format (no CreateRequestBody)")
2047
2048        # Import information from the requester
2049        if self.auth.import_credentials(data_list=req.get('credential', [])):
2050            self.auth.save()
2051
2052        # Make sure that the caller can talk to us
2053        proof = self.check_experiment_access(fid, key)
2054
2055        # Install the testbed map entries supplied with the request into a copy
2056        # of the testbed map.
2057        tbmap = dict(self.tbmap)
2058        for m in req.get('testbedmap', []):
2059            if 'testbed' in m and 'uri' in m:
2060                tbmap[m['testbed']] = m['uri']
2061
2062        # a place to work
2063        try:
2064            tmpdir = tempfile.mkdtemp(prefix="split-")
2065            os.mkdir(tmpdir+"/keys")
2066        except EnvironmentError:
2067            raise service_error(service_error.internal, "Cannot create tmp dir")
2068
2069        tbparams = { }
2070
2071        eid, expid, expcert_file = \
2072                self.get_experiment_ids_and_start(key, tmpdir)
2073
2074        # This catches exceptions to clear the placeholder if necessary
2075        try: 
2076            if not (eid and expid):
2077                raise service_error(service_error.internal, 
2078                        "Cannot find local experiment info!?")
2079
2080            top = self.get_topology(req, tmpdir)
2081            self.confirm_software(top)
2082            # Assign the IPs
2083            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2084            # Find the testbeds to look up
2085            tb_hosts = { }
2086            testbeds = [ ]
2087            for e in top.elements:
2088                if isinstance(e, topdl.Computer):
2089                    tb = e.get_attribute('testbed') or 'default'
2090                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2091                    else: 
2092                        tb_hosts[tb] = [ e.name ]
2093                        testbeds.append(tb)
2094
2095            masters, pmasters = self.get_testbed_services(req, testbeds)
2096            allocated = { }         # Testbeds we can access
2097            topo ={ }               # Sub topologies
2098            connInfo = { }          # Connection information
2099
2100            self.split_topology(top, topo, testbeds)
2101
2102            self.get_access_to_testbeds(testbeds, fid, allocated, 
2103                    tbparams, masters, tbmap, expid, expcert_file)
2104
2105            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2106
2107            part = experiment_partition(self.auth, self.store_url, tbmap,
2108                    self.muxmax, self.direct_transit)
2109            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2110                    connInfo, expid)
2111
2112            auth_attrs = set()
2113            # Now get access to the dynamic testbeds (those added above)
2114            for tb in [ t for t in topo if t not in allocated]:
2115                self.get_access(tb, tbparams, fid, masters, tbmap, 
2116                        expid, expcert_file)
2117                allocated[tb] = 1
2118                store_keys = topo[tb].get_attribute('store_keys')
2119                # Give the testbed access to keys it exports or imports
2120                if store_keys:
2121                    auth_attrs.update(set([
2122                        (tbparams[tb]['allocID']['fedid'], sk) \
2123                                for sk in store_keys.split(" ")]))
2124
2125            if auth_attrs:
2126                self.append_experiment_authorization(expid, auth_attrs)
2127
2128            # transit and disconnected testbeds may not have a connInfo entry.
2129            # Fill in the blanks.
2130            for t in allocated.keys():
2131                if not connInfo.has_key(t):
2132                    connInfo[t] = { }
2133
2134            self.wrangle_software(expid, top, topo, tbparams)
2135
2136            vtopo = topdl.topology_to_vtopo(top)
2137            vis = self.genviz(vtopo)
2138            proofs = self.save_federant_information(allocated, tbparams, 
2139                    eid, vtopo, vis, top)
2140        except service_error, e:
2141            # If something goes wrong in the parse (usually an access error)
2142            # clear the placeholder state.  From here on out the code delays
2143            # exceptions.  Failing at this point returns a fault to the remote
2144            # caller.
2145            self.clear_placeholder(eid, expid, tmpdir)
2146            raise e
2147
2148        # Start the background swapper and return the starting state.  From
2149        # here on out, the state will stick around a while.
2150
2151        # Create a logger that logs to the experiment's state object as well as
2152        # to the main log file.
2153        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2154        alloc_collector = self.list_log(self.state[eid]['log'])
2155        h = logging.StreamHandler(alloc_collector)
2156        # XXX: there should be a global one of these rather than repeating the
2157        # code.
2158        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2159                    '%d %b %y %H:%M:%S'))
2160        alloc_log.addHandler(h)
2161
2162        # Start a thread to do the resource allocation
2163        t  = Thread(target=self.allocate_resources,
2164                args=(allocated, masters, eid, expid, tbparams, 
2165                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2166                    connInfo, tbmap, expcert_file),
2167                name=eid)
2168        t.start()
2169
2170        rv = {
2171                'experimentID': [
2172                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2173                ],
2174                'experimentStatus': 'starting',
2175                'proof': [ proof.to_dict() ] + proofs,
2176            }
2177
2178        return rv
2179   
2180    def get_experiment_fedid(self, key):
2181        """
2182        find the fedid associated with the localname key in the state database.
2183        """
2184
2185        rv = None
2186        self.state_lock.acquire()
2187        if self.state.has_key(key):
2188            if isinstance(self.state[key], dict):
2189                try:
2190                    kl = [ f['fedid'] for f in \
2191                            self.state[key]['experimentID']\
2192                                if f.has_key('fedid') ]
2193                except KeyError:
2194                    self.state_lock.release()
2195                    raise service_error(service_error.internal, 
2196                            "No fedid for experiment %s when getting "+\
2197                                    "fedid(!?)" % key)
2198                if len(kl) == 1:
2199                    rv = kl[0]
2200                else:
2201                    self.state_lock.release()
2202                    raise service_error(service_error.internal, 
2203                            "multiple fedids for experiment %s when " +\
2204                                    "getting fedid(!?)" % key)
2205            else:
2206                self.state_lock.release()
2207                raise service_error(service_error.internal, 
2208                        "Unexpected state for %s" % key)
2209        self.state_lock.release()
2210        return rv
2211
2212    def check_experiment_access(self, fid, key):
2213        """
2214        Confirm that the fid has access to the experiment.  Though a request
2215        may be made in terms of a local name, the access attribute is always
2216        the experiment's fedid.
2217        """
2218        if not isinstance(key, fedid):
2219            key = self.get_experiment_fedid(key)
2220
2221        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2222
2223        if access_ok:
2224            return proof
2225        else:
2226            raise service_error(service_error.access, "Access Denied",
2227                proof)
2228
2229
2230    def get_handler(self, path, fid):
2231        """
2232        Perhaps surprisingly named, this function handles HTTP GET requests to
2233        this server (SOAP requests are POSTs).
2234        """
2235        self.log.info("Get handler %s %s" % (path, fid))
2236        # XXX: log proofs?
2237        if self.auth.check_attribute(fid, path):
2238            return ("%s/%s" % (self.repodir, path), "application/binary")
2239        else:
2240            return (None, None)
2241
2242    def clean_info_response(self, rv, proof):
2243        """
2244        Remove the information in the experiment's state object that is not in
2245        the info response.
2246        """
2247        # Remove the owner info (should always be there, but...)
2248        if rv.has_key('owner'): del rv['owner']
2249        if 'auth' in rv: del rv['auth']
2250
2251        # Convert the log into the allocationLog parameter and remove the
2252        # log entry (with defensive programming)
2253        if rv.has_key('log'):
2254            rv['allocationLog'] = "".join(rv['log'])
2255            del rv['log']
2256        else:
2257            rv['allocationLog'] = ""
2258
2259        if rv['experimentStatus'] != 'active':
2260            if rv.has_key('federant'): del rv['federant']
2261        else:
2262            # remove the allocationID and uri info from each federant
2263            for f in rv.get('federant', []):
2264                if f.has_key('allocID'): del f['allocID']
2265                if f.has_key('uri'): del f['uri']
2266        rv['proof'] = proof.to_dict()
2267
2268        return rv
2269
2270    def get_info(self, req, fid):
2271        """
2272        Return all the stored info about this experiment
2273        """
2274        rv = None
2275
2276        req = req.get('InfoRequestBody', None)
2277        if not req:
2278            raise service_error(service_error.req,
2279                    "Bad request format (no InfoRequestBody)")
2280        exp = req.get('experiment', None)
2281        if exp:
2282            if exp.has_key('fedid'):
2283                key = exp['fedid']
2284                keytype = "fedid"
2285            elif exp.has_key('localname'):
2286                key = exp['localname']
2287                keytype = "localname"
2288            else:
2289                raise service_error(service_error.req, "Unknown lookup type")
2290        else:
2291            raise service_error(service_error.req, "No request?")
2292
2293        proof = self.check_experiment_access(fid, key)
2294
2295        # The state may be massaged by the service function that called
2296        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2297        # state.
2298        self.state_lock.acquire()
2299        if self.state.has_key(key):
2300            rv = copy.deepcopy(self.state[key])
2301        self.state_lock.release()
2302
2303        if rv:
2304            return self.clean_info_response(rv, proof)
2305        else:
2306            raise service_error(service_error.req, "No such experiment")
2307
2308    def get_multi_info(self, req, fid):
2309        """
2310        Return all the stored info that this fedid can access
2311        """
2312        rv = { 'info': [ ], 'proof': [ ] }
2313
2314        self.state_lock.acquire()
2315        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2316            try:
2317                proof = self.check_experiment_access(fid, key)
2318            except service_error, e:
2319                if e.code == service_error.access:
2320                    continue
2321                else:
2322                    self.state_lock.release()
2323                    raise e
2324
2325            if self.state.has_key(key):
2326                e = copy.deepcopy(self.state[key])
2327                e = self.clean_info_response(e, proof)
2328                rv['info'].append(e)
2329                rv['proof'].append(proof.to_dict())
2330        self.state_lock.release()
2331        return rv
2332
2333    def check_termination_status(self, fed_exp, force):
2334        """
2335        Confirm that the experiment is sin a valid state to stop (or force it)
2336        return the state - invalid states for deletion and force settings cause
2337        exceptions.
2338        """
2339        self.state_lock.acquire()
2340        status = fed_exp.get('experimentStatus', None)
2341
2342        if status:
2343            if status in ('starting', 'terminating'):
2344                if not force:
2345                    self.state_lock.release()
2346                    raise service_error(service_error.partial, 
2347                            'Experiment still being created or destroyed')
2348                else:
2349                    self.log.warning('Experiment in %s state ' % status + \
2350                            'being terminated by force.')
2351            self.state_lock.release()
2352            return status
2353        else:
2354            # No status??? trouble
2355            self.state_lock.release()
2356            raise service_error(service_error.internal,
2357                    "Experiment has no status!?")
2358
2359
2360    def get_termination_info(self, fed_exp):
2361        ids = []
2362        term_params = { }
2363        self.state_lock.acquire()
2364        #  experimentID is a list of dicts that are self-describing
2365        #  identifiers.  This finds all the fedids and localnames - the
2366        #  keys of self.state - and puts them into ids, which is used to delete
2367        #  the state after everything is swapped out.
2368        for id in fed_exp.get('experimentID', []):
2369            if 'fedid' in id: 
2370                ids.append(id['fedid'])
2371                repo = "%s" % id['fedid']
2372            if 'localname' in id: ids.append(id['localname'])
2373
2374        # Get the experimentAccess - the principal for this experiment.  It
2375        # is this principal to which credentials have been delegated, and
2376        # as which the experiment controller must act.
2377        if 'experimentAccess' in fed_exp and \
2378                'X509' in fed_exp['experimentAccess']:
2379            expcert = fed_exp['experimentAccess']['X509']
2380        else:
2381            expcert = None
2382
2383        # Collect the allocation/segment ids into a dict keyed by the fedid
2384        # of the allocation (or a monotonically increasing integer) that
2385        # contains a tuple of uri, aid (which is a dict...)
2386        for i, fed in enumerate(fed_exp.get('federant', [])):
2387            try:
2388                uri = fed['uri']
2389                aid = fed['allocID']
2390                k = fed['allocID'].get('fedid', i)
2391            except KeyError, e:
2392                continue
2393            term_params[k] = (uri, aid)
2394        # Change the experiment state
2395        fed_exp['experimentStatus'] = 'terminating'
2396        if self.state_filename: self.write_state()
2397        self.state_lock.release()
2398
2399        return ids, term_params, expcert, repo
2400
2401
2402    def deallocate_resources(self, term_params, expcert, status, force, 
2403            dealloc_log):
2404        tmpdir = None
2405        # This try block makes sure the tempdir is cleared
2406        try:
2407            # If no expcert, try the deallocation as the experiment
2408            # controller instance.
2409            if expcert and self.auth_type != 'legacy': 
2410                try:
2411                    tmpdir = tempfile.mkdtemp(prefix="term-")
2412                except EnvironmentError:
2413                    raise service_error(service_error.internal, 
2414                            "Cannot create tmp dir")
2415                cert_file = self.make_temp_certfile(expcert, tmpdir)
2416                pw = None
2417            else: 
2418                cert_file = self.cert_file
2419                pw = self.cert_pwd
2420
2421            # Stop everyone.  NB, wait_for_all waits until a thread starts
2422            # and then completes, so we can't wait if nothing starts.  So,
2423            # no tbparams, no start.
2424            if len(term_params) > 0:
2425                tp = thread_pool(self.nthreads)
2426                for k, (uri, aid) in term_params.items():
2427                    # Create and start a thread to stop the segment
2428                    tp.wait_for_slot()
2429                    t  = pooled_thread(\
2430                            target=self.terminate_segment(log=dealloc_log,
2431                                testbed=uri,
2432                                cert_file=cert_file, 
2433                                cert_pwd=pw,
2434                                trusted_certs=self.trusted_certs,
2435                                caller=self.call_TerminateSegment),
2436                            args=(uri, aid), name=k,
2437                            pdata=tp, trace_file=self.trace_file)
2438                    t.start()
2439                # Wait for completions
2440                tp.wait_for_all_done()
2441
2442            # release the allocations (failed experiments have done this
2443            # already, and starting experiments may be in odd states, so we
2444            # ignore errors releasing those allocations
2445            try: 
2446                for k, (uri, aid)  in term_params.items():
2447                    self.release_access(None, aid, uri=uri,
2448                            cert_file=cert_file, cert_pwd=pw)
2449            except service_error, e:
2450                if status != 'failed' and not force:
2451                    raise e
2452
2453        # Clean up the tmpdir no matter what
2454        finally:
2455            if tmpdir: self.remove_dirs(tmpdir)
2456
2457    def terminate_experiment(self, req, fid):
2458        """
2459        Swap this experiment out on the federants and delete the shared
2460        information
2461        """
2462        tbparams = { }
2463        req = req.get('TerminateRequestBody', None)
2464        if not req:
2465            raise service_error(service_error.req,
2466                    "Bad request format (no TerminateRequestBody)")
2467
2468        key = self.get_experiment_key(req, 'experiment')
2469        proof = self.check_experiment_access(fid, key)
2470        exp = req.get('experiment', False)
2471        force = req.get('force', False)
2472
2473        dealloc_list = [ ]
2474
2475
2476        # Create a logger that logs to the dealloc_list as well as to the main
2477        # log file.
2478        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2479        h = logging.StreamHandler(self.list_log(dealloc_list))
2480        # XXX: there should be a global one of these rather than repeating the
2481        # code.
2482        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2483                    '%d %b %y %H:%M:%S'))
2484        dealloc_log.addHandler(h)
2485
2486        self.state_lock.acquire()
2487        fed_exp = self.state.get(key, None)
2488        self.state_lock.release()
2489        repo = None
2490
2491        if fed_exp:
2492            status = self.check_termination_status(fed_exp, force)
2493            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2494            self.deallocate_resources(term_params, expcert, status, force, 
2495                    dealloc_log)
2496
2497            # Remove the terminated experiment
2498            self.state_lock.acquire()
2499            for id in ids:
2500                self.clear_experiment_authorization(id, need_state_lock=False)
2501                if id in self.state: del self.state[id]
2502
2503            if self.state_filename: self.write_state()
2504            self.state_lock.release()
2505
2506            # Delete any synch points associated with this experiment.  All
2507            # synch points begin with the fedid of the experiment.
2508            fedid_keys = set(["fedid:%s" % f for f in ids \
2509                    if isinstance(f, fedid)])
2510            for k in self.synch_store.all_keys():
2511                try:
2512                    if len(k) > 45 and k[0:46] in fedid_keys:
2513                        self.synch_store.del_value(k)
2514                except synch_store.BadDeletionError:
2515                    pass
2516            self.write_store()
2517
2518            # Remove software and other cached stuff from the filesystem.
2519            if repo:
2520                self.remove_dirs("%s/%s" % (self.repodir, repo))
2521       
2522            return { 
2523                    'experiment': exp , 
2524                    'deallocationLog': string.join(dealloc_list, ''),
2525                    'proof': [proof.to_dict()],
2526                    }
2527        else:
2528            raise service_error(service_error.req, "No saved state")
2529
2530
2531    def GetValue(self, req, fid):
2532        """
2533        Get a value from the synchronized store
2534        """
2535        req = req.get('GetValueRequestBody', None)
2536        if not req:
2537            raise service_error(service_error.req,
2538                    "Bad request format (no GetValueRequestBody)")
2539       
2540        name = req.get('name', None)
2541        wait = req.get('wait', False)
2542        rv = { 'name': name }
2543
2544        if not name:
2545            raise service_error(service_error.req, "No name?")
2546
2547        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2548
2549        if access_ok:
2550            self.log.debug("[GetValue] asking for %s " % name)
2551            try:
2552                v = self.synch_store.get_value(name, wait)
2553            except synch_store.RevokedKeyError:
2554                # No more synch on this key
2555                raise service_error(service_error.federant, 
2556                        "Synch key %s revoked" % name)
2557            if v is not None:
2558                rv['value'] = v
2559            rv['proof'] = proof.to_dict()
2560            self.log.debug("[GetValue] got %s from %s" % (v, name))
2561            return rv
2562        else:
2563            raise service_error(service_error.access, "Access Denied",
2564                    proof=proof)
2565       
2566
2567    def SetValue(self, req, fid):
2568        """
2569        Set a value in the synchronized store
2570        """
2571        req = req.get('SetValueRequestBody', None)
2572        if not req:
2573            raise service_error(service_error.req,
2574                    "Bad request format (no SetValueRequestBody)")
2575       
2576        name = req.get('name', None)
2577        v = req.get('value', '')
2578
2579        if not name:
2580            raise service_error(service_error.req, "No name?")
2581
2582        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2583
2584        if access_ok:
2585            try:
2586                self.synch_store.set_value(name, v)
2587                self.write_store()
2588                self.log.debug("[SetValue] set %s to %s" % (name, v))
2589            except synch_store.CollisionError:
2590                # Translate into a service_error
2591                raise service_error(service_error.req,
2592                        "Value already set: %s" %name)
2593            except synch_store.RevokedKeyError:
2594                # No more synch on this key
2595                raise service_error(service_error.federant, 
2596                        "Synch key %s revoked" % name)
2597                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2598        else:
2599            raise service_error(service_error.access, "Access Denied",
2600                    proof=proof)
Note: See TracBrowser for help on using the repository browser.