source: fedd/federation/experiment_control.py @ ee7f7e4

axis_examplecompt_changesinfo-ops
Last change on this file since ee7f7e4 was 1ec5d4a, checked in by Mike Ryan <mikeryan@…>, 14 years ago

don't attempt to close file that was never opened

  • Property mode set to 100644
File size: 83.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22from string import join
23
24from urlparse import urlparse
25from urllib2 import urlopen
26
27from util import *
28from fedid import fedid, generate_fedid
29from remote_service import xmlrpc_handler, soap_handler, service_caller
30from service_error import service_error
31from synch_store import synch_store
32from experiment_partition import experiment_partition
33from experiment_control_legacy import experiment_control_legacy
34from authorizer import abac_authorizer
35from thread_pool import thread_pool, pooled_thread
36
37import topdl
38import list_log
39from ip_allocator import ip_allocator
40from ip_addr import ip_addr
41
42
43class nullHandler(logging.Handler):
44    def emit(self, record): pass
45
46fl = logging.getLogger("fedd.experiment_control")
47fl.addHandler(nullHandler())
48
49
50# Right now, no support for composition.
51class federated_service:
52    def __init__(self, name, exporter=None, importers=None, params=None, 
53            reqs=None, portal=None):
54        self.name=name
55        self.exporter=exporter
56        if importers is None: self.importers = []
57        else: self.importers=importers
58        if params is None: self.params = { }
59        else: self.params = params
60        if reqs is None: self.reqs = []
61        else: self.reqs = reqs
62
63        if portal is not None:
64            self.portal = portal
65        else:
66            self.portal = (name in federated_service.needs_portal)
67
68    def __str__(self):
69        return "name %s export %s import %s params %s reqs %s" % \
70                (self.name, self.exporter, self.importers, self.params,
71                        [ (r['name'], r['visibility']) for r in self.reqs] )
72
73    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
74
75class experiment_control_local(experiment_control_legacy):
76    """
77    Control of experiments that this system can directly access.
78
79    Includes experiment creation, termination and information dissemination.
80    Thred safe.
81    """
82
83    class ssh_cmd_timeout(RuntimeError): pass
84   
85    call_RequestAccess = service_caller('RequestAccess')
86    call_ReleaseAccess = service_caller('ReleaseAccess')
87    call_StartSegment = service_caller('StartSegment')
88    call_TerminateSegment = service_caller('TerminateSegment')
89    call_Ns2Topdl = service_caller('Ns2Topdl')
90
91    def __init__(self, config=None, auth=None):
92        """
93        Intialize the various attributes, most from the config object
94        """
95
96        def parse_tarfile_list(tf):
97            """
98            Parse a tarfile list from the configuration.  This is a set of
99            paths and tarfiles separated by spaces.
100            """
101            rv = [ ]
102            if tf is not None:
103                tl = tf.split()
104                while len(tl) > 1:
105                    p, t = tl[0:2]
106                    del tl[0:2]
107                    rv.append((p, t))
108            return rv
109
110        self.list_log = list_log.list_log
111
112        self.cert_file = config.get("experiment_control", "cert_file")
113        if self.cert_file:
114            self.cert_pwd = config.get("experiment_control", "cert_pwd")
115        else:
116            self.cert_file = config.get("globals", "cert_file")
117            self.cert_pwd = config.get("globals", "cert_pwd")
118
119        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
120                or config.get("globals", "trusted_certs")
121
122        self.repodir = config.get("experiment_control", "repodir")
123        self.repo_url = config.get("experiment_control", "repo_url", 
124                "https://users.isi.deterlab.net:23235");
125
126        self.exp_stem = "fed-stem"
127        self.log = logging.getLogger("fedd.experiment_control")
128        set_log_level(config, "experiment_control", self.log)
129        self.muxmax = 2
130        self.nthreads = 10
131        self.randomize_experiments = False
132
133        self.splitter = None
134        self.ssh_keygen = "/usr/bin/ssh-keygen"
135        self.ssh_identity_file = None
136
137
138        self.debug = config.getboolean("experiment_control", "create_debug")
139        self.cleanup = not config.getboolean("experiment_control", 
140                "leave_tmpfiles")
141        self.state_filename = config.get("experiment_control", 
142                "experiment_state")
143        self.store_filename = config.get("experiment_control", 
144                "synch_store")
145        self.store_url = config.get("experiment_control", "store_url")
146        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
147        self.fedkit = parse_tarfile_list(\
148                config.get("experiment_control", "fedkit"))
149        self.gatewaykit = parse_tarfile_list(\
150                config.get("experiment_control", "gatewaykit"))
151        accessdb_file = config.get("experiment_control", "accessdb")
152
153        dt = config.get("experiment_control", "direct_transit")
154        self.auth_type = config.get('experiment_control', 'auth_type') \
155                or 'legacy'
156        self.auth_dir = config.get('experiment_control', 'auth_dir')
157        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
158        else: self.direct_transit = [ ]
159        # NB for internal master/slave ops, not experiment setup
160        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
161
162        self.overrides = set([])
163        ovr = config.get('experiment_control', 'overrides')
164        if ovr:
165            for o in ovr.split(","):
166                o = o.strip()
167                if o.startswith('fedid:'): o = o[len('fedid:'):]
168                self.overrides.add(fedid(hexstr=o))
169
170        self.state = { }
171        self.state_lock = Lock()
172        self.tclsh = "/usr/local/bin/otclsh"
173        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
174                config.get("experiment_control", "tcl_splitter",
175                        "/usr/testbed/lib/ns2ir/parse.tcl")
176        mapdb_file = config.get("experiment_control", "mapdb")
177        self.trace_file = sys.stderr
178
179        self.def_expstart = \
180                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
181                "/tmp/federate";
182        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
183                "FEDDIR/hosts";
184        self.def_gwstart = \
185                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
186                "/tmp/bridge.log";
187        self.def_mgwstart = \
188                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
189                "/tmp/bridge.log";
190        self.def_gwimage = "FBSD61-TUNNEL2";
191        self.def_gwtype = "pc";
192        self.local_access = { }
193
194        if self.auth_type == 'legacy':
195            if auth:
196                self.auth = auth
197            else:
198                self.log.error( "[access]: No authorizer initialized, " +\
199                        "creating local one.")
200                auth = authorizer()
201            self.get_access = self.legacy_get_access
202        elif self.auth_type == 'abac':
203            self.auth = abac_authorizer(load=self.auth_dir)
204        else:
205            raise service_error(service_error.internal, 
206                    "Unknown auth_type: %s" % self.auth_type)
207
208        if mapdb_file:
209            self.read_mapdb(mapdb_file)
210        else:
211            self.log.warn("[experiment_control] No testbed map, using defaults")
212            self.tbmap = { 
213                    'deter':'https://users.isi.deterlab.net:23235',
214                    'emulab':'https://users.isi.deterlab.net:23236',
215                    'ucb':'https://users.isi.deterlab.net:23237',
216                    }
217
218        if accessdb_file:
219                self.read_accessdb(accessdb_file)
220        else:
221            raise service_error(service_error.internal,
222                    "No accessdb specified in config")
223
224        # Grab saved state.  OK to do this w/o locking because it's read only
225        # and only one thread should be in existence that can see self.state at
226        # this point.
227        if self.state_filename:
228            self.read_state()
229
230        if self.store_filename:
231            self.read_store()
232        else:
233            self.log.warning("No saved synch store")
234            self.synch_store = synch_store
235
236        # Dispatch tables
237        self.soap_services = {\
238                'New': soap_handler('New', self.new_experiment),
239                'Create': soap_handler('Create', self.create_experiment),
240                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
241                'Vis': soap_handler('Vis', self.get_vis),
242                'Info': soap_handler('Info', self.get_info),
243                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
244                'Terminate': soap_handler('Terminate', 
245                    self.terminate_experiment),
246                'GetValue': soap_handler('GetValue', self.GetValue),
247                'SetValue': soap_handler('SetValue', self.SetValue),
248        }
249
250        self.xmlrpc_services = {\
251                'New': xmlrpc_handler('New', self.new_experiment),
252                'Create': xmlrpc_handler('Create', self.create_experiment),
253                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
254                'Vis': xmlrpc_handler('Vis', self.get_vis),
255                'Info': xmlrpc_handler('Info', self.get_info),
256                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
257                'Terminate': xmlrpc_handler('Terminate',
258                    self.terminate_experiment),
259                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
260                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
261        }
262
263    # Call while holding self.state_lock
264    def write_state(self):
265        """
266        Write a new copy of experiment state after copying the existing state
267        to a backup.
268
269        State format is a simple pickling of the state dictionary.
270        """
271        if os.access(self.state_filename, os.W_OK):
272            copy_file(self.state_filename, \
273                    "%s.bak" % self.state_filename)
274        try:
275            f = open(self.state_filename, 'w')
276            pickle.dump(self.state, f)
277        except EnvironmentError, e:
278            self.log.error("Can't write file %s: %s" % \
279                    (self.state_filename, e))
280        except pickle.PicklingError, e:
281            self.log.error("Pickling problem: %s" % e)
282        except TypeError, e:
283            self.log.error("Pickling problem (TypeError): %s" % e)
284
285    @staticmethod
286    def get_alloc_ids(state):
287        """
288        Pull the fedids of the identifiers of each allocation from the
289        state.  Again, a dict dive that's best isolated.
290
291        Used by read_store and read state
292        """
293
294        return [ f['allocID']['fedid'] 
295                for f in state.get('federant',[]) \
296                    if f.has_key('allocID') and \
297                        f['allocID'].has_key('fedid')]
298
299    # Call while holding self.state_lock
300    def read_state(self):
301        """
302        Read a new copy of experiment state.  Old state is overwritten.
303
304        State format is a simple pickling of the state dictionary.
305        """
306       
307        def get_experiment_id(state):
308            """
309            Pull the fedid experimentID out of the saved state.  This is kind
310            of a gross walk through the dict.
311            """
312
313            if state.has_key('experimentID'):
314                for e in state['experimentID']:
315                    if e.has_key('fedid'):
316                        return e['fedid']
317                else:
318                    return None
319            else:
320                return None
321
322        try:
323            f = open(self.state_filename, "r")
324            self.state = pickle.load(f)
325            self.log.debug("[read_state]: Read state from %s" % \
326                    self.state_filename)
327        except EnvironmentError, e:
328            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
329                    % (self.state_filename, e))
330        except pickle.UnpicklingError, e:
331            self.log.warning(("[read_state]: No saved state: " + \
332                    "Unpickling failed: %s") % e)
333       
334        for s in self.state.values():
335            try:
336
337                eid = get_experiment_id(s)
338                if eid : 
339                    if self.auth_type == 'legacy':
340                        # XXX: legacy
341                        # Give the owner rights to the experiment
342                        self.auth.set_attribute(s['owner'], eid)
343                        # And holders of the eid as well
344                        self.auth.set_attribute(eid, eid)
345                        # allow overrides to control experiments as well
346                        for o in self.overrides:
347                            self.auth.set_attribute(o, eid)
348                        # Set permissions to allow reading of the software
349                        # repo, if any, as well.
350                        for a in self.get_alloc_ids(s):
351                            self.auth.set_attribute(a, 'repo/%s' % eid)
352                else:
353                    raise KeyError("No experiment id")
354            except KeyError, e:
355                self.log.warning("[read_state]: State ownership or identity " +\
356                        "misformatted in %s: %s" % (self.state_filename, e))
357
358
359    def read_accessdb(self, accessdb_file):
360        """
361        Read the mapping from fedids that can create experiments to their name
362        in the 3-level access namespace.  All will be asserted from this
363        testbed and can include the local username and porject that will be
364        asserted on their behalf by this fedd.  Each fedid is also added to the
365        authorization system with the "create" attribute.
366        """
367        self.accessdb = {}
368        # These are the regexps for parsing the db
369        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
370        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
371                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
372        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
373                "\s*->\s*(" + name_expr + ")\s*$")
374        lineno = 0
375
376        # Parse the mappings and store in self.authdb, a dict of
377        # fedid -> (proj, user)
378        try:
379            f = open(accessdb_file, "r")
380            for line in f:
381                lineno += 1
382                line = line.strip()
383                if len(line) == 0 or line.startswith('#'):
384                    continue
385                m = project_line.match(line)
386                if m:
387                    fid = fedid(hexstr=m.group(1))
388                    project, user = m.group(2,3)
389                    if not self.accessdb.has_key(fid):
390                        self.accessdb[fid] = []
391                    self.accessdb[fid].append((project, user))
392                    continue
393
394                m = user_line.match(line)
395                if m:
396                    fid = fedid(hexstr=m.group(1))
397                    project = None
398                    user = m.group(2)
399                    if not self.accessdb.has_key(fid):
400                        self.accessdb[fid] = []
401                    self.accessdb[fid].append((project, user))
402                    continue
403                self.log.warn("[experiment_control] Error parsing access " +\
404                        "db %s at line %d" %  (accessdb_file, lineno))
405        except EnvironmentError:
406            raise service_error(service_error.internal,
407                    ("Error opening/reading %s as experiment " +\
408                            "control accessdb") %  accessdb_file)
409        f.close()
410
411        # Initialize the authorization attributes
412        # XXX: legacy
413        if self.auth_type == 'legacy':
414            for fid in self.accessdb.keys():
415                self.auth.set_attribute(fid, 'create')
416                self.auth.set_attribute(fid, 'new')
417
418    def read_mapdb(self, file):
419        """
420        Read a simple colon separated list of mappings for the
421        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
422        """
423
424        self.tbmap = { }
425        lineno =0
426        try:
427            f = open(file, "r")
428            for line in f:
429                lineno += 1
430                line = line.strip()
431                if line.startswith('#') or len(line) == 0:
432                    continue
433                try:
434                    label, url = line.split(':', 1)
435                    self.tbmap[label] = url
436                except ValueError, e:
437                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
438                            "map db: %s %s" % (lineno, line, e))
439        except EnvironmentError, e:
440            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
441                    "open %s: %s" % (file, e))
442        else:
443            f.close()
444
445    def read_store(self):
446        try:
447            self.synch_store = synch_store()
448            self.synch_store.load(self.store_filename)
449            self.log.debug("[read_store]: Read store from %s" % \
450                    self.store_filename)
451        except EnvironmentError, e:
452            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
453                    % (self.state_filename, e))
454            self.synch_store = synch_store()
455
456        # Set the initial permissions on data in the store.  XXX: This ad hoc
457        # authorization attribute initialization is getting out of hand.
458        # XXX: legacy
459        if self.auth_type == 'legacy':
460            for k in self.synch_store.all_keys():
461                try:
462                    if k.startswith('fedid:'):
463                        fid = fedid(hexstr=k[6:46])
464                        if self.state.has_key(fid):
465                            for a in self.get_alloc_ids(self.state[fid]):
466                                self.auth.set_attribute(a, k)
467                except ValueError, e:
468                    self.log.warn("Cannot deduce permissions for %s" % k)
469
470
471    def write_store(self):
472        """
473        Write a new copy of synch_store after writing current state
474        to a backup.  We use the internal synch_store pickle method to avoid
475        incinsistent data.
476
477        State format is a simple pickling of the store.
478        """
479        if os.access(self.store_filename, os.W_OK):
480            copy_file(self.store_filename, \
481                    "%s.bak" % self.store_filename)
482        try:
483            self.synch_store.save(self.store_filename)
484        except EnvironmentError, e:
485            self.log.error("Can't write file %s: %s" % \
486                    (self.store_filename, e))
487        except TypeError, e:
488            self.log.error("Pickling problem (TypeError): %s" % e)
489
490
491    def remove_dirs(self, dir):
492        """
493        Remove the directory tree and all files rooted at dir.  Log any errors,
494        but continue.
495        """
496        self.log.debug("[removedirs]: removing %s" % dir)
497        try:
498            for path, dirs, files in os.walk(dir, topdown=False):
499                for f in files:
500                    os.remove(os.path.join(path, f))
501                for d in dirs:
502                    os.rmdir(os.path.join(path, d))
503            os.rmdir(dir)
504        except EnvironmentError, e:
505            self.log.error("Error deleting directory tree in %s" % e);
506
507    @staticmethod
508    def make_temp_certfile(expcert, tmpdir):
509        """
510        make a protected copy of the access certificate so the experiment
511        controller can act as the experiment principal.  mkstemp is the most
512        secure way to do that. The directory should be created by
513        mkdtemp.  Return the filename.
514        """
515        if expcert and tmpdir:
516            try:
517                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
518                f = os.fdopen(certf, 'w')
519                print >> f, expcert
520                f.close()
521            except EnvironmentError, e:
522                raise service_error(service_error.internal, 
523                        "Cannot create temp cert file?")
524            return certfn
525        else:
526            return None
527
528       
529    def generate_ssh_keys(self, dest, type="rsa" ):
530        """
531        Generate a set of keys for the gateways to use to talk.
532
533        Keys are of type type and are stored in the required dest file.
534        """
535        valid_types = ("rsa", "dsa")
536        t = type.lower();
537        if t not in valid_types: raise ValueError
538        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
539
540        try:
541            trace = open("/dev/null", "w")
542        except EnvironmentError:
543            raise service_error(service_error.internal,
544                    "Cannot open /dev/null??");
545
546        # May raise CalledProcessError
547        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
548        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
549        if rv != 0:
550            raise service_error(service_error.internal, 
551                    "Cannot generate nonce ssh keys.  %s return code %d" \
552                            % (self.ssh_keygen, rv))
553
554    def gentopo(self, str):
555        """
556        Generate the topology data structure from the splitter's XML
557        representation of it.
558
559        The topology XML looks like:
560            <experiment>
561                <nodes>
562                    <node><vname></vname><ips>ip1:ip2</ips></node>
563                </nodes>
564                <lans>
565                    <lan>
566                        <vname></vname><vnode></vnode><ip></ip>
567                        <bandwidth></bandwidth><member>node:port</member>
568                    </lan>
569                </lans>
570        """
571        class topo_parse:
572            """
573            Parse the topology XML and create the dats structure.
574            """
575            def __init__(self):
576                # Typing of the subelements for data conversion
577                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
578                self.int_subelements = ( 'bandwidth',)
579                self.float_subelements = ( 'delay',)
580                # The final data structure
581                self.nodes = [ ]
582                self.lans =  [ ]
583                self.topo = { \
584                        'node': self.nodes,\
585                        'lan' : self.lans,\
586                    }
587                self.element = { }  # Current element being created
588                self.chars = ""     # Last text seen
589
590            def end_element(self, name):
591                # After each sub element the contents is added to the current
592                # element or to the appropriate list.
593                if name == 'node':
594                    self.nodes.append(self.element)
595                    self.element = { }
596                elif name == 'lan':
597                    self.lans.append(self.element)
598                    self.element = { }
599                elif name in self.str_subelements:
600                    self.element[name] = self.chars
601                    self.chars = ""
602                elif name in self.int_subelements:
603                    self.element[name] = int(self.chars)
604                    self.chars = ""
605                elif name in self.float_subelements:
606                    self.element[name] = float(self.chars)
607                    self.chars = ""
608
609            def found_chars(self, data):
610                self.chars += data.rstrip()
611
612
613        tp = topo_parse();
614        parser = xml.parsers.expat.ParserCreate()
615        parser.EndElementHandler = tp.end_element
616        parser.CharacterDataHandler = tp.found_chars
617
618        parser.Parse(str)
619
620        return tp.topo
621       
622
623    def genviz(self, topo):
624        """
625        Generate the visualization the virtual topology
626        """
627
628        neato = "/usr/local/bin/neato"
629        # These are used to parse neato output and to create the visualization
630        # file.
631        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
632        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
633                "%s</type></node>"
634
635        try:
636            # Node names
637            nodes = [ n['vname'] for n in topo['node'] ]
638            topo_lans = topo['lan']
639        except KeyError, e:
640            raise service_error(service_error.internal, "Bad topology: %s" %e)
641
642        lans = { }
643        links = { }
644
645        # Walk through the virtual topology, organizing the connections into
646        # 2-node connections (links) and more-than-2-node connections (lans).
647        # When a lan is created, it's added to the list of nodes (there's a
648        # node in the visualization for the lan).
649        for l in topo_lans:
650            if links.has_key(l['vname']):
651                if len(links[l['vname']]) < 2:
652                    links[l['vname']].append(l['vnode'])
653                else:
654                    nodes.append(l['vname'])
655                    lans[l['vname']] = links[l['vname']]
656                    del links[l['vname']]
657                    lans[l['vname']].append(l['vnode'])
658            elif lans.has_key(l['vname']):
659                lans[l['vname']].append(l['vnode'])
660            else:
661                links[l['vname']] = [ l['vnode'] ]
662
663
664        # Open up a temporary file for dot to turn into a visualization
665        try:
666            df, dotname = tempfile.mkstemp()
667            dotfile = os.fdopen(df, 'w')
668        except EnvironmentError:
669            raise service_error(service_error.internal,
670                    "Failed to open file in genviz")
671
672        try:
673            dnull = open('/dev/null', 'w')
674        except EnvironmentError:
675            service_error(service_error.internal,
676                    "Failed to open /dev/null in genviz")
677
678        # Generate a dot/neato input file from the links, nodes and lans
679        try:
680            print >>dotfile, "graph G {"
681            for n in nodes:
682                print >>dotfile, '\t"%s"' % n
683            for l in links.keys():
684                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
685            for l in lans.keys():
686                for n in lans[l]:
687                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
688            print >>dotfile, "}"
689            dotfile.close()
690        except TypeError:
691            raise service_error(service_error.internal,
692                    "Single endpoint link in vtopo")
693        except EnvironmentError:
694            raise service_error(service_error.internal, "Cannot write dot file")
695
696        # Use dot to create a visualization
697        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
698                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
699                close_fds=True)
700        dnull.close()
701
702        # Translate dot to vis format
703        vis_nodes = [ ]
704        vis = { 'node': vis_nodes }
705        for line in dot.stdout:
706            m = vis_re.match(line)
707            if m:
708                vn = m.group(1)
709                vis_node = {'name': vn, \
710                        'x': float(m.group(2)),\
711                        'y' : float(m.group(3)),\
712                    }
713                if vn in links.keys() or vn in lans.keys():
714                    vis_node['type'] = 'lan'
715                else:
716                    vis_node['type'] = 'node'
717                vis_nodes.append(vis_node)
718        rv = dot.wait()
719
720        os.remove(dotname)
721        if rv == 0 : return vis
722        else: return None
723
724
725    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
726            cert_pwd=None):
727        """
728        Release access to testbed through fedd
729        """
730
731        if not uri and tbmap:
732            uri = tbmap.get(tb, None)
733        if not uri:
734            raise service_error(service_error.server_config, 
735                    "Unknown testbed: %s" % tb)
736
737        if self.local_access.has_key(uri):
738            resp = self.local_access[uri].ReleaseAccess(\
739                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
740                    fedid(file=cert_file))
741            resp = { 'ReleaseAccessResponseBody': resp } 
742        else:
743            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
744                    cert_file, cert_pwd, self.trusted_certs)
745
746        # better error coding
747
748    def remote_ns2topdl(self, uri, desc):
749
750        req = {
751                'description' : { 'ns2description': desc },
752            }
753
754        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
755                self.trusted_certs)
756
757        if r.has_key('Ns2TopdlResponseBody'):
758            r = r['Ns2TopdlResponseBody']
759            ed = r.get('experimentdescription', None)
760            if ed.has_key('topdldescription'):
761                return topdl.Topology(**ed['topdldescription'])
762            else:
763                raise service_error(service_error.protocol, 
764                        "Bad splitter response (no output)")
765        else:
766            raise service_error(service_error.protocol, "Bad splitter response")
767
768    class start_segment:
769        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
770                cert_pwd=None, trusted_certs=None, caller=None,
771                log_collector=None):
772            self.log = log
773            self.debug = debug
774            self.cert_file = cert_file
775            self.cert_pwd = cert_pwd
776            self.trusted_certs = None
777            self.caller = caller
778            self.testbed = testbed
779            self.log_collector = log_collector
780            self.response = None
781            self.node = { }
782            self.proof = None
783
784        def make_map(self, resp):
785            for e in resp.get('embedding', []):
786                if 'toponame' in e and 'physname' in e:
787                    self.node[e['toponame']] = e['physname'][0]
788
789        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
790            req = {
791                    'allocID': { 'fedid' : aid }, 
792                    'segmentdescription': { 
793                        'topdldescription': topo.to_dict(),
794                    },
795                }
796
797            if connInfo:
798                req['connection'] = connInfo
799
800            import_svcs = [ s for m in masters.values() \
801                    for s in m if self.testbed in s.importers]
802
803            if import_svcs or self.testbed in masters:
804                req['service'] = []
805
806            for s in import_svcs:
807                for r in s.reqs:
808                    sr = copy.deepcopy(r)
809                    sr['visibility'] = 'import';
810                    req['service'].append(sr)
811
812            for s in masters.get(self.testbed, []):
813                for r in s.reqs:
814                    sr = copy.deepcopy(r)
815                    sr['visibility'] = 'export';
816                    req['service'].append(sr)
817
818            if attrs:
819                req['fedAttr'] = attrs
820
821            try:
822                self.log.debug("Calling StartSegment at %s " % uri)
823                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
824                        self.trusted_certs)
825                if r.has_key('StartSegmentResponseBody'):
826                    lval = r['StartSegmentResponseBody'].get('allocationLog',
827                            None)
828                    if lval and self.log_collector:
829                        for line in  lval.splitlines(True):
830                            self.log_collector.write(line)
831                    self.make_map(r['StartSegmentResponseBody'])
832                    if 'proof' in r: self.proof = r['proof']
833                    self.response = r
834                else:
835                    raise service_error(service_error.internal, 
836                            "Bad response!?: %s" %r)
837                return True
838            except service_error, e:
839                self.log.error("Start segment failed on %s: %s" % \
840                        (self.testbed, e))
841                return False
842
843
844
845    class terminate_segment:
846        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
847                cert_pwd=None, trusted_certs=None, caller=None):
848            self.log = log
849            self.debug = debug
850            self.cert_file = cert_file
851            self.cert_pwd = cert_pwd
852            self.trusted_certs = None
853            self.caller = caller
854            self.testbed = testbed
855
856        def __call__(self, uri, aid ):
857            req = {
858                    'allocID': aid , 
859                }
860            try:
861                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
862                        self.trusted_certs)
863                return True
864            except service_error, e:
865                self.log.error("Terminate segment failed on %s: %s" % \
866                        (self.testbed, e))
867                return False
868
869    def allocate_resources(self, allocated, masters, eid, expid, 
870            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
871            attrs=None, connInfo={}, tbmap=None, expcert=None):
872
873        started = { }           # Testbeds where a sub-experiment started
874                                # successfully
875
876        # XXX
877        fail_soft = False
878
879        if tbmap is None: tbmap = { }
880
881        log = alloc_log or self.log
882
883        tp = thread_pool(self.nthreads)
884        threads = [ ]
885        starters = [ ]
886
887        if expcert:
888            cert = expcert
889            pw = None
890        else:
891            cert = self.cert_file
892            pw = self.cert_pwd
893
894        for tb in allocated.keys():
895            # Create and start a thread to start the segment, and save it
896            # to get the return value later
897            tb_attrs = copy.copy(attrs)
898            tp.wait_for_slot()
899            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
900            base, suffix = split_testbed(tb)
901            if suffix:
902                tb_attrs.append({'attribute': 'experiment_name', 
903                    'value': "%s-%s" % (eid, suffix)})
904            else:
905                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
906            if not uri:
907                raise service_error(service_error.internal, 
908                        "Unknown testbed %s !?" % tb)
909
910            if tbparams[tb].has_key('allocID') and \
911                    tbparams[tb]['allocID'].has_key('fedid'):
912                aid = tbparams[tb]['allocID']['fedid']
913            else:
914                raise service_error(service_error.internal, 
915                        "No alloc id for testbed %s !?" % tb)
916
917            s = self.start_segment(log=log, debug=self.debug,
918                    testbed=tb, cert_file=cert,
919                    cert_pwd=pw, trusted_certs=self.trusted_certs,
920                    caller=self.call_StartSegment,
921                    log_collector=log_collector)
922            starters.append(s)
923            t  = pooled_thread(\
924                    target=s, name=tb,
925                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
926                    pdata=tp, trace_file=self.trace_file)
927            threads.append(t)
928            t.start()
929
930        # Wait until all finish (keep pinging the log, though)
931        mins = 0
932        revoked = False
933        while not tp.wait_for_all_done(60.0):
934            mins += 1
935            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
936                    % mins)
937            if not revoked and \
938                    len([ t.getName() for t in threads if t.rv == False]) > 0:
939                # a testbed has failed.  Revoke this experiment's
940                # synchronizarion values so that sub experiments will not
941                # deadlock waiting for synchronization that will never happen
942                self.log.info("A subexperiment has failed to swap in, " + \
943                        "revoking synch keys")
944                var_key = "fedid:%s" % expid
945                for k in self.synch_store.all_keys():
946                    if len(k) > 45 and k[0:46] == var_key:
947                        self.synch_store.revoke_key(k)
948                revoked = True
949
950        failed = [ t.getName() for t in threads if not t.rv ]
951        succeeded = [tb for tb in allocated.keys() if tb not in failed]
952
953        # If one failed clean up, unless fail_soft is set
954        if failed:
955            if not fail_soft:
956                tp.clear()
957                for tb in succeeded:
958                    # Create and start a thread to stop the segment
959                    tp.wait_for_slot()
960                    uri = tbparams[tb]['uri']
961                    t  = pooled_thread(\
962                            target=self.terminate_segment(log=log,
963                                testbed=tb,
964                                cert_file=cert, 
965                                cert_pwd=pw,
966                                trusted_certs=self.trusted_certs,
967                                caller=self.call_TerminateSegment),
968                            args=(uri, tbparams[tb]['federant']['allocID']),
969                            name=tb,
970                            pdata=tp, trace_file=self.trace_file)
971                    t.start()
972                # Wait until all finish (if any are being stopped)
973                if succeeded:
974                    tp.wait_for_all_done()
975
976                # release the allocations
977                for tb in tbparams.keys():
978                    try:
979                        self.release_access(tb, tbparams[tb]['allocID'], 
980                                tbmap=tbmap, uri=tbparams[tb].get('uri', None),
981                                cert_file=cert, cert_pwd=pw)
982                    except service_error, e:
983                        self.log.warn("Error releasing access: %s" % e.desc)
984                # Remove the placeholder
985                self.state_lock.acquire()
986                self.state[eid]['experimentStatus'] = 'failed'
987                if self.state_filename: self.write_state()
988                self.state_lock.release()
989                # Remove the repo dir
990                self.remove_dirs("%s/%s" %(self.repodir, expid))
991                # Walk up tmpdir, deleting as we go
992                if self.cleanup:
993                    self.remove_dirs(tmpdir)
994                else:
995                    log.debug("[start_experiment]: not removing %s" % tmpdir)
996
997
998                log.error("Swap in failed on %s" % ",".join(failed))
999                return
1000        else:
1001            # Walk through the successes and gather the virtual to physical
1002            # mapping.
1003            embedding = [ ]
1004            proofs = { }
1005            for s in starters:
1006                for k, v in s.node.items():
1007                    embedding.append({
1008                        'toponame': k, 
1009                        'physname': [ v],
1010                        'testbed': s.testbed
1011                        })
1012                if s.proof:
1013                    proofs[s.testbed] = s.proof
1014            log.info("[start_segment]: Experiment %s active" % eid)
1015
1016
1017        # Walk up tmpdir, deleting as we go
1018        if self.cleanup:
1019            self.remove_dirs(tmpdir)
1020        else:
1021            log.debug("[start_experiment]: not removing %s" % tmpdir)
1022
1023        # Insert the experiment into our state and update the disk copy.
1024        self.state_lock.acquire()
1025        self.state[expid]['experimentStatus'] = 'active'
1026        self.state[eid] = self.state[expid]
1027        self.state[eid]['experimentdescription']['topdldescription'] = \
1028                top.to_dict()
1029        self.state[eid]['embedding'] = embedding
1030        # Append startup proofs
1031        for f in self.state[eid]['federant']:
1032            if 'name' in f and 'localname' in f['name']:
1033                if f['name']['localname'] in proofs:
1034                    f['proof'].append(proofs[f['name']['localname']])
1035
1036        if self.state_filename: self.write_state()
1037        self.state_lock.release()
1038        return
1039
1040
1041    def add_kit(self, e, kit):
1042        """
1043        Add a Software object created from the list of (install, location)
1044        tuples passed as kit  to the software attribute of an object e.  We
1045        do this enough to break out the code, but it's kind of a hack to
1046        avoid changing the old tuple rep.
1047        """
1048
1049        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1050
1051        if isinstance(e.software, list): e.software.extend(s)
1052        else: e.software = s
1053
1054    def append_experiment_authorization(self, expid, attrs, 
1055            need_state_lock=True):
1056        """
1057        Append the authorization information to system state
1058        """
1059
1060        for p, a in attrs:
1061            self.auth.set_attribute(p, a)
1062        self.auth.save()
1063
1064        if need_state_lock: self.state_lock.acquire()
1065        self.state[expid]['auth'].update(attrs)
1066        if self.state_filename: self.write_state()
1067        if need_state_lock: self.state_lock.release()
1068
1069    def clear_experiment_authorization(self, expid, need_state_lock=True):
1070        """
1071        Attrs is a set of attribute principal pairs that need to be removed
1072        from the authenticator.  Remove them and save the authenticator.
1073        """
1074
1075        if need_state_lock: self.state_lock.acquire()
1076        if expid in self.state and 'auth' in self.state[expid]:
1077            for p, a in self.state[expid]['auth']:
1078                self.auth.unset_attribute(p, a)
1079            self.state[expid]['auth'] = set()
1080        if self.state_filename: self.write_state()
1081        if need_state_lock: self.state_lock.release()
1082        self.auth.save()
1083
1084
1085    def create_experiment_state(self, fid, req, expid, expcert,
1086            state='starting'):
1087        """
1088        Create the initial entry in the experiment's state.  The expid and
1089        expcert are the experiment's fedid and certifacte that represents that
1090        ID, which are installed in the experiment state.  If the request
1091        includes a suggested local name that is used if possible.  If the local
1092        name is already taken by an experiment owned by this user that has
1093        failed, it is overwritten.  Otherwise new letters are added until a
1094        valid localname is found.  The generated local name is returned.
1095        """
1096
1097        if req.has_key('experimentID') and \
1098                req['experimentID'].has_key('localname'):
1099            overwrite = False
1100            eid = req['experimentID']['localname']
1101            # If there's an old failed experiment here with the same local name
1102            # and accessible by this user, we'll overwrite it, otherwise we'll
1103            # fall through and do the collision avoidance.
1104            old_expid = self.get_experiment_fedid(eid)
1105            if old_expid and self.check_experiment_access(fid, old_expid):
1106                self.state_lock.acquire()
1107                status = self.state[eid].get('experimentStatus', None)
1108                if status and status == 'failed':
1109                    # remove the old access attributes
1110                    self.clear_experiment_authorization(eid,
1111                            need_state_lock=False)
1112                    overwrite = True
1113                    del self.state[eid]
1114                    del self.state[old_expid]
1115                self.state_lock.release()
1116            self.state_lock.acquire()
1117            while (self.state.has_key(eid) and not overwrite):
1118                eid += random.choice(string.ascii_letters)
1119            # Initial state
1120            self.state[eid] = {
1121                    'experimentID' : \
1122                            [ { 'localname' : eid }, {'fedid': expid } ],
1123                    'experimentStatus': state,
1124                    'experimentAccess': { 'X509' : expcert },
1125                    'owner': fid,
1126                    'log' : [],
1127                    'auth': set(),
1128                }
1129            self.state[expid] = self.state[eid]
1130            if self.state_filename: self.write_state()
1131            self.state_lock.release()
1132        else:
1133            eid = self.exp_stem
1134            for i in range(0,5):
1135                eid += random.choice(string.ascii_letters)
1136            self.state_lock.acquire()
1137            while (self.state.has_key(eid)):
1138                eid = self.exp_stem
1139                for i in range(0,5):
1140                    eid += random.choice(string.ascii_letters)
1141            # Initial state
1142            self.state[eid] = {
1143                    'experimentID' : \
1144                            [ { 'localname' : eid }, {'fedid': expid } ],
1145                    'experimentStatus': state,
1146                    'experimentAccess': { 'X509' : expcert },
1147                    'owner': fid,
1148                    'log' : [],
1149                    'auth': set(),
1150                }
1151            self.state[expid] = self.state[eid]
1152            if self.state_filename: self.write_state()
1153            self.state_lock.release()
1154
1155        # Let users touch the state.  Authorize this fid and the expid itself
1156        # to touch the experiment, as well as allowing th eoverrides.
1157        self.append_experiment_authorization(eid, 
1158                set([(fid, expid), (expid,expid)] + \
1159                        [ (o, expid) for o in self.overrides]))
1160
1161        return eid
1162
1163
1164    def allocate_ips_to_topo(self, top):
1165        """
1166        Add an ip4_address attribute to all the hosts in the topology, based on
1167        the shared substrates on which they sit.  An /etc/hosts file is also
1168        created and returned as a list of hostfiles entries.  We also return
1169        the allocator, because we may need to allocate IPs to portals
1170        (specifically DRAGON portals).
1171        """
1172        subs = sorted(top.substrates, 
1173                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1174                reverse=True)
1175        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1176        ifs = { }
1177        hosts = [ ]
1178
1179        for idx, s in enumerate(subs):
1180            net_size = len(s.interfaces)+2
1181
1182            a = ips.allocate(net_size)
1183            if a :
1184                base, num = a
1185                if num < net_size: 
1186                    raise service_error(service_error.internal,
1187                            "Allocator returned wrong number of IPs??")
1188            else:
1189                raise service_error(service_error.req, 
1190                        "Cannot allocate IP addresses")
1191            mask = ips.min_alloc
1192            while mask < net_size:
1193                mask *= 2
1194
1195            netmask = ((2**32-1) ^ (mask-1))
1196
1197            base += 1
1198            for i in s.interfaces:
1199                i.attribute.append(
1200                        topdl.Attribute('ip4_address', 
1201                            "%s" % ip_addr(base)))
1202                i.attribute.append(
1203                        topdl.Attribute('ip4_netmask', 
1204                            "%s" % ip_addr(int(netmask))))
1205
1206                hname = i.element.name
1207                if ifs.has_key(hname):
1208                    hosts.append("%s\t%s-%s %s-%d" % \
1209                            (ip_addr(base), hname, s.name, hname,
1210                                ifs[hname]))
1211                else:
1212                    ifs[hname] = 0
1213                    hosts.append("%s\t%s-%s %s-%d %s" % \
1214                            (ip_addr(base), hname, s.name, hname,
1215                                ifs[hname], hname))
1216
1217                ifs[hname] += 1
1218                base += 1
1219        return hosts, ips
1220
1221    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1222            tbparam, masters, tbmap, expid=None, expcert=None):
1223        for tb in testbeds:
1224            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1225                    expcert)
1226            allocated[tb] = 1
1227
1228    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1229            expcert=None):
1230        """
1231        Get access to testbed through fedd and set the parameters for that tb
1232        """
1233        def get_export_project(svcs):
1234            """
1235            Look through for the list of federated_service for this testbed
1236            objects for a project_export service, and extract the project
1237            parameter.
1238            """
1239
1240            pe = [s for s in svcs if s.name=='project_export']
1241            if len(pe) == 1:
1242                return pe[0].params.get('project', None)
1243            elif len(pe) == 0:
1244                return None
1245            else:
1246                raise service_error(service_error.req,
1247                        "More than one project export is not supported")
1248
1249        def add_services(svcs, type, slist):
1250            """
1251            Add the given services to slist.  type is import or export.
1252            """
1253            for i, s in enumerate(svcs):
1254                idx = '%s%d' % (type, i)
1255                sr = {'id': idx, 'name': s.name, 'visibility': type }
1256                if s.params:
1257                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1258                            for k, v in s.params.items()]
1259                slist.append(sr)
1260
1261        uri = tbmap.get(testbed_base(tb), None)
1262        if not uri:
1263            raise service_error(service_error.server_config, 
1264                    "Unknown testbed: %s" % tb)
1265
1266        export_svcs = masters.get(tb,[])
1267        import_svcs = [ s for m in masters.values() \
1268                for s in m \
1269                    if tb in s.importers ]
1270
1271        export_project = get_export_project(export_svcs)
1272        # Compose the credential list so that IDs come before attributes
1273        creds = set()
1274        keys = set()
1275        certs = self.auth.get_creds_for_principal(fid)
1276        if expid:
1277            certs.update(self.auth.get_creds_for_principal(expid))
1278        for c in certs:
1279            keys.add(c.issuer_cert())
1280            creds.add(c.attribute_cert())
1281        creds = list(keys) + list(creds)
1282
1283        if expcert: cert, pw = expcert, None
1284        else: cert, pw = self.cert_file, self.cert_pw
1285
1286        # Request credentials
1287        req = {
1288                'abac_credential': creds,
1289            }
1290        # Make the service request from the services we're importing and
1291        # exporting.  Keep track of the export request ids so we can
1292        # collect the resulting info from the access response.
1293        e_keys = { }
1294        if import_svcs or export_svcs:
1295            slist = []
1296            add_services(import_svcs, 'import', slist)
1297            add_services(export_svcs, 'export', slist)
1298            req['service'] = slist
1299
1300        if self.local_access.has_key(uri):
1301            # Local access call
1302            req = { 'RequestAccessRequestBody' : req }
1303            r = self.local_access[uri].RequestAccess(req, 
1304                    fedid(file=self.cert_file))
1305            r = { 'RequestAccessResponseBody' : r }
1306        else:
1307            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1308
1309        if r.has_key('RequestAccessResponseBody'):
1310            # Through to here we have a valid response, not a fault.
1311            # Access denied is a fault, so something better or worse than
1312            # access denied has happened.
1313            r = r['RequestAccessResponseBody']
1314            self.log.debug("[get_access] Access granted")
1315        else:
1316            raise service_error(service_error.protocol,
1317                        "Bad proxy response")
1318        if 'proof' not in r:
1319            raise service_error(service_error.protocol,
1320                        "Bad access response (no access proof)")
1321       
1322        tbparam[tb] = { 
1323                "allocID" : r['allocID'],
1324                "uri": uri,
1325                "proof": [ r['proof'] ],
1326                }
1327
1328        # Collect the responses corresponding to the services this testbed
1329        # exports.  These will be the service requests that we will include in
1330        # the start segment requests (with appropriate visibility values) to
1331        # import and export the segments.
1332        for s in r.get('service', []):
1333            id = s.get('id', None)
1334            if id and id in e_keys:
1335                e_keys[id].reqs.append(s)
1336
1337        # Add attributes to parameter space.  We don't allow attributes to
1338        # overlay any parameters already installed.
1339        for a in r.get('fedAttr', []):
1340            try:
1341                if a['attribute'] and \
1342                        isinstance(a['attribute'], basestring)\
1343                        and not tbparam[tb].has_key(a['attribute'].lower()):
1344                    tbparam[tb][a['attribute'].lower()] = a['value']
1345            except KeyError:
1346                self.log.error("Bad attribute in response: %s" % a)
1347
1348
1349    def split_topology(self, top, topo, testbeds):
1350        """
1351        Create the sub-topologies that are needed for experiment instantiation.
1352        """
1353        for tb in testbeds:
1354            topo[tb] = top.clone()
1355            # copy in for loop allows deletions from the original
1356            for e in [ e for e in topo[tb].elements]:
1357                etb = e.get_attribute('testbed')
1358                # NB: elements without a testbed attribute won't appear in any
1359                # sub topologies. 
1360                if not etb or etb != tb:
1361                    for i in e.interface:
1362                        for s in i.subs:
1363                            try:
1364                                s.interfaces.remove(i)
1365                            except ValueError:
1366                                raise service_error(service_error.internal,
1367                                        "Can't remove interface??")
1368                    topo[tb].elements.remove(e)
1369            topo[tb].make_indices()
1370
1371    def confirm_software(self, top):
1372        """
1373        Make sure that the software to be loaded in the topo is all available
1374        before we begin making access requests, etc.  This is a subset of
1375        wrangle_software.
1376        """
1377        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1378        pkgs.update([x.location for e in top.elements for x in e.software])
1379
1380        for pkg in pkgs:
1381            loc = pkg
1382
1383            scheme, host, path = urlparse(loc)[0:3]
1384            dest = os.path.basename(path)
1385            if not scheme:
1386                if not loc.startswith('/'):
1387                    loc = "/%s" % loc
1388                loc = "file://%s" %loc
1389            # NB: if scheme was found, loc == pkg
1390            try:
1391                u = urlopen(loc)
1392                u.close()
1393            except Exception, e:
1394                raise service_error(service_error.req, 
1395                        "Cannot open %s: %s" % (loc, e))
1396        return True
1397
1398    def wrangle_software(self, expid, top, topo, tbparams):
1399        """
1400        Copy software out to the repository directory, allocate permissions and
1401        rewrite the segment topologies to look for the software in local
1402        places.
1403        """
1404
1405        # Copy the rpms and tarfiles to a distribution directory from
1406        # which the federants can retrieve them
1407        linkpath = "%s/software" %  expid
1408        softdir ="%s/%s" % ( self.repodir, linkpath)
1409        softmap = { }
1410
1411        # self.fedkit and self.gateway kit are lists of tuples of
1412        # (install_location, download_location) this extracts the download
1413        # locations.
1414        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1415        pkgs.update([x.location for e in top.elements for x in e.software])
1416        try:
1417            os.makedirs(softdir)
1418        except EnvironmentError, e:
1419            raise service_error(
1420                    "Cannot create software directory: %s" % e)
1421        # The actual copying.  Everything's converted into a url for copying.
1422        auth_attrs = set()
1423        for pkg in pkgs:
1424            loc = pkg
1425
1426            scheme, host, path = urlparse(loc)[0:3]
1427            dest = os.path.basename(path)
1428            if not scheme:
1429                if not loc.startswith('/'):
1430                    loc = "/%s" % loc
1431                loc = "file://%s" %loc
1432            # NB: if scheme was found, loc == pkg
1433            try:
1434                u = urlopen(loc)
1435            except Exception, e:
1436                raise service_error(service_error.req, 
1437                        "Cannot open %s: %s" % (loc, e))
1438            try:
1439                f = open("%s/%s" % (softdir, dest) , "w")
1440                self.log.debug("Writing %s/%s" % (softdir,dest) )
1441                data = u.read(4096)
1442                while data:
1443                    f.write(data)
1444                    data = u.read(4096)
1445                f.close()
1446                u.close()
1447            except Exception, e:
1448                raise service_error(service_error.internal,
1449                        "Could not copy %s: %s" % (loc, e))
1450            path = re.sub("/tmp", "", linkpath)
1451            # XXX
1452            softmap[pkg] = \
1453                    "%s/%s/%s" %\
1454                    ( self.repo_url, path, dest)
1455
1456            # Allow the individual segments to access the software by assigning
1457            # an attribute to each testbed allocation that encodes the data to
1458            # be released.  This expression collects the data for each run of
1459            # the loop.
1460            auth_attrs.update([
1461                (tbparams[tb]['allocID']['fedid'], "/%s/%s" % ( path, dest)) \
1462                        for tb in tbparams.keys()])
1463
1464        self.append_experiment_authorization(expid, auth_attrs)
1465
1466        # Convert the software locations in the segments into the local
1467        # copies on this host
1468        for soft in [ s for tb in topo.values() \
1469                for e in tb.elements \
1470                    if getattr(e, 'software', False) \
1471                        for s in e.software ]:
1472            if softmap.has_key(soft.location):
1473                soft.location = softmap[soft.location]
1474
1475
1476    def new_experiment(self, req, fid):
1477        """
1478        The external interface to empty initial experiment creation called from
1479        the dispatcher.
1480
1481        Creates a working directory, splits the incoming description using the
1482        splitter script and parses out the avrious subsections using the
1483        lcasses above.  Once each sub-experiment is created, use pooled threads
1484        to instantiate them and start it all up.
1485        """
1486        req = req.get('NewRequestBody', None)
1487        if not req:
1488            raise service_error(service_error.req,
1489                    "Bad request format (no NewRequestBody)")
1490
1491        if self.auth.import_credentials(data_list=req.get('credential', [])):
1492            self.auth.save()
1493
1494        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1495                with_proof=True)
1496
1497        if not access_ok:
1498            raise service_error(service_error.access, "New access denied",
1499                    proof=[proof])
1500
1501        try:
1502            tmpdir = tempfile.mkdtemp(prefix="split-")
1503        except EnvironmentError:
1504            raise service_error(service_error.internal, "Cannot create tmp dir")
1505
1506        try:
1507            access_user = self.accessdb[fid]
1508        except KeyError:
1509            raise service_error(service_error.internal,
1510                    "Access map and authorizer out of sync in " + \
1511                            "new_experiment for fedid %s"  % fid)
1512
1513        # Generate an ID for the experiment (slice) and a certificate that the
1514        # allocator can use to prove they own it.  We'll ship it back through
1515        # the encrypted connection.  If the requester supplied one, use it.
1516        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1517            expcert = req['experimentAccess']['X509']
1518            expid = fedid(certstr=expcert)
1519            self.state_lock.acquire()
1520            if expid in self.state:
1521                self.state_lock.release()
1522                raise service_error(service_error.req, 
1523                        'fedid %s identifies an existing experiment' % expid)
1524            self.state_lock.release()
1525        else:
1526            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1527
1528        #now we're done with the tmpdir, and it should be empty
1529        if self.cleanup:
1530            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1531            os.rmdir(tmpdir)
1532        else:
1533            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1534
1535        eid = self.create_experiment_state(fid, req, expid, expcert, 
1536                state='empty')
1537
1538        rv = {
1539                'experimentID': [
1540                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1541                ],
1542                'experimentStatus': 'empty',
1543                'experimentAccess': { 'X509' : expcert },
1544                'proof': proof.to_dict(),
1545            }
1546
1547        return rv
1548
1549    # create_experiment sub-functions
1550
1551    @staticmethod
1552    def get_experiment_key(req, field='experimentID'):
1553        """
1554        Parse the experiment identifiers out of the request (the request body
1555        tag has been removed).  Specifically this pulls either the fedid or the
1556        localname out of the experimentID field.  A fedid is preferred.  If
1557        neither is present or the request does not contain the fields,
1558        service_errors are raised.
1559        """
1560        # Get the experiment access
1561        exp = req.get(field, None)
1562        if exp:
1563            if exp.has_key('fedid'):
1564                key = exp['fedid']
1565            elif exp.has_key('localname'):
1566                key = exp['localname']
1567            else:
1568                raise service_error(service_error.req, "Unknown lookup type")
1569        else:
1570            raise service_error(service_error.req, "No request?")
1571
1572        return key
1573
1574    def get_experiment_ids_and_start(self, key, tmpdir):
1575        """
1576        Get the experiment name, id and access certificate from the state, and
1577        set the experiment state to 'starting'.  returns a triple (fedid,
1578        localname, access_cert_file). The access_cert_file is a copy of the
1579        contents of the access certificate, created in the tempdir with
1580        restricted permissions.  If things are confused, raise an exception.
1581        """
1582
1583        expid = eid = None
1584        self.state_lock.acquire()
1585        if self.state.has_key(key):
1586            self.state[key]['experimentStatus'] = "starting"
1587            for e in self.state[key].get('experimentID',[]):
1588                if not expid and e.has_key('fedid'):
1589                    expid = e['fedid']
1590                elif not eid and e.has_key('localname'):
1591                    eid = e['localname']
1592            if 'experimentAccess' in self.state[key] and \
1593                    'X509' in self.state[key]['experimentAccess']:
1594                expcert = self.state[key]['experimentAccess']['X509']
1595            else:
1596                expcert = None
1597        self.state_lock.release()
1598
1599        # make a protected copy of the access certificate so the experiment
1600        # controller can act as the experiment principal.
1601        if expcert:
1602            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1603            if not expcert_file:
1604                raise service_error(service_error.internal, 
1605                        "Cannot create temp cert file?")
1606        else:
1607            expcert_file = None
1608
1609        return (eid, expid, expcert_file)
1610
1611    def get_topology(self, req, tmpdir):
1612        """
1613        Get the ns2 content and put it into a file for parsing.  Call the local
1614        or remote parser and return the topdl.Topology.  Errors result in
1615        exceptions.  req is the request and tmpdir is a work directory.
1616        """
1617
1618        # The tcl parser needs to read a file so put the content into that file
1619        descr=req.get('experimentdescription', None)
1620        if descr:
1621            if 'ns2description' in descr:
1622                file_content=descr['ns2description']
1623            elif 'topdldescription' in descr:
1624                return topdl.Topology(**descr['topdldescription'])
1625            else:
1626                raise service_error(service_error.req, 
1627                        'Unknown experiment description type')
1628        else:
1629            raise service_error(service_error.req, "No experiment description")
1630
1631
1632        if self.splitter_url:
1633            self.log.debug("Calling remote topdl translator at %s" % \
1634                    self.splitter_url)
1635            top = self.remote_ns2topdl(self.splitter_url, file_content)
1636        else:
1637            tclfile = os.path.join(tmpdir, "experiment.tcl")
1638            if file_content:
1639                try:
1640                    f = open(tclfile, 'w')
1641                    f.write(file_content)
1642                    f.close()
1643                except EnvironmentError:
1644                    raise service_error(service_error.internal,
1645                            "Cannot write temp experiment description")
1646            else:
1647                raise service_error(service_error.req, 
1648                        "Only ns2descriptions supported")
1649            pid = "dummy"
1650            gid = "dummy"
1651            eid = "dummy"
1652
1653            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1654                str(self.muxmax), '-m', 'dummy']
1655
1656            tclcmd.extend([pid, gid, eid, tclfile])
1657
1658            self.log.debug("running local splitter %s", " ".join(tclcmd))
1659            # This is just fantastic.  As a side effect the parser copies
1660            # tb_compat.tcl into the current directory, so that directory
1661            # must be writable by the fedd user.  Doing this in the
1662            # temporary subdir ensures this is the case.
1663            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1664                    cwd=tmpdir)
1665            split_data = tclparser.stdout
1666
1667            top = topdl.topology_from_xml(file=split_data, top="experiment")
1668            os.remove(tclfile)
1669
1670        return top
1671
1672    def get_testbed_services(self, req, testbeds):
1673        """
1674        Parse the services section of the request into into two dicts mapping
1675        testbed to lists of federated_service objects.  The first lists all
1676        exporters of services, and the second all exporters of services that
1677        need control portals int the experiment.
1678        """
1679        masters = { }
1680        pmasters = { }
1681        for s in req.get('service', []):
1682            # If this is a service request with the importall field
1683            # set, fill it out.
1684
1685            if s.get('importall', False):
1686                s['import'] = [ tb for tb in testbeds \
1687                        if tb not in s.get('export',[])]
1688                del s['importall']
1689
1690            # Add the service to masters
1691            for tb in s.get('export', []):
1692                if s.get('name', None):
1693
1694                    params = { }
1695                    for a in s.get('fedAttr', []):
1696                        params[a.get('attribute', '')] = a.get('value','')
1697
1698                    fser = federated_service(name=s['name'],
1699                            exporter=tb, importers=s.get('import',[]),
1700                            params=params)
1701                    if fser.name == 'hide_hosts' \
1702                            and 'hosts' not in fser.params:
1703                        fser.params['hosts'] = \
1704                                ",".join(tb_hosts.get(fser.exporter, []))
1705                    if tb in masters: masters[tb].append(fser)
1706                    else: masters[tb] = [fser]
1707
1708                    if fser.portal:
1709                        if tb not in pmasters: pmasters[tb] = [ fser ]
1710                        else: pmasters[tb].append(fser)
1711                else:
1712                    self.log.error('Testbed service does not have name " + \
1713                            "and importers')
1714        return masters, pmasters
1715
1716    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1717        """
1718        Create the ssh keys necessary for interconnecting the potral nodes and
1719        the global hosts file for letting each segment know about the IP
1720        addresses in play.  Save these into the repo.  Add attributes to the
1721        autorizer allowing access controllers to download them and return a set
1722        of attributes that inform the segments where to find this stuff.  Mau
1723        raise service_errors in if there are problems.
1724        """
1725        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1726        gw_secretkey_base = "fed.%s" % self.ssh_type
1727        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1728        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1729
1730        try:
1731            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1732        except ValueError:
1733            raise service_error(service_error.server_config, 
1734                    "Bad key type (%s)" % self.ssh_type)
1735
1736
1737        # Copy configuration files into the remote file store
1738        # The config urlpath
1739        configpath = "/%s/config" % expid
1740        # The config file system location
1741        configdir ="%s%s" % ( self.repodir, configpath)
1742        try:
1743            os.makedirs(configdir)
1744        except EnvironmentError, e:
1745            raise service_error(service_error.internal,
1746                    "Cannot create config directory: %s" % e)
1747        try:
1748            f = open("%s/hosts" % configdir, "w")
1749            print >> f, string.join(hosts, '\n')
1750            f.close()
1751        except EnvironmentError, e:
1752            raise service_error(service_error.internal, 
1753                    "Cannot write hosts file: %s" % e)
1754        try:
1755            copy_file("%s" % gw_pubkey, "%s/%s" % \
1756                    (configdir, gw_pubkey_base))
1757            copy_file("%s" % gw_secretkey, "%s/%s" % \
1758                    (configdir, gw_secretkey_base))
1759        except EnvironmentError, e:
1760            raise service_error(service_error.internal, 
1761                    "Cannot copy keyfiles: %s" % e)
1762
1763        # Allow the individual testbeds to access the configuration files,
1764        # again by setting an attribute for the relevant pathnames on each
1765        # allocation principal.  Yeah, that's a long list comprehension.
1766        self.append_experiment_authorization(expid, set([
1767            (tbparams[tb]['allocID']['fedid'], "%s/%s" % (configpath, f)) \
1768                    for tb in tbparams.keys() \
1769                        for f in ("hosts", gw_secretkey_base, gw_pubkey_base)]))
1770
1771        attrs = [ 
1772                {
1773                    'attribute': 'ssh_pubkey', 
1774                    'value': '%s/%s/config/%s' % \
1775                            (self.repo_url, expid, gw_pubkey_base)
1776                },
1777                {
1778                    'attribute': 'ssh_secretkey', 
1779                    'value': '%s/%s/config/%s' % \
1780                            (self.repo_url, expid, gw_secretkey_base)
1781                },
1782                {
1783                    'attribute': 'hosts', 
1784                    'value': '%s/%s/config/hosts' % \
1785                            (self.repo_url, expid)
1786                },
1787            ]
1788        return attrs
1789
1790
1791    def get_vtopo(self, req, fid):
1792        """
1793        Return the stored virtual topology for this experiment
1794        """
1795        rv = None
1796        state = None
1797
1798        req = req.get('VtopoRequestBody', None)
1799        if not req:
1800            raise service_error(service_error.req,
1801                    "Bad request format (no VtopoRequestBody)")
1802        exp = req.get('experiment', None)
1803        if exp:
1804            if exp.has_key('fedid'):
1805                key = exp['fedid']
1806                keytype = "fedid"
1807            elif exp.has_key('localname'):
1808                key = exp['localname']
1809                keytype = "localname"
1810            else:
1811                raise service_error(service_error.req, "Unknown lookup type")
1812        else:
1813            raise service_error(service_error.req, "No request?")
1814
1815        proof = self.check_experiment_access(fid, key)
1816
1817        self.state_lock.acquire()
1818        if self.state.has_key(key):
1819            if self.state[key].has_key('vtopo'):
1820                rv = { 'experiment' : {keytype: key },
1821                        'vtopo': self.state[key]['vtopo'],
1822                        'proof': proof.to_dict(), 
1823                    }
1824            else:
1825                state = self.state[key]['experimentStatus']
1826        self.state_lock.release()
1827
1828        if rv: return rv
1829        else: 
1830            if state:
1831                raise service_error(service_error.partial, 
1832                        "Not ready: %s" % state)
1833            else:
1834                raise service_error(service_error.req, "No such experiment")
1835
1836    def get_vis(self, req, fid):
1837        """
1838        Return the stored visualization for this experiment
1839        """
1840        rv = None
1841        state = None
1842
1843        req = req.get('VisRequestBody', None)
1844        if not req:
1845            raise service_error(service_error.req,
1846                    "Bad request format (no VisRequestBody)")
1847        exp = req.get('experiment', None)
1848        if exp:
1849            if exp.has_key('fedid'):
1850                key = exp['fedid']
1851                keytype = "fedid"
1852            elif exp.has_key('localname'):
1853                key = exp['localname']
1854                keytype = "localname"
1855            else:
1856                raise service_error(service_error.req, "Unknown lookup type")
1857        else:
1858            raise service_error(service_error.req, "No request?")
1859
1860        proof = self.check_experiment_access(fid, key)
1861
1862        self.state_lock.acquire()
1863        if self.state.has_key(key):
1864            if self.state[key].has_key('vis'):
1865                rv =  { 'experiment' : {keytype: key },
1866                        'vis': self.state[key]['vis'],
1867                        'proof': proof.to_dict(), 
1868                        }
1869            else:
1870                state = self.state[key]['experimentStatus']
1871        self.state_lock.release()
1872
1873        if rv: return rv
1874        else:
1875            if state:
1876                raise service_error(service_error.partial, 
1877                        "Not ready: %s" % state)
1878            else:
1879                raise service_error(service_error.req, "No such experiment")
1880
1881   
1882    def save_federant_information(self, allocated, tbparams, eid, vtopo, vis,
1883            top):
1884        """
1885        Store the various data that have changed in the experiment state
1886        between when it was started and the beginning of resource allocation.
1887        This is basically the information about each local allocation.  This
1888        fills in the values of the placeholder allocation in the state.  It
1889        also collects the access proofs and returns them as dicts for a
1890        response message.
1891        """
1892        # save federant information
1893        for k in allocated.keys():
1894            tbparams[k]['federant'] = {
1895                    'name': [ { 'localname' : eid} ],
1896                    'allocID' : tbparams[k]['allocID'],
1897                    'uri': tbparams[k]['uri'],
1898                    'proof': tbparams[k]['proof'],
1899                }
1900
1901        self.state_lock.acquire()
1902        self.state[eid]['vtopo'] = vtopo
1903        self.state[eid]['vis'] = vis
1904        self.state[eid]['experimentdescription'] = \
1905                { 'topdldescription': top.to_dict() }
1906        self.state[eid]['federant'] = \
1907                [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1908                    if tbparams[tb].has_key('federant') ]
1909        # Access proofs for the response message
1910        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
1911                    for p in tbparams[k]['federant']['proof']]
1912        if self.state_filename: 
1913            self.write_state()
1914        self.state_lock.release()
1915        return proofs
1916
1917    def clear_placeholder(self, eid, expid, tmpdir):
1918        """
1919        Clear the placeholder and remove any allocated temporary dir.
1920        """
1921
1922        self.state_lock.acquire()
1923        del self.state[eid]
1924        del self.state[expid]
1925        if self.state_filename: self.write_state()
1926        self.state_lock.release()
1927        if tmpdir and self.cleanup:
1928            self.remove_dirs(tmpdir)
1929
1930    # end of create_experiment sub-functions
1931
1932    def create_experiment(self, req, fid):
1933        """
1934        The external interface to experiment creation called from the
1935        dispatcher.
1936
1937        Creates a working directory, splits the incoming description using the
1938        splitter script and parses out the various subsections using the
1939        classes above.  Once each sub-experiment is created, use pooled threads
1940        to instantiate them and start it all up.
1941        """
1942
1943        req = req.get('CreateRequestBody', None)
1944        if req:
1945            key = self.get_experiment_key(req)
1946        else:
1947            raise service_error(service_error.req,
1948                    "Bad request format (no CreateRequestBody)")
1949
1950        # Import information from the requester
1951        if self.auth.import_credentials(data_list=req.get('credential', [])):
1952            self.auth.save()
1953
1954        # Make sure that the caller can talk to us
1955        proof = self.check_experiment_access(fid, key)
1956
1957        # Install the testbed map entries supplied with the request into a copy
1958        # of the testbed map.
1959        tbmap = dict(self.tbmap)
1960        for m in req.get('testbedmap', []):
1961            if 'testbed' in m and 'uri' in m:
1962                tbmap[m['testbed']] = m['uri']
1963
1964        # a place to work
1965        try:
1966            tmpdir = tempfile.mkdtemp(prefix="split-")
1967            os.mkdir(tmpdir+"/keys")
1968        except EnvironmentError:
1969            raise service_error(service_error.internal, "Cannot create tmp dir")
1970
1971        tbparams = { }
1972
1973        eid, expid, expcert_file = \
1974                self.get_experiment_ids_and_start(key, tmpdir)
1975
1976        # This catches exceptions to clear the placeholder if necessary
1977        try: 
1978            if not (eid and expid):
1979                raise service_error(service_error.internal, 
1980                        "Cannot find local experiment info!?")
1981
1982            top = self.get_topology(req, tmpdir)
1983            self.confirm_software(top)
1984            # Assign the IPs
1985            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1986            # Find the testbeds to look up
1987            tb_hosts = { }
1988            testbeds = [ ]
1989            for e in top.elements:
1990                if isinstance(e, topdl.Computer):
1991                    tb = e.get_attribute('testbed') or 'default'
1992                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
1993                    else: 
1994                        tb_hosts[tb] = [ e.name ]
1995                        testbeds.append(tb)
1996
1997            masters, pmasters = self.get_testbed_services(req, testbeds)
1998            allocated = { }         # Testbeds we can access
1999            topo ={ }               # Sub topologies
2000            connInfo = { }          # Connection information
2001
2002            self.split_topology(top, topo, testbeds)
2003
2004            self.get_access_to_testbeds(testbeds, fid, allocated, 
2005                    tbparams, masters, tbmap, expid, expcert_file)
2006
2007            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2008
2009            part = experiment_partition(self.auth, self.store_url, tbmap,
2010                    self.muxmax, self.direct_transit)
2011            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2012                    connInfo, expid)
2013
2014            auth_attrs = set()
2015            # Now get access to the dynamic testbeds (those added above)
2016            for tb in [ t for t in topo if t not in allocated]:
2017                self.get_access(tb, tbparams, fid, masters, tbmap, 
2018                        expid, expcert_file)
2019                allocated[tb] = 1
2020                store_keys = topo[tb].get_attribute('store_keys')
2021                # Give the testbed access to keys it exports or imports
2022                if store_keys:
2023                    auth_attrs.update(set([
2024                        (tbparams[tb]['allocID']['fedid'], sk) \
2025                                for sk in store_keys.split(" ")]))
2026
2027            if auth_attrs:
2028                self.append_experiment_authorization(expid, auth_attrs)
2029
2030            # transit and disconnected testbeds may not have a connInfo entry.
2031            # Fill in the blanks.
2032            for t in allocated.keys():
2033                if not connInfo.has_key(t):
2034                    connInfo[t] = { }
2035
2036            self.wrangle_software(expid, top, topo, tbparams)
2037
2038            vtopo = topdl.topology_to_vtopo(top)
2039            vis = self.genviz(vtopo)
2040            proofs = self.save_federant_information(allocated, tbparams, 
2041                    eid, vtopo, vis, top)
2042        except service_error, e:
2043            # If something goes wrong in the parse (usually an access error)
2044            # clear the placeholder state.  From here on out the code delays
2045            # exceptions.  Failing at this point returns a fault to the remote
2046            # caller.
2047            self.clear_placeholder(eid, expid, tmpdir)
2048            raise e
2049
2050        # Start the background swapper and return the starting state.  From
2051        # here on out, the state will stick around a while.
2052
2053        # Create a logger that logs to the experiment's state object as well as
2054        # to the main log file.
2055        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2056        alloc_collector = self.list_log(self.state[eid]['log'])
2057        h = logging.StreamHandler(alloc_collector)
2058        # XXX: there should be a global one of these rather than repeating the
2059        # code.
2060        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2061                    '%d %b %y %H:%M:%S'))
2062        alloc_log.addHandler(h)
2063
2064        # Start a thread to do the resource allocation
2065        t  = Thread(target=self.allocate_resources,
2066                args=(allocated, masters, eid, expid, tbparams, 
2067                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2068                    connInfo, tbmap, expcert_file),
2069                name=eid)
2070        t.start()
2071
2072        rv = {
2073                'experimentID': [
2074                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2075                ],
2076                'experimentStatus': 'starting',
2077                'proof': [ proof.to_dict() ] + proofs,
2078            }
2079
2080        return rv
2081   
2082    def get_experiment_fedid(self, key):
2083        """
2084        find the fedid associated with the localname key in the state database.
2085        """
2086
2087        rv = None
2088        self.state_lock.acquire()
2089        if self.state.has_key(key):
2090            if isinstance(self.state[key], dict):
2091                try:
2092                    kl = [ f['fedid'] for f in \
2093                            self.state[key]['experimentID']\
2094                                if f.has_key('fedid') ]
2095                except KeyError:
2096                    self.state_lock.release()
2097                    raise service_error(service_error.internal, 
2098                            "No fedid for experiment %s when getting "+\
2099                                    "fedid(!?)" % key)
2100                if len(kl) == 1:
2101                    rv = kl[0]
2102                else:
2103                    self.state_lock.release()
2104                    raise service_error(service_error.internal, 
2105                            "multiple fedids for experiment %s when " +\
2106                                    "getting fedid(!?)" % key)
2107            else:
2108                self.state_lock.release()
2109                raise service_error(service_error.internal, 
2110                        "Unexpected state for %s" % key)
2111        self.state_lock.release()
2112        return rv
2113
2114    def check_experiment_access(self, fid, key):
2115        """
2116        Confirm that the fid has access to the experiment.  Though a request
2117        may be made in terms of a local name, the access attribute is always
2118        the experiment's fedid.
2119        """
2120        if not isinstance(key, fedid):
2121            key = self.get_experiment_fedid(key)
2122
2123        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2124
2125        if access_ok:
2126            return proof
2127        else:
2128            raise service_error(service_error.access, "Access Denied",
2129                proof)
2130
2131
2132    def get_handler(self, path, fid):
2133        """
2134        Perhaps surprisingly named, this function handles HTTP GET requests to
2135        this server (SOAP requests are POSTs).
2136        """
2137        self.log.info("Get handler %s %s" % (path, fid))
2138        # XXX: log proofs?
2139        if self.auth.check_attribute(fid, path):
2140            return ("%s/%s" % (self.repodir, path), "application/binary")
2141        else:
2142            return (None, None)
2143
2144    def clean_info_response(self, rv, proof):
2145        """
2146        Remove the information in the experiment's state object that is not in
2147        the info response.
2148        """
2149        # Remove the owner info (should always be there, but...)
2150        if rv.has_key('owner'): del rv['owner']
2151        if 'auth' in rv: del rv['auth']
2152
2153        # Convert the log into the allocationLog parameter and remove the
2154        # log entry (with defensive programming)
2155        if rv.has_key('log'):
2156            rv['allocationLog'] = "".join(rv['log'])
2157            del rv['log']
2158        else:
2159            rv['allocationLog'] = ""
2160
2161        if rv['experimentStatus'] != 'active':
2162            if rv.has_key('federant'): del rv['federant']
2163        else:
2164            # remove the allocationID and uri info from each federant
2165            for f in rv.get('federant', []):
2166                if f.has_key('allocID'): del f['allocID']
2167                if f.has_key('uri'): del f['uri']
2168        rv['proof'] = proof.to_dict()
2169
2170        return rv
2171
2172    def get_info(self, req, fid):
2173        """
2174        Return all the stored info about this experiment
2175        """
2176        rv = None
2177
2178        req = req.get('InfoRequestBody', None)
2179        if not req:
2180            raise service_error(service_error.req,
2181                    "Bad request format (no InfoRequestBody)")
2182        exp = req.get('experiment', None)
2183        if exp:
2184            if exp.has_key('fedid'):
2185                key = exp['fedid']
2186                keytype = "fedid"
2187            elif exp.has_key('localname'):
2188                key = exp['localname']
2189                keytype = "localname"
2190            else:
2191                raise service_error(service_error.req, "Unknown lookup type")
2192        else:
2193            raise service_error(service_error.req, "No request?")
2194
2195        proof = self.check_experiment_access(fid, key)
2196
2197        # The state may be massaged by the service function that called
2198        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2199        # state.
2200        self.state_lock.acquire()
2201        if self.state.has_key(key):
2202            rv = copy.deepcopy(self.state[key])
2203        self.state_lock.release()
2204
2205        if rv:
2206            return self.clean_info_response(rv, proof)
2207        else:
2208            raise service_error(service_error.req, "No such experiment")
2209
2210    def get_multi_info(self, req, fid):
2211        """
2212        Return all the stored info that this fedid can access
2213        """
2214        rv = { 'info': [ ], 'proof': [ ] }
2215
2216        self.state_lock.acquire()
2217        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2218            try:
2219                proof = self.check_experiment_access(fid, key)
2220            except service_error, e:
2221                if e.code == service_error.access:
2222                    continue
2223                else:
2224                    self.state_lock.release()
2225                    raise e
2226
2227            if self.state.has_key(key):
2228                e = copy.deepcopy(self.state[key])
2229                e = self.clean_info_response(e, proof)
2230                rv['info'].append(e)
2231                rv['proof'].append(proof.to_dict())
2232        self.state_lock.release()
2233        return rv
2234
2235    def check_termination_status(self, fed_exp, force):
2236        """
2237        Confirm that the experiment is sin a valid state to stop (or force it)
2238        return the state - invalid states for deletion and force settings cause
2239        exceptions.
2240        """
2241        self.state_lock.acquire()
2242        status = fed_exp.get('experimentStatus', None)
2243
2244        if status:
2245            if status in ('starting', 'terminating'):
2246                if not force:
2247                    self.state_lock.release()
2248                    raise service_error(service_error.partial, 
2249                            'Experiment still being created or destroyed')
2250                else:
2251                    self.log.warning('Experiment in %s state ' % status + \
2252                            'being terminated by force.')
2253            self.state_lock.release()
2254            return status
2255        else:
2256            # No status??? trouble
2257            self.state_lock.release()
2258            raise service_error(service_error.internal,
2259                    "Experiment has no status!?")
2260
2261
2262    def get_termination_info(self, fed_exp):
2263        ids = []
2264        term_params = { }
2265        self.state_lock.acquire()
2266        #  experimentID is a list of dicts that are self-describing
2267        #  identifiers.  This finds all the fedids and localnames - the
2268        #  keys of self.state - and puts them into ids, which is used to delete
2269        #  the state after everything is swapped out.
2270        for id in fed_exp.get('experimentID', []):
2271            if 'fedid' in id: 
2272                ids.append(id['fedid'])
2273                repo = "%s" % id['fedid']
2274            if 'localname' in id: ids.append(id['localname'])
2275
2276        # Get the experimentAccess - the principal for this experiment.  It
2277        # is this principal to which credentials have been delegated, and
2278        # as which the experiment controller must act.
2279        if 'experimentAccess' in fed_exp and \
2280                'X509' in fed_exp['experimentAccess']:
2281            expcert = fed_exp['experimentAccess']['X509']
2282        else:
2283            expcert = None
2284
2285        # Collect the allocation/segment ids into a dict keyed by the fedid
2286        # of the allocation (or a monotonically increasing integer) that
2287        # contains a tuple of uri, aid (which is a dict...)
2288        for i, fed in enumerate(fed_exp.get('federant', [])):
2289            try:
2290                uri = fed['uri']
2291                aid = fed['allocID']
2292                k = fed['allocID'].get('fedid', i)
2293            except KeyError, e:
2294                continue
2295            term_params[k] = (uri, aid)
2296        # Change the experiment state
2297        fed_exp['experimentStatus'] = 'terminating'
2298        if self.state_filename: self.write_state()
2299        self.state_lock.release()
2300
2301        return ids, term_params, expcert, repo
2302
2303
2304    def deallocate_resources(self, term_params, expcert, status, force, 
2305            dealloc_log):
2306        tmpdir = None
2307        # This try block makes sure the tempdir is cleared
2308        try:
2309            # If no expcert, try the deallocation as the experiment
2310            # controller instance.
2311            if expcert and self.auth_type != 'legacy': 
2312                try:
2313                    tmpdir = tempfile.mkdtemp(prefix="term-")
2314                except EnvironmentError:
2315                    raise service_error(service_error.internal, 
2316                            "Cannot create tmp dir")
2317                cert_file = self.make_temp_certfile(expcert, tmpdir)
2318                pw = None
2319            else: 
2320                cert_file = self.cert_file
2321                pw = self.cert_pwd
2322
2323            # Stop everyone.  NB, wait_for_all waits until a thread starts
2324            # and then completes, so we can't wait if nothing starts.  So,
2325            # no tbparams, no start.
2326            if len(term_params) > 0:
2327                tp = thread_pool(self.nthreads)
2328                for k, (uri, aid) in term_params.items():
2329                    # Create and start a thread to stop the segment
2330                    tp.wait_for_slot()
2331                    t  = pooled_thread(\
2332                            target=self.terminate_segment(log=dealloc_log,
2333                                testbed=uri,
2334                                cert_file=cert_file, 
2335                                cert_pwd=pw,
2336                                trusted_certs=self.trusted_certs,
2337                                caller=self.call_TerminateSegment),
2338                            args=(uri, aid), name=k,
2339                            pdata=tp, trace_file=self.trace_file)
2340                    t.start()
2341                # Wait for completions
2342                tp.wait_for_all_done()
2343
2344            # release the allocations (failed experiments have done this
2345            # already, and starting experiments may be in odd states, so we
2346            # ignore errors releasing those allocations
2347            try: 
2348                for k, (uri, aid)  in term_params.items():
2349                    self.release_access(None, aid, uri=uri,
2350                            cert_file=cert_file, cert_pwd=pw)
2351            except service_error, e:
2352                if status != 'failed' and not force:
2353                    raise e
2354
2355        # Clean up the tmpdir no matter what
2356        finally:
2357            if tmpdir: self.remove_dirs(tmpdir)
2358
2359    def terminate_experiment(self, req, fid):
2360        """
2361        Swap this experiment out on the federants and delete the shared
2362        information
2363        """
2364        tbparams = { }
2365        req = req.get('TerminateRequestBody', None)
2366        if not req:
2367            raise service_error(service_error.req,
2368                    "Bad request format (no TerminateRequestBody)")
2369
2370        key = self.get_experiment_key(req, 'experiment')
2371        proof = self.check_experiment_access(fid, key)
2372        exp = req.get('experiment', False)
2373        force = req.get('force', False)
2374
2375        dealloc_list = [ ]
2376
2377
2378        # Create a logger that logs to the dealloc_list as well as to the main
2379        # log file.
2380        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2381        h = logging.StreamHandler(self.list_log(dealloc_list))
2382        # XXX: there should be a global one of these rather than repeating the
2383        # code.
2384        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2385                    '%d %b %y %H:%M:%S'))
2386        dealloc_log.addHandler(h)
2387
2388        self.state_lock.acquire()
2389        fed_exp = self.state.get(key, None)
2390        self.state_lock.release()
2391        repo = None
2392
2393        if fed_exp:
2394            status = self.check_termination_status(fed_exp, force)
2395            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2396            self.deallocate_resources(term_params, expcert, status, force, 
2397                    dealloc_log)
2398
2399            # Remove the terminated experiment
2400            self.state_lock.acquire()
2401            for id in ids:
2402                self.clear_experiment_authorization(id, need_state_lock=False)
2403                if id in self.state: del self.state[id]
2404
2405            if self.state_filename: self.write_state()
2406            self.state_lock.release()
2407
2408            # Delete any synch points associated with this experiment.  All
2409            # synch points begin with the fedid of the experiment.
2410            fedid_keys = set(["fedid:%s" % f for f in ids \
2411                    if isinstance(f, fedid)])
2412            for k in self.synch_store.all_keys():
2413                try:
2414                    if len(k) > 45 and k[0:46] in fedid_keys:
2415                        self.synch_store.del_value(k)
2416                except synch_store.BadDeletionError:
2417                    pass
2418            self.write_store()
2419
2420            # Remove software and other cached stuff from the filesystem.
2421            if repo:
2422                self.remove_dirs("%s/%s" % (self.repodir, repo))
2423       
2424            return { 
2425                    'experiment': exp , 
2426                    'deallocationLog': string.join(dealloc_list, ''),
2427                    'proof': [proof.to_dict()],
2428                    }
2429        else:
2430            raise service_error(service_error.req, "No saved state")
2431
2432
2433    def GetValue(self, req, fid):
2434        """
2435        Get a value from the synchronized store
2436        """
2437        req = req.get('GetValueRequestBody', None)
2438        if not req:
2439            raise service_error(service_error.req,
2440                    "Bad request format (no GetValueRequestBody)")
2441       
2442        name = req.get('name', None)
2443        wait = req.get('wait', False)
2444        rv = { 'name': name }
2445
2446        if not name:
2447            raise service_error(service_error.req, "No name?")
2448
2449        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2450
2451        if access_ok:
2452            self.log.debug("[GetValue] asking for %s " % name)
2453            try:
2454                v = self.synch_store.get_value(name, wait)
2455            except synch_store.RevokedKeyError:
2456                # No more synch on this key
2457                raise service_error(service_error.federant, 
2458                        "Synch key %s revoked" % name)
2459            if v is not None:
2460                rv['value'] = v
2461            rv['proof'] = proof.to_dict()
2462            self.log.debug("[GetValue] got %s from %s" % (v, name))
2463            return rv
2464        else:
2465            raise service_error(service_error.access, "Access Denied",
2466                    proof=proof)
2467       
2468
2469    def SetValue(self, req, fid):
2470        """
2471        Set a value in the synchronized store
2472        """
2473        req = req.get('SetValueRequestBody', None)
2474        if not req:
2475            raise service_error(service_error.req,
2476                    "Bad request format (no SetValueRequestBody)")
2477       
2478        name = req.get('name', None)
2479        v = req.get('value', '')
2480
2481        if not name:
2482            raise service_error(service_error.req, "No name?")
2483
2484        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2485
2486        if access_ok:
2487            try:
2488                self.synch_store.set_value(name, v)
2489                self.write_store()
2490                self.log.debug("[SetValue] set %s to %s" % (name, v))
2491            except synch_store.CollisionError:
2492                # Translate into a service_error
2493                raise service_error(service_error.req,
2494                        "Value already set: %s" %name)
2495            except synch_store.RevokedKeyError:
2496                # No more synch on this key
2497                raise service_error(service_error.federant, 
2498                        "Synch key %s revoked" % name)
2499                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2500        else:
2501            raise service_error(service_error.access, "Access Denied",
2502                    proof=proof)
Note: See TracBrowser for help on using the repository browser.