source: fedd/federation/experiment_control.py @ faea607

axis_examplecompt_changesinfo-ops
Last change on this file since faea607 was faea607, checked in by Ted Faber <faber@…>, 13 years ago

Move thread pools into their own package. Starting on #10

  • Property mode set to 100644
File size: 85.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22from string import join
23
24from urlparse import urlparse
25from urllib2 import urlopen
26
27from util import *
28from fedid import fedid, generate_fedid
29from remote_service import xmlrpc_handler, soap_handler, service_caller
30from service_error import service_error
31from synch_store import synch_store
32from experiment_partition import experiment_partition
33from authorizer import abac_authorizer
34from thread_pool import thread_pool, pooled_thread
35
36import topdl
37import list_log
38from ip_allocator import ip_allocator
39from ip_addr import ip_addr
40
41
42class nullHandler(logging.Handler):
43    def emit(self, record): pass
44
45fl = logging.getLogger("fedd.experiment_control")
46fl.addHandler(nullHandler())
47
48
49# Right now, no support for composition.
50class federated_service:
51    def __init__(self, name, exporter=None, importers=None, params=None, 
52            reqs=None, portal=None):
53        self.name=name
54        self.exporter=exporter
55        if importers is None: self.importers = []
56        else: self.importers=importers
57        if params is None: self.params = { }
58        else: self.params = params
59        if reqs is None: self.reqs = []
60        else: self.reqs = reqs
61
62        if portal is not None:
63            self.portal = portal
64        else:
65            self.portal = (name in federated_service.needs_portal)
66
67    def __str__(self):
68        return "name %s export %s import %s params %s reqs %s" % \
69                (self.name, self.exporter, self.importers, self.params,
70                        [ (r['name'], r['visibility']) for r in self.reqs] )
71
72    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
73
74class experiment_control_local:
75    """
76    Control of experiments that this system can directly access.
77
78    Includes experiment creation, termination and information dissemination.
79    Thred safe.
80    """
81
82    class ssh_cmd_timeout(RuntimeError): pass
83   
84    call_RequestAccess = service_caller('RequestAccess')
85    call_ReleaseAccess = service_caller('ReleaseAccess')
86    call_StartSegment = service_caller('StartSegment')
87    call_TerminateSegment = service_caller('TerminateSegment')
88    call_Ns2Topdl = service_caller('Ns2Topdl')
89
90    def __init__(self, config=None, auth=None):
91        """
92        Intialize the various attributes, most from the config object
93        """
94
95        def parse_tarfile_list(tf):
96            """
97            Parse a tarfile list from the configuration.  This is a set of
98            paths and tarfiles separated by spaces.
99            """
100            rv = [ ]
101            if tf is not None:
102                tl = tf.split()
103                while len(tl) > 1:
104                    p, t = tl[0:2]
105                    del tl[0:2]
106                    rv.append((p, t))
107            return rv
108
109        self.list_log = list_log.list_log
110
111        self.cert_file = config.get("experiment_control", "cert_file")
112        if self.cert_file:
113            self.cert_pwd = config.get("experiment_control", "cert_pwd")
114        else:
115            self.cert_file = config.get("globals", "cert_file")
116            self.cert_pwd = config.get("globals", "cert_pwd")
117
118        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
119                or config.get("globals", "trusted_certs")
120
121        self.repodir = config.get("experiment_control", "repodir")
122        self.repo_url = config.get("experiment_control", "repo_url", 
123                "https://users.isi.deterlab.net:23235");
124
125        self.exp_stem = "fed-stem"
126        self.log = logging.getLogger("fedd.experiment_control")
127        set_log_level(config, "experiment_control", self.log)
128        self.muxmax = 2
129        self.nthreads = 10
130        self.randomize_experiments = False
131
132        self.splitter = None
133        self.ssh_keygen = "/usr/bin/ssh-keygen"
134        self.ssh_identity_file = None
135
136
137        self.debug = config.getboolean("experiment_control", "create_debug")
138        self.cleanup = not config.getboolean("experiment_control", 
139                "leave_tmpfiles")
140        self.state_filename = config.get("experiment_control", 
141                "experiment_state")
142        self.store_filename = config.get("experiment_control", 
143                "synch_store")
144        self.store_url = config.get("experiment_control", "store_url")
145        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
146        self.fedkit = parse_tarfile_list(\
147                config.get("experiment_control", "fedkit"))
148        self.gatewaykit = parse_tarfile_list(\
149                config.get("experiment_control", "gatewaykit"))
150        accessdb_file = config.get("experiment_control", "accessdb")
151
152        self.ssh_pubkey_file = config.get("experiment_control", 
153                "ssh_pubkey_file")
154        self.ssh_privkey_file = config.get("experiment_control",
155                "ssh_privkey_file")
156        dt = config.get("experiment_control", "direct_transit")
157        self.auth_type = config.get('experiment_control', 'auth_type') \
158                or 'legacy'
159        self.auth_dir = config.get('experiment_control', 'auth_dir')
160        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
161        else: self.direct_transit = [ ]
162        # NB for internal master/slave ops, not experiment setup
163        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
164
165        self.overrides = set([])
166        ovr = config.get('experiment_control', 'overrides')
167        if ovr:
168            for o in ovr.split(","):
169                o = o.strip()
170                if o.startswith('fedid:'): o = o[len('fedid:'):]
171                self.overrides.add(fedid(hexstr=o))
172
173        self.state = { }
174        self.state_lock = Lock()
175        self.tclsh = "/usr/local/bin/otclsh"
176        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
177                config.get("experiment_control", "tcl_splitter",
178                        "/usr/testbed/lib/ns2ir/parse.tcl")
179        mapdb_file = config.get("experiment_control", "mapdb")
180        self.trace_file = sys.stderr
181
182        self.def_expstart = \
183                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
184                "/tmp/federate";
185        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
186                "FEDDIR/hosts";
187        self.def_gwstart = \
188                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
189                "/tmp/bridge.log";
190        self.def_mgwstart = \
191                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
192                "/tmp/bridge.log";
193        self.def_gwimage = "FBSD61-TUNNEL2";
194        self.def_gwtype = "pc";
195        self.local_access = { }
196
197        if self.auth_type == 'legacy':
198            if auth:
199                self.auth = auth
200            else:
201                self.log.error( "[access]: No authorizer initialized, " +\
202                        "creating local one.")
203                auth = authorizer()
204        elif self.auth_type == 'abac':
205            self.auth = abac_authorizer(load=self.auth_dir)
206        else:
207            raise service_error(service_error.internal, 
208                    "Unknown auth_type: %s" % self.auth_type)
209
210
211        if self.ssh_pubkey_file:
212            try:
213                f = open(self.ssh_pubkey_file, 'r')
214                self.ssh_pubkey = f.read()
215                f.close()
216            except EnvironmentError:
217                raise service_error(service_error.internal,
218                        "Cannot read sshpubkey")
219        else:
220            raise service_error(service_error.internal, 
221                    "No SSH public key file?")
222
223        if not self.ssh_privkey_file:
224            raise service_error(service_error.internal, 
225                    "No SSH public key file?")
226
227
228        if mapdb_file:
229            self.read_mapdb(mapdb_file)
230        else:
231            self.log.warn("[experiment_control] No testbed map, using defaults")
232            self.tbmap = { 
233                    'deter':'https://users.isi.deterlab.net:23235',
234                    'emulab':'https://users.isi.deterlab.net:23236',
235                    'ucb':'https://users.isi.deterlab.net:23237',
236                    }
237
238        if accessdb_file:
239                self.read_accessdb(accessdb_file)
240        else:
241            raise service_error(service_error.internal,
242                    "No accessdb specified in config")
243
244        # Grab saved state.  OK to do this w/o locking because it's read only
245        # and only one thread should be in existence that can see self.state at
246        # this point.
247        if self.state_filename:
248            self.read_state()
249
250        if self.store_filename:
251            self.read_store()
252        else:
253            self.log.warning("No saved synch store")
254            self.synch_store = synch_store
255
256        # Dispatch tables
257        self.soap_services = {\
258                'New': soap_handler('New', self.new_experiment),
259                'Create': soap_handler('Create', self.create_experiment),
260                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
261                'Vis': soap_handler('Vis', self.get_vis),
262                'Info': soap_handler('Info', self.get_info),
263                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
264                'Terminate': soap_handler('Terminate', 
265                    self.terminate_experiment),
266                'GetValue': soap_handler('GetValue', self.GetValue),
267                'SetValue': soap_handler('SetValue', self.SetValue),
268        }
269
270        self.xmlrpc_services = {\
271                'New': xmlrpc_handler('New', self.new_experiment),
272                'Create': xmlrpc_handler('Create', self.create_experiment),
273                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
274                'Vis': xmlrpc_handler('Vis', self.get_vis),
275                'Info': xmlrpc_handler('Info', self.get_info),
276                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
277                'Terminate': xmlrpc_handler('Terminate',
278                    self.terminate_experiment),
279                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
280                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
281        }
282
283    # Call while holding self.state_lock
284    def write_state(self):
285        """
286        Write a new copy of experiment state after copying the existing state
287        to a backup.
288
289        State format is a simple pickling of the state dictionary.
290        """
291        if os.access(self.state_filename, os.W_OK):
292            copy_file(self.state_filename, \
293                    "%s.bak" % self.state_filename)
294        try:
295            f = open(self.state_filename, 'w')
296            pickle.dump(self.state, f)
297        except EnvironmentError, e:
298            self.log.error("Can't write file %s: %s" % \
299                    (self.state_filename, e))
300        except pickle.PicklingError, e:
301            self.log.error("Pickling problem: %s" % e)
302        except TypeError, e:
303            self.log.error("Pickling problem (TypeError): %s" % e)
304
305    @staticmethod
306    def get_alloc_ids(state):
307        """
308        Pull the fedids of the identifiers of each allocation from the
309        state.  Again, a dict dive that's best isolated.
310
311        Used by read_store and read state
312        """
313
314        return [ f['allocID']['fedid'] 
315                for f in state.get('federant',[]) \
316                    if f.has_key('allocID') and \
317                        f['allocID'].has_key('fedid')]
318
319    # Call while holding self.state_lock
320    def read_state(self):
321        """
322        Read a new copy of experiment state.  Old state is overwritten.
323
324        State format is a simple pickling of the state dictionary.
325        """
326       
327        def get_experiment_id(state):
328            """
329            Pull the fedid experimentID out of the saved state.  This is kind
330            of a gross walk through the dict.
331            """
332
333            if state.has_key('experimentID'):
334                for e in state['experimentID']:
335                    if e.has_key('fedid'):
336                        return e['fedid']
337                else:
338                    return None
339            else:
340                return None
341
342        try:
343            f = open(self.state_filename, "r")
344            self.state = pickle.load(f)
345            self.log.debug("[read_state]: Read state from %s" % \
346                    self.state_filename)
347        except EnvironmentError, e:
348            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
349                    % (self.state_filename, e))
350        except pickle.UnpicklingError, e:
351            self.log.warning(("[read_state]: No saved state: " + \
352                    "Unpickling failed: %s") % e)
353       
354        for s in self.state.values():
355            try:
356
357                eid = get_experiment_id(s)
358                if eid : 
359                    if self.auth_type == 'legacy':
360                        # XXX: legacy
361                        # Give the owner rights to the experiment
362                        self.auth.set_attribute(s['owner'], eid)
363                        # And holders of the eid as well
364                        self.auth.set_attribute(eid, eid)
365                        # allow overrides to control experiments as well
366                        for o in self.overrides:
367                            self.auth.set_attribute(o, eid)
368                        # Set permissions to allow reading of the software
369                        # repo, if any, as well.
370                        for a in self.get_alloc_ids(s):
371                            self.auth.set_attribute(a, 'repo/%s' % eid)
372                else:
373                    raise KeyError("No experiment id")
374            except KeyError, e:
375                self.log.warning("[read_state]: State ownership or identity " +\
376                        "misformatted in %s: %s" % (self.state_filename, e))
377
378
379    def read_accessdb(self, accessdb_file):
380        """
381        Read the mapping from fedids that can create experiments to their name
382        in the 3-level access namespace.  All will be asserted from this
383        testbed and can include the local username and porject that will be
384        asserted on their behalf by this fedd.  Each fedid is also added to the
385        authorization system with the "create" attribute.
386        """
387        self.accessdb = {}
388        # These are the regexps for parsing the db
389        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
390        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
391                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
392        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
393                "\s*->\s*(" + name_expr + ")\s*$")
394        lineno = 0
395
396        # Parse the mappings and store in self.authdb, a dict of
397        # fedid -> (proj, user)
398        try:
399            f = open(accessdb_file, "r")
400            for line in f:
401                lineno += 1
402                line = line.strip()
403                if len(line) == 0 or line.startswith('#'):
404                    continue
405                m = project_line.match(line)
406                if m:
407                    fid = fedid(hexstr=m.group(1))
408                    project, user = m.group(2,3)
409                    if not self.accessdb.has_key(fid):
410                        self.accessdb[fid] = []
411                    self.accessdb[fid].append((project, user))
412                    continue
413
414                m = user_line.match(line)
415                if m:
416                    fid = fedid(hexstr=m.group(1))
417                    project = None
418                    user = m.group(2)
419                    if not self.accessdb.has_key(fid):
420                        self.accessdb[fid] = []
421                    self.accessdb[fid].append((project, user))
422                    continue
423                self.log.warn("[experiment_control] Error parsing access " +\
424                        "db %s at line %d" %  (accessdb_file, lineno))
425        except EnvironmentError:
426            raise service_error(service_error.internal,
427                    ("Error opening/reading %s as experiment " +\
428                            "control accessdb") %  accessdb_file)
429        f.close()
430
431        # Initialize the authorization attributes
432        # XXX: legacy
433        if self.auth_type == 'legacy':
434            for fid in self.accessdb.keys():
435                self.auth.set_attribute(fid, 'create')
436                self.auth.set_attribute(fid, 'new')
437
438    def read_mapdb(self, file):
439        """
440        Read a simple colon separated list of mappings for the
441        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
442        """
443
444        self.tbmap = { }
445        lineno =0
446        try:
447            f = open(file, "r")
448            for line in f:
449                lineno += 1
450                line = line.strip()
451                if line.startswith('#') or len(line) == 0:
452                    continue
453                try:
454                    label, url = line.split(':', 1)
455                    self.tbmap[label] = url
456                except ValueError, e:
457                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
458                            "map db: %s %s" % (lineno, line, e))
459        except EnvironmentError, e:
460            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
461                    "open %s: %s" % (file, e))
462        f.close()
463
464    def read_store(self):
465        try:
466            self.synch_store = synch_store()
467            self.synch_store.load(self.store_filename)
468            self.log.debug("[read_store]: Read store from %s" % \
469                    self.store_filename)
470        except EnvironmentError, e:
471            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
472                    % (self.state_filename, e))
473            self.synch_store = synch_store()
474
475        # Set the initial permissions on data in the store.  XXX: This ad hoc
476        # authorization attribute initialization is getting out of hand.
477        # XXX: legacy
478        if self.auth_type == 'legacy':
479            for k in self.synch_store.all_keys():
480                try:
481                    if k.startswith('fedid:'):
482                        fid = fedid(hexstr=k[6:46])
483                        if self.state.has_key(fid):
484                            for a in self.get_alloc_ids(self.state[fid]):
485                                self.auth.set_attribute(a, k)
486                except ValueError, e:
487                    self.log.warn("Cannot deduce permissions for %s" % k)
488
489
490    def write_store(self):
491        """
492        Write a new copy of synch_store after writing current state
493        to a backup.  We use the internal synch_store pickle method to avoid
494        incinsistent data.
495
496        State format is a simple pickling of the store.
497        """
498        if os.access(self.store_filename, os.W_OK):
499            copy_file(self.store_filename, \
500                    "%s.bak" % self.store_filename)
501        try:
502            self.synch_store.save(self.store_filename)
503        except EnvironmentError, e:
504            self.log.error("Can't write file %s: %s" % \
505                    (self.store_filename, e))
506        except TypeError, e:
507            self.log.error("Pickling problem (TypeError): %s" % e)
508
509       
510    def generate_ssh_keys(self, dest, type="rsa" ):
511        """
512        Generate a set of keys for the gateways to use to talk.
513
514        Keys are of type type and are stored in the required dest file.
515        """
516        valid_types = ("rsa", "dsa")
517        t = type.lower();
518        if t not in valid_types: raise ValueError
519        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
520
521        try:
522            trace = open("/dev/null", "w")
523        except EnvironmentError:
524            raise service_error(service_error.internal,
525                    "Cannot open /dev/null??");
526
527        # May raise CalledProcessError
528        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
529        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
530        if rv != 0:
531            raise service_error(service_error.internal, 
532                    "Cannot generate nonce ssh keys.  %s return code %d" \
533                            % (self.ssh_keygen, rv))
534
535    def gentopo(self, str):
536        """
537        Generate the topology dtat structure from the splitter's XML
538        representation of it.
539
540        The topology XML looks like:
541            <experiment>
542                <nodes>
543                    <node><vname></vname><ips>ip1:ip2</ips></node>
544                </nodes>
545                <lans>
546                    <lan>
547                        <vname></vname><vnode></vnode><ip></ip>
548                        <bandwidth></bandwidth><member>node:port</member>
549                    </lan>
550                </lans>
551        """
552        class topo_parse:
553            """
554            Parse the topology XML and create the dats structure.
555            """
556            def __init__(self):
557                # Typing of the subelements for data conversion
558                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
559                self.int_subelements = ( 'bandwidth',)
560                self.float_subelements = ( 'delay',)
561                # The final data structure
562                self.nodes = [ ]
563                self.lans =  [ ]
564                self.topo = { \
565                        'node': self.nodes,\
566                        'lan' : self.lans,\
567                    }
568                self.element = { }  # Current element being created
569                self.chars = ""     # Last text seen
570
571            def end_element(self, name):
572                # After each sub element the contents is added to the current
573                # element or to the appropriate list.
574                if name == 'node':
575                    self.nodes.append(self.element)
576                    self.element = { }
577                elif name == 'lan':
578                    self.lans.append(self.element)
579                    self.element = { }
580                elif name in self.str_subelements:
581                    self.element[name] = self.chars
582                    self.chars = ""
583                elif name in self.int_subelements:
584                    self.element[name] = int(self.chars)
585                    self.chars = ""
586                elif name in self.float_subelements:
587                    self.element[name] = float(self.chars)
588                    self.chars = ""
589
590            def found_chars(self, data):
591                self.chars += data.rstrip()
592
593
594        tp = topo_parse();
595        parser = xml.parsers.expat.ParserCreate()
596        parser.EndElementHandler = tp.end_element
597        parser.CharacterDataHandler = tp.found_chars
598
599        parser.Parse(str)
600
601        return tp.topo
602       
603
604    def genviz(self, topo):
605        """
606        Generate the visualization the virtual topology
607        """
608
609        neato = "/usr/local/bin/neato"
610        # These are used to parse neato output and to create the visualization
611        # file.
612        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
613        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
614                "%s</type></node>"
615
616        try:
617            # Node names
618            nodes = [ n['vname'] for n in topo['node'] ]
619            topo_lans = topo['lan']
620        except KeyError, e:
621            raise service_error(service_error.internal, "Bad topology: %s" %e)
622
623        lans = { }
624        links = { }
625
626        # Walk through the virtual topology, organizing the connections into
627        # 2-node connections (links) and more-than-2-node connections (lans).
628        # When a lan is created, it's added to the list of nodes (there's a
629        # node in the visualization for the lan).
630        for l in topo_lans:
631            if links.has_key(l['vname']):
632                if len(links[l['vname']]) < 2:
633                    links[l['vname']].append(l['vnode'])
634                else:
635                    nodes.append(l['vname'])
636                    lans[l['vname']] = links[l['vname']]
637                    del links[l['vname']]
638                    lans[l['vname']].append(l['vnode'])
639            elif lans.has_key(l['vname']):
640                lans[l['vname']].append(l['vnode'])
641            else:
642                links[l['vname']] = [ l['vnode'] ]
643
644
645        # Open up a temporary file for dot to turn into a visualization
646        try:
647            df, dotname = tempfile.mkstemp()
648            dotfile = os.fdopen(df, 'w')
649        except EnvironmentError:
650            raise service_error(service_error.internal,
651                    "Failed to open file in genviz")
652
653        try:
654            dnull = open('/dev/null', 'w')
655        except EnvironmentError:
656            service_error(service_error.internal,
657                    "Failed to open /dev/null in genviz")
658
659        # Generate a dot/neato input file from the links, nodes and lans
660        try:
661            print >>dotfile, "graph G {"
662            for n in nodes:
663                print >>dotfile, '\t"%s"' % n
664            for l in links.keys():
665                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
666            for l in lans.keys():
667                for n in lans[l]:
668                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
669            print >>dotfile, "}"
670            dotfile.close()
671        except TypeError:
672            raise service_error(service_error.internal,
673                    "Single endpoint link in vtopo")
674        except EnvironmentError:
675            raise service_error(service_error.internal, "Cannot write dot file")
676
677        # Use dot to create a visualization
678        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
679                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
680                close_fds=True)
681        dnull.close()
682
683        # Translate dot to vis format
684        vis_nodes = [ ]
685        vis = { 'node': vis_nodes }
686        for line in dot.stdout:
687            m = vis_re.match(line)
688            if m:
689                vn = m.group(1)
690                vis_node = {'name': vn, \
691                        'x': float(m.group(2)),\
692                        'y' : float(m.group(3)),\
693                    }
694                if vn in links.keys() or vn in lans.keys():
695                    vis_node['type'] = 'lan'
696                else:
697                    vis_node['type'] = 'node'
698                vis_nodes.append(vis_node)
699        rv = dot.wait()
700
701        os.remove(dotname)
702        if rv == 0 : return vis
703        else: return None
704
705    def get_access(self, tb, nodes, tbparam, access_user, masters, tbmap):
706        """
707        Get access to testbed through fedd and set the parameters for that tb
708        """
709        def get_export_project(svcs):
710            """
711            Look through for the list of federated_service for this testbed
712            objects for a project_export service, and extract the project
713            parameter.
714            """
715
716            pe = [s for s in svcs if s.name=='project_export']
717            if len(pe) == 1:
718                return pe[0].params.get('project', None)
719            elif len(pe) == 0:
720                return None
721            else:
722                raise service_error(service_error.req,
723                        "More than one project export is not supported")
724
725        uri = tbmap.get(testbed_base(tb), None)
726        if not uri:
727            raise service_error(service_error.server_config, 
728                    "Unknown testbed: %s" % tb)
729
730        export_svcs = masters.get(tb,[])
731        import_svcs = [ s for m in masters.values() \
732                for s in m \
733                    if tb in s.importers ]
734
735        export_project = get_export_project(export_svcs)
736
737        # Tweak search order so that if there are entries in access_user that
738        # have a project matching the export project, we try them first
739        if export_project: 
740            access_sequence = [ (p, u) for p, u in access_user \
741                    if p == export_project] 
742            access_sequence.extend([(p, u) for p, u in access_user \
743                    if p != export_project]) 
744        else: 
745            access_sequence = access_user
746
747        for p, u in access_sequence: 
748            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
749                    "to %s") %  ((p or "None"), u, uri))
750
751            if p:
752                # Request with user and project specified
753                req = {\
754                        'credential': [ "project: %s" % p, "user: %s"  % u],
755                    }
756            else:
757                # Request with only user specified
758                req = {\
759                        'credential': [ 'user: %s' % u ],
760                    }
761
762            # Make the service request from the services we're importing and
763            # exporting.  Keep track of the export request ids so we can
764            # collect the resulting info from the access response.
765            e_keys = { }
766            if import_svcs or export_svcs:
767                req['service'] = [ ]
768
769                for i, s in enumerate(import_svcs):
770                    idx = 'import%d' % i
771                    sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
772                    if s.params:
773                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
774                                for k, v in s.params.items()]
775                    req['service'].append(sr)
776
777                for i, s in enumerate(export_svcs):
778                    idx = 'export%d' % i
779                    e_keys[idx] = s
780                    sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
781                    if s.params:
782                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
783                                for k, v in s.params.items()]
784                    req['service'].append(sr)
785
786            # node resources if any
787            if nodes != None and len(nodes) > 0:
788                rnodes = [ ]
789                for n in nodes:
790                    rn = { }
791                    image, hw, count = n.split(":")
792                    if image: rn['image'] = [ image ]
793                    if hw: rn['hardware'] = [ hw ]
794                    if count and int(count) >0 : rn['count'] = int(count)
795                    rnodes.append(rn)
796                req['resources']= { }
797                req['resources']['node'] = rnodes
798
799            try:
800                if self.local_access.has_key(uri):
801                    # Local access call
802                    req = { 'RequestAccessRequestBody' : req }
803                    r = self.local_access[uri].RequestAccess(req, 
804                            fedid(file=self.cert_file))
805                    r = { 'RequestAccessResponseBody' : r }
806                else:
807                    r = self.call_RequestAccess(uri, req, 
808                            self.cert_file, self.cert_pwd, self.trusted_certs)
809            except service_error, e:
810                if e.code == service_error.access:
811                    self.log.debug("[get_access] Access denied")
812                    r = None
813                    continue
814                else:
815                    raise e
816
817            if r.has_key('RequestAccessResponseBody'):
818                # Through to here we have a valid response, not a fault.
819                # Access denied is a fault, so something better or worse than
820                # access denied has happened.
821                r = r['RequestAccessResponseBody']
822                self.log.debug("[get_access] Access granted")
823                break
824            else:
825                raise service_error(service_error.protocol,
826                        "Bad proxy response")
827       
828        if not r:
829            raise service_error(service_error.access, 
830                    "Access denied by %s (%s)" % (tb, uri))
831
832        tbparam[tb] = { 
833                "allocID" : r['allocID'],
834                "uri": uri,
835                }
836
837        # Collect the responses corresponding to the services this testbed
838        # exports.  These will be the service requests that we will include in
839        # the start segment requests (with appropriate visibility values) to
840        # import and export the segments.
841        for s in r.get('service', []):
842            id = s.get('id', None)
843            if id and id in e_keys:
844                e_keys[id].reqs.append(s)
845
846        # Add attributes to parameter space.  We don't allow attributes to
847        # overlay any parameters already installed.
848        for a in r.get('fedAttr', []):
849            try:
850                if a['attribute'] and \
851                        isinstance(a['attribute'], basestring)\
852                        and not tbparam[tb].has_key(a['attribute'].lower()):
853                    tbparam[tb][a['attribute'].lower()] = a['value']
854            except KeyError:
855                self.log.error("Bad attribute in response: %s" % a)
856
857    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
858            cert_pwd=None):
859        """
860        Release access to testbed through fedd
861        """
862
863        if not uri and tbmap:
864            uri = tbmap.get(tb, None)
865        if not uri:
866            raise service_error(service_error.server_config, 
867                    "Unknown testbed: %s" % tb)
868
869        if self.local_access.has_key(uri):
870            resp = self.local_access[uri].ReleaseAccess(\
871                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
872                    fedid(file=cert_file))
873            resp = { 'ReleaseAccessResponseBody': resp } 
874        else:
875            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
876                    cert_file, cert_pwd, self.trusted_certs)
877
878        # better error coding
879
880    def remote_ns2topdl(self, uri, desc):
881
882        req = {
883                'description' : { 'ns2description': desc },
884            }
885
886        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
887                self.trusted_certs)
888
889        if r.has_key('Ns2TopdlResponseBody'):
890            r = r['Ns2TopdlResponseBody']
891            ed = r.get('experimentdescription', None)
892            if ed.has_key('topdldescription'):
893                return topdl.Topology(**ed['topdldescription'])
894            else:
895                raise service_error(service_error.protocol, 
896                        "Bad splitter response (no output)")
897        else:
898            raise service_error(service_error.protocol, "Bad splitter response")
899
900    class start_segment:
901        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
902                cert_pwd=None, trusted_certs=None, caller=None,
903                log_collector=None):
904            self.log = log
905            self.debug = debug
906            self.cert_file = cert_file
907            self.cert_pwd = cert_pwd
908            self.trusted_certs = None
909            self.caller = caller
910            self.testbed = testbed
911            self.log_collector = log_collector
912            self.response = None
913            self.node = { }
914
915        def make_map(self, resp):
916            for e in resp.get('embedding', []):
917                if 'toponame' in e and 'physname' in e:
918                    self.node[e['toponame']] = e['physname'][0]
919
920        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
921            req = {
922                    'allocID': { 'fedid' : aid }, 
923                    'segmentdescription': { 
924                        'topdldescription': topo.to_dict(),
925                    },
926                }
927
928            if connInfo:
929                req['connection'] = connInfo
930
931            import_svcs = [ s for m in masters.values() \
932                    for s in m if self.testbed in s.importers]
933
934            if import_svcs or self.testbed in masters:
935                req['service'] = []
936
937            for s in import_svcs:
938                for r in s.reqs:
939                    sr = copy.deepcopy(r)
940                    sr['visibility'] = 'import';
941                    req['service'].append(sr)
942
943            for s in masters.get(self.testbed, []):
944                for r in s.reqs:
945                    sr = copy.deepcopy(r)
946                    sr['visibility'] = 'export';
947                    req['service'].append(sr)
948
949            if attrs:
950                req['fedAttr'] = attrs
951
952            try:
953                self.log.debug("Calling StartSegment at %s " % uri)
954                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
955                        self.trusted_certs)
956                if r.has_key('StartSegmentResponseBody'):
957                    lval = r['StartSegmentResponseBody'].get('allocationLog',
958                            None)
959                    if lval and self.log_collector:
960                        for line in  lval.splitlines(True):
961                            self.log_collector.write(line)
962                    self.make_map(r['StartSegmentResponseBody'])
963                    self.response = r
964                else:
965                    raise service_error(service_error.internal, 
966                            "Bad response!?: %s" %r)
967                return True
968            except service_error, e:
969                self.log.error("Start segment failed on %s: %s" % \
970                        (self.testbed, e))
971                return False
972
973
974
975    class terminate_segment:
976        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
977                cert_pwd=None, trusted_certs=None, caller=None):
978            self.log = log
979            self.debug = debug
980            self.cert_file = cert_file
981            self.cert_pwd = cert_pwd
982            self.trusted_certs = None
983            self.caller = caller
984            self.testbed = testbed
985
986        def __call__(self, uri, aid ):
987            req = {
988                    'allocID': aid , 
989                }
990            try:
991                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
992                        self.trusted_certs)
993                return True
994            except service_error, e:
995                self.log.error("Terminate segment failed on %s: %s" % \
996                        (self.testbed, e))
997                return False
998   
999
1000    def allocate_resources(self, allocated, masters, eid, expid, 
1001            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1002            attrs=None, connInfo={}, tbmap=None, expcert=None):
1003
1004        started = { }           # Testbeds where a sub-experiment started
1005                                # successfully
1006
1007        # XXX
1008        fail_soft = False
1009
1010        if tbmap is None: tbmap = { }
1011
1012        log = alloc_log or self.log
1013
1014        tp = thread_pool(self.nthreads)
1015        threads = [ ]
1016        starters = [ ]
1017
1018        if expcert:
1019            cert = expcert
1020            pw = None
1021        else:
1022            cert = self.cert_file
1023            pw = self.cert_pwd
1024
1025        for tb in allocated.keys():
1026            # Create and start a thread to start the segment, and save it
1027            # to get the return value later
1028            tb_attrs = copy.copy(attrs)
1029            tp.wait_for_slot()
1030            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
1031            base, suffix = split_testbed(tb)
1032            if suffix:
1033                tb_attrs.append({'attribute': 'experiment_name', 
1034                    'value': "%s-%s" % (eid, suffix)})
1035            else:
1036                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1037            if not uri:
1038                raise service_error(service_error.internal, 
1039                        "Unknown testbed %s !?" % tb)
1040
1041            if tbparams[tb].has_key('allocID') and \
1042                    tbparams[tb]['allocID'].has_key('fedid'):
1043                aid = tbparams[tb]['allocID']['fedid']
1044            else:
1045                raise service_error(service_error.internal, 
1046                        "No alloc id for testbed %s !?" % tb)
1047
1048            s = self.start_segment(log=log, debug=self.debug,
1049                    testbed=tb, cert_file=cert,
1050                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1051                    caller=self.call_StartSegment,
1052                    log_collector=log_collector)
1053            starters.append(s)
1054            t  = pooled_thread(\
1055                    target=s, name=tb,
1056                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1057                    pdata=tp, trace_file=self.trace_file)
1058            threads.append(t)
1059            t.start()
1060
1061        # Wait until all finish (keep pinging the log, though)
1062        mins = 0
1063        revoked = False
1064        while not tp.wait_for_all_done(60.0):
1065            mins += 1
1066            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1067                    % mins)
1068            if not revoked and \
1069                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1070                # a testbed has failed.  Revoke this experiment's
1071                # synchronizarion values so that sub experiments will not
1072                # deadlock waiting for synchronization that will never happen
1073                self.log.info("A subexperiment has failed to swap in, " + \
1074                        "revoking synch keys")
1075                var_key = "fedid:%s" % expid
1076                for k in self.synch_store.all_keys():
1077                    if len(k) > 45 and k[0:46] == var_key:
1078                        self.synch_store.revoke_key(k)
1079                revoked = True
1080
1081        failed = [ t.getName() for t in threads if not t.rv ]
1082        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1083
1084        # If one failed clean up, unless fail_soft is set
1085        if failed:
1086            if not fail_soft:
1087                tp.clear()
1088                for tb in succeeded:
1089                    # Create and start a thread to stop the segment
1090                    tp.wait_for_slot()
1091                    uri = tbparams[tb]['uri']
1092                    t  = pooled_thread(\
1093                            target=self.terminate_segment(log=log,
1094                                testbed=tb,
1095                                cert_file=cert, 
1096                                cert_pwd=pw,
1097                                trusted_certs=self.trusted_certs,
1098                                caller=self.call_TerminateSegment),
1099                            args=(uri, tbparams[tb]['federant']['allocID']),
1100                            name=tb,
1101                            pdata=tp, trace_file=self.trace_file)
1102                    t.start()
1103                # Wait until all finish (if any are being stopped)
1104                if succeeded:
1105                    tp.wait_for_all_done()
1106
1107                # release the allocations
1108                for tb in tbparams.keys():
1109                    try:
1110                        self.release_access(tb, tbparams[tb]['allocID'], 
1111                                tbmap=tbmap, uri=tbparams[tb].get('uri', None),
1112                                cert_file=cert, cert_pwd=pw)
1113                    except service_error, e:
1114                        self.log.warn("Error releasing access: %s" % e.desc)
1115                # Remove the placeholder
1116                self.state_lock.acquire()
1117                self.state[eid]['experimentStatus'] = 'failed'
1118                if self.state_filename: self.write_state()
1119                self.state_lock.release()
1120                # Remove the repo dir
1121                self.remove_dirs("%s/%s" %(self.repodir, expid))
1122                # Walk up tmpdir, deleting as we go
1123                if self.cleanup:
1124                    self.remove_dirs(tmpdir)
1125                else:
1126                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1127
1128
1129                log.error("Swap in failed on %s" % ",".join(failed))
1130                return
1131        else:
1132            # Walk through the successes and gather the virtual to physical
1133            # mapping.
1134            embedding = [ ]
1135            for s in starters:
1136                for k, v in s.node.items():
1137                    embedding.append({
1138                        'toponame': k, 
1139                        'physname': [ v],
1140                        'testbed': s.testbed
1141                        })
1142            log.info("[start_segment]: Experiment %s active" % eid)
1143
1144
1145        # Walk up tmpdir, deleting as we go
1146        if self.cleanup:
1147            self.remove_dirs(tmpdir)
1148        else:
1149            log.debug("[start_experiment]: not removing %s" % tmpdir)
1150
1151        # Insert the experiment into our state and update the disk copy.
1152        self.state_lock.acquire()
1153        self.state[expid]['experimentStatus'] = 'active'
1154        self.state[eid] = self.state[expid]
1155        self.state[eid]['experimentdescription']['topdldescription'] = \
1156                top.to_dict()
1157        self.state[eid]['embedding'] = embedding
1158        if self.state_filename: self.write_state()
1159        self.state_lock.release()
1160        return
1161
1162
1163    def add_kit(self, e, kit):
1164        """
1165        Add a Software object created from the list of (install, location)
1166        tuples passed as kit  to the software attribute of an object e.  We
1167        do this enough to break out the code, but it's kind of a hack to
1168        avoid changing the old tuple rep.
1169        """
1170
1171        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1172
1173        if isinstance(e.software, list): e.software.extend(s)
1174        else: e.software = s
1175
1176
1177    def create_experiment_state(self, fid, req, expid, expcert,
1178            state='starting'):
1179        """
1180        Create the initial entry in the experiment's state.  The expid and
1181        expcert are the experiment's fedid and certifacte that represents that
1182        ID, which are installed in the experiment state.  If the request
1183        includes a suggested local name that is used if possible.  If the local
1184        name is already taken by an experiment owned by this user that has
1185        failed, it is overwritten.  Otherwise new letters are added until a
1186        valid localname is found.  The generated local name is returned.
1187        """
1188
1189        if req.has_key('experimentID') and \
1190                req['experimentID'].has_key('localname'):
1191            overwrite = False
1192            eid = req['experimentID']['localname']
1193            # If there's an old failed experiment here with the same local name
1194            # and accessible by this user, we'll overwrite it, otherwise we'll
1195            # fall through and do the collision avoidance.
1196            old_expid = self.get_experiment_fedid(eid)
1197            if old_expid and self.check_experiment_access(fid, old_expid):
1198                self.state_lock.acquire()
1199                status = self.state[eid].get('experimentStatus', None)
1200                if status and status == 'failed':
1201                    # remove the old access attribute
1202                    self.auth.unset_attribute(fid, old_expid)
1203                    self.auth.save()
1204                    overwrite = True
1205                    del self.state[eid]
1206                    del self.state[old_expid]
1207                self.state_lock.release()
1208            self.state_lock.acquire()
1209            while (self.state.has_key(eid) and not overwrite):
1210                eid += random.choice(string.ascii_letters)
1211            # Initial state
1212            self.state[eid] = {
1213                    'experimentID' : \
1214                            [ { 'localname' : eid }, {'fedid': expid } ],
1215                    'experimentStatus': state,
1216                    'experimentAccess': { 'X509' : expcert },
1217                    'owner': fid,
1218                    'log' : [],
1219                }
1220            self.state[expid] = self.state[eid]
1221            if self.state_filename: self.write_state()
1222            self.state_lock.release()
1223        else:
1224            eid = self.exp_stem
1225            for i in range(0,5):
1226                eid += random.choice(string.ascii_letters)
1227            self.state_lock.acquire()
1228            while (self.state.has_key(eid)):
1229                eid = self.exp_stem
1230                for i in range(0,5):
1231                    eid += random.choice(string.ascii_letters)
1232            # Initial state
1233            self.state[eid] = {
1234                    'experimentID' : \
1235                            [ { 'localname' : eid }, {'fedid': expid } ],
1236                    'experimentStatus': state,
1237                    'experimentAccess': { 'X509' : expcert },
1238                    'owner': fid,
1239                    'log' : [],
1240                }
1241            self.state[expid] = self.state[eid]
1242            if self.state_filename: self.write_state()
1243            self.state_lock.release()
1244
1245        return eid
1246
1247
1248    def allocate_ips_to_topo(self, top):
1249        """
1250        Add an ip4_address attribute to all the hosts in the topology, based on
1251        the shared substrates on which they sit.  An /etc/hosts file is also
1252        created and returned as a list of hostfiles entries.  We also return
1253        the allocator, because we may need to allocate IPs to portals
1254        (specifically DRAGON portals).
1255        """
1256        subs = sorted(top.substrates, 
1257                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1258                reverse=True)
1259        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1260        ifs = { }
1261        hosts = [ ]
1262
1263        for idx, s in enumerate(subs):
1264            net_size = len(s.interfaces)+2
1265
1266            a = ips.allocate(net_size)
1267            if a :
1268                base, num = a
1269                if num < net_size: 
1270                    raise service_error(service_error.internal,
1271                            "Allocator returned wrong number of IPs??")
1272            else:
1273                raise service_error(service_error.req, 
1274                        "Cannot allocate IP addresses")
1275            mask = ips.min_alloc
1276            while mask < net_size:
1277                mask *= 2
1278
1279            netmask = ((2**32-1) ^ (mask-1))
1280
1281            base += 1
1282            for i in s.interfaces:
1283                i.attribute.append(
1284                        topdl.Attribute('ip4_address', 
1285                            "%s" % ip_addr(base)))
1286                i.attribute.append(
1287                        topdl.Attribute('ip4_netmask', 
1288                            "%s" % ip_addr(int(netmask))))
1289
1290                hname = i.element.name
1291                if ifs.has_key(hname):
1292                    hosts.append("%s\t%s-%s %s-%d" % \
1293                            (ip_addr(base), hname, s.name, hname,
1294                                ifs[hname]))
1295                else:
1296                    ifs[hname] = 0
1297                    hosts.append("%s\t%s-%s %s-%d %s" % \
1298                            (ip_addr(base), hname, s.name, hname,
1299                                ifs[hname], hname))
1300
1301                ifs[hname] += 1
1302                base += 1
1303        return hosts, ips
1304
1305    def get_access_to_testbeds(self, testbeds, access_user, allocated, 
1306            tbparams, masters, tbmap):
1307        """
1308        Request access to the various testbeds required for this instantiation
1309        (passed in as testbeds).  User, access_user, expoert_project and master
1310        are used to construct the correct requests.  Per-testbed parameters are
1311        returned in tbparams.
1312        """
1313        for tb in testbeds:
1314            self.get_access(tb, None, tbparams, access_user, masters, tbmap)
1315            allocated[tb] = 1
1316
1317    def get_abac_access_to_testbeds(self, testbeds, fid, allocated, 
1318            tbparam, masters, tbmap, expid=None, expcert=None):
1319        for tb in testbeds:
1320            self.get_abac_access(tb, tbparam, fid, masters, tbmap, expid,
1321                    expcert)
1322            allocated[tb] = 1
1323
1324    def get_abac_access(self, tb, tbparam,fid, masters, tbmap, expid=None, expcert=None):
1325        """
1326        Get access to testbed through fedd and set the parameters for that tb
1327        """
1328        def get_export_project(svcs):
1329            """
1330            Look through for the list of federated_service for this testbed
1331            objects for a project_export service, and extract the project
1332            parameter.
1333            """
1334
1335            pe = [s for s in svcs if s.name=='project_export']
1336            if len(pe) == 1:
1337                return pe[0].params.get('project', None)
1338            elif len(pe) == 0:
1339                return None
1340            else:
1341                raise service_error(service_error.req,
1342                        "More than one project export is not supported")
1343
1344        uri = tbmap.get(testbed_base(tb), None)
1345        if not uri:
1346            raise service_error(service_error.server_config, 
1347                    "Unknown testbed: %s" % tb)
1348
1349        export_svcs = masters.get(tb,[])
1350        import_svcs = [ s for m in masters.values() \
1351                for s in m \
1352                    if tb in s.importers ]
1353
1354        export_project = get_export_project(export_svcs)
1355        # Compose the credential list so that IDs come before attributes
1356        creds = set()
1357        keys = set()
1358        certs = self.auth.get_creds_for_principal(fid)
1359        if expid:
1360            certs.update(self.auth.get_creds_for_principal(expid))
1361        for c in certs:
1362            keys.add(c.issuer_cert())
1363            creds.add(c.attribute_cert())
1364        creds = list(keys) + list(creds)
1365
1366        if expcert: cert, pw = expcert, None
1367        else: cert, pw = self.cert_file, self.cert_pw
1368
1369        # Request credentials
1370        req = {
1371                'abac_credential': creds,
1372            }
1373        # Make the service request from the services we're importing and
1374        # exporting.  Keep track of the export request ids so we can
1375        # collect the resulting info from the access response.
1376        e_keys = { }
1377        if import_svcs or export_svcs:
1378            req['service'] = [ ]
1379
1380            for i, s in enumerate(import_svcs):
1381                idx = 'import%d' % i
1382                sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
1383                if s.params:
1384                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1385                            for k, v in s.params.items()]
1386                req['service'].append(sr)
1387
1388            for i, s in enumerate(export_svcs):
1389                idx = 'export%d' % i
1390                e_keys[idx] = s
1391                sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
1392                if s.params:
1393                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
1394                            for k, v in s.params.items()]
1395                req['service'].append(sr)
1396
1397
1398        if self.local_access.has_key(uri):
1399            # Local access call
1400            req = { 'RequestAccessRequestBody' : req }
1401            r = self.local_access[uri].RequestAccess(req, 
1402                    fedid(file=self.cert_file))
1403            r = { 'RequestAccessResponseBody' : r }
1404        else:
1405            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1406
1407        if r.has_key('RequestAccessResponseBody'):
1408            # Through to here we have a valid response, not a fault.
1409            # Access denied is a fault, so something better or worse than
1410            # access denied has happened.
1411            r = r['RequestAccessResponseBody']
1412            self.log.debug("[get_access] Access granted")
1413        else:
1414            raise service_error(service_error.protocol,
1415                        "Bad proxy response")
1416       
1417        tbparam[tb] = { 
1418                "allocID" : r['allocID'],
1419                "uri": uri,
1420                }
1421
1422        # Collect the responses corresponding to the services this testbed
1423        # exports.  These will be the service requests that we will include in
1424        # the start segment requests (with appropriate visibility values) to
1425        # import and export the segments.
1426        for s in r.get('service', []):
1427            id = s.get('id', None)
1428            if id and id in e_keys:
1429                e_keys[id].reqs.append(s)
1430
1431        # Add attributes to parameter space.  We don't allow attributes to
1432        # overlay any parameters already installed.
1433        for a in r.get('fedAttr', []):
1434            try:
1435                if a['attribute'] and \
1436                        isinstance(a['attribute'], basestring)\
1437                        and not tbparam[tb].has_key(a['attribute'].lower()):
1438                    tbparam[tb][a['attribute'].lower()] = a['value']
1439            except KeyError:
1440                self.log.error("Bad attribute in response: %s" % a)
1441
1442
1443    def split_topology(self, top, topo, testbeds):
1444        """
1445        Create the sub-topologies that are needed for experiment instantiation.
1446        """
1447        for tb in testbeds:
1448            topo[tb] = top.clone()
1449            # copy in for loop allows deletions from the original
1450            for e in [ e for e in topo[tb].elements]:
1451                etb = e.get_attribute('testbed')
1452                # NB: elements without a testbed attribute won't appear in any
1453                # sub topologies. 
1454                if not etb or etb != tb:
1455                    for i in e.interface:
1456                        for s in i.subs:
1457                            try:
1458                                s.interfaces.remove(i)
1459                            except ValueError:
1460                                raise service_error(service_error.internal,
1461                                        "Can't remove interface??")
1462                    topo[tb].elements.remove(e)
1463            topo[tb].make_indices()
1464
1465    def wrangle_software(self, expid, top, topo, tbparams):
1466        """
1467        Copy software out to the repository directory, allocate permissions and
1468        rewrite the segment topologies to look for the software in local
1469        places.
1470        """
1471
1472        # Copy the rpms and tarfiles to a distribution directory from
1473        # which the federants can retrieve them
1474        linkpath = "%s/software" %  expid
1475        softdir ="%s/%s" % ( self.repodir, linkpath)
1476        softmap = { }
1477        # These are in a list of tuples format (each kit).  This comprehension
1478        # unwraps them into a single list of tuples that initilaizes the set of
1479        # tuples.
1480        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1481                for p, t in l ])
1482        pkgs.update([x.location for e in top.elements \
1483                for x in e.software])
1484        try:
1485            os.makedirs(softdir)
1486        except EnvironmentError, e:
1487            raise service_error(
1488                    "Cannot create software directory: %s" % e)
1489        # The actual copying.  Everything's converted into a url for copying.
1490        for pkg in pkgs:
1491            loc = pkg
1492
1493            scheme, host, path = urlparse(loc)[0:3]
1494            dest = os.path.basename(path)
1495            if not scheme:
1496                if not loc.startswith('/'):
1497                    loc = "/%s" % loc
1498                loc = "file://%s" %loc
1499            try:
1500                u = urlopen(loc)
1501            except Exception, e:
1502                raise service_error(service_error.req, 
1503                        "Cannot open %s: %s" % (loc, e))
1504            try:
1505                f = open("%s/%s" % (softdir, dest) , "w")
1506                self.log.debug("Writing %s/%s" % (softdir,dest) )
1507                data = u.read(4096)
1508                while data:
1509                    f.write(data)
1510                    data = u.read(4096)
1511                f.close()
1512                u.close()
1513            except Exception, e:
1514                raise service_error(service_error.internal,
1515                        "Could not copy %s: %s" % (loc, e))
1516            path = re.sub("/tmp", "", linkpath)
1517            # XXX
1518            softmap[pkg] = \
1519                    "%s/%s/%s" %\
1520                    ( self.repo_url, path, dest)
1521
1522            # Allow the individual segments to access the software.
1523            for tb in tbparams.keys():
1524                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1525                        "/%s/%s" % ( path, dest))
1526            self.auth.save()
1527
1528        # Convert the software locations in the segments into the local
1529        # copies on this host
1530        for soft in [ s for tb in topo.values() \
1531                for e in tb.elements \
1532                    if getattr(e, 'software', False) \
1533                        for s in e.software ]:
1534            if softmap.has_key(soft.location):
1535                soft.location = softmap[soft.location]
1536
1537
1538    def new_experiment(self, req, fid):
1539        """
1540        The external interface to empty initial experiment creation called from
1541        the dispatcher.
1542
1543        Creates a working directory, splits the incoming description using the
1544        splitter script and parses out the avrious subsections using the
1545        lcasses above.  Once each sub-experiment is created, use pooled threads
1546        to instantiate them and start it all up.
1547        """
1548        req = req.get('NewRequestBody', None)
1549        if not req:
1550            raise service_error(service_error.req,
1551                    "Bad request format (no NewRequestBody)")
1552
1553        if self.auth.import_credentials(data_list=req.get('credential', [])):
1554            self.auth.save()
1555
1556        if not self.auth.check_attribute(fid, 'new'):
1557            raise service_error(service_error.access, "New access denied")
1558
1559        try:
1560            tmpdir = tempfile.mkdtemp(prefix="split-")
1561        except EnvironmentError:
1562            raise service_error(service_error.internal, "Cannot create tmp dir")
1563
1564        try:
1565            access_user = self.accessdb[fid]
1566        except KeyError:
1567            raise service_error(service_error.internal,
1568                    "Access map and authorizer out of sync in " + \
1569                            "new_experiment for fedid %s"  % fid)
1570
1571        pid = "dummy"
1572        gid = "dummy"
1573
1574        # Generate an ID for the experiment (slice) and a certificate that the
1575        # allocator can use to prove they own it.  We'll ship it back through
1576        # the encrypted connection.  If the requester supplied one, use it.
1577        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1578            expcert = req['experimentAccess']['X509']
1579            expid = fedid(certstr=expcert)
1580            self.state_lock.acquire()
1581            if expid in self.state:
1582                self.state_lock.release()
1583                raise service_error(service_error.req, 
1584                        'fedid %s identifies an existing experiment' % expid)
1585            self.state_lock.release()
1586        else:
1587            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1588
1589        #now we're done with the tmpdir, and it should be empty
1590        if self.cleanup:
1591            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1592            os.rmdir(tmpdir)
1593        else:
1594            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1595
1596        eid = self.create_experiment_state(fid, req, expid, expcert, 
1597                state='empty')
1598
1599        # Let users touch the state
1600        self.auth.set_attribute(fid, expid)
1601        self.auth.set_attribute(expid, expid)
1602        # Override fedids can manipulate state as well
1603        for o in self.overrides:
1604            self.auth.set_attribute(o, expid)
1605        self.auth.save()
1606
1607        rv = {
1608                'experimentID': [
1609                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1610                ],
1611                'experimentStatus': 'empty',
1612                'experimentAccess': { 'X509' : expcert }
1613            }
1614
1615        return rv
1616
1617    def create_experiment(self, req, fid):
1618        """
1619        The external interface to experiment creation called from the
1620        dispatcher.
1621
1622        Creates a working directory, splits the incoming description using the
1623        splitter script and parses out the various subsections using the
1624        classes above.  Once each sub-experiment is created, use pooled threads
1625        to instantiate them and start it all up.
1626        """
1627
1628        req = req.get('CreateRequestBody', None)
1629        if not req:
1630            raise service_error(service_error.req,
1631                    "Bad request format (no CreateRequestBody)")
1632
1633        # Get the experiment access
1634        exp = req.get('experimentID', None)
1635        if exp:
1636            if exp.has_key('fedid'):
1637                key = exp['fedid']
1638                expid = key
1639                eid = None
1640            elif exp.has_key('localname'):
1641                key = exp['localname']
1642                eid = key
1643                expid = None
1644            else:
1645                raise service_error(service_error.req, "Unknown lookup type")
1646        else:
1647            raise service_error(service_error.req, "No request?")
1648
1649        # Import information from the requester
1650        if self.auth.import_credentials(data_list=req.get('credential', [])):
1651            self.auth.save()
1652
1653        self.check_experiment_access(fid, key)
1654
1655        # Install the testbed map entries supplied with the request into a copy
1656        # of the testbed map.
1657        tbmap = dict(self.tbmap)
1658        for m in req.get('testbedmap', []):
1659            if 'testbed' in m and 'uri' in m:
1660                tbmap[m['testbed']] = m['uri']
1661
1662        try:
1663            tmpdir = tempfile.mkdtemp(prefix="split-")
1664            os.mkdir(tmpdir+"/keys")
1665        except EnvironmentError:
1666            raise service_error(service_error.internal, "Cannot create tmp dir")
1667
1668        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1669        gw_secretkey_base = "fed.%s" % self.ssh_type
1670        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1671        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1672        tclfile = tmpdir + "/experiment.tcl"
1673        tbparams = { }
1674        try:
1675            access_user = self.accessdb[fid]
1676        except KeyError:
1677            raise service_error(service_error.internal,
1678                    "Access map and authorizer out of sync in " + \
1679                            "create_experiment for fedid %s"  % fid)
1680
1681        pid = "dummy"
1682        gid = "dummy"
1683
1684        # The tcl parser needs to read a file so put the content into that file
1685        descr=req.get('experimentdescription', None)
1686        if descr:
1687            file_content=descr.get('ns2description', None)
1688            if file_content:
1689                try:
1690                    f = open(tclfile, 'w')
1691                    f.write(file_content)
1692                    f.close()
1693                except EnvironmentError:
1694                    raise service_error(service_error.internal,
1695                            "Cannot write temp experiment description")
1696            else:
1697                raise service_error(service_error.req, 
1698                        "Only ns2descriptions supported")
1699        else:
1700            raise service_error(service_error.req, "No experiment description")
1701
1702        self.state_lock.acquire()
1703        if self.state.has_key(key):
1704            self.state[key]['experimentStatus'] = "starting"
1705            for e in self.state[key].get('experimentID',[]):
1706                if not expid and e.has_key('fedid'):
1707                    expid = e['fedid']
1708                elif not eid and e.has_key('localname'):
1709                    eid = e['localname']
1710            if 'experimentAccess' in self.state[key] and \
1711                    'X509' in self.state[key]['experimentAccess']:
1712                expcert = self.state[key]['experimentAccess']['X509']
1713            else:
1714                expcert = None
1715        self.state_lock.release()
1716
1717        if not (eid and expid):
1718            raise service_error(service_error.internal, 
1719                    "Cannot find local experiment info!?")
1720
1721        # make a protected copy of the access certificate so the experiment
1722        # controller can act as the experiment principal.
1723        if expcert and self.auth_type != 'legacy':
1724            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1725            if not expcert_file:
1726                raise service_error(service_error.internal, 
1727                        "Cannot create temp cert file?")
1728        else:
1729            expcert_file = None
1730
1731        try: 
1732            # This catches exceptions to clear the placeholder if necessary
1733            try:
1734                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1735            except ValueError:
1736                raise service_error(service_error.server_config, 
1737                        "Bad key type (%s)" % self.ssh_type)
1738
1739            # Copy the service request
1740            tb_services = [ s for s in req.get('service',[]) ]
1741            # Translate to topdl
1742            if self.splitter_url:
1743                self.log.debug("Calling remote topdl translator at %s" % \
1744                        self.splitter_url)
1745                top = self.remote_ns2topdl(self.splitter_url, file_content)
1746            else:
1747                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1748                    str(self.muxmax), '-m', 'dummy']
1749
1750                tclcmd.extend([pid, gid, eid, tclfile])
1751
1752                self.log.debug("running local splitter %s", " ".join(tclcmd))
1753                # This is just fantastic.  As a side effect the parser copies
1754                # tb_compat.tcl into the current directory, so that directory
1755                # must be writable by the fedd user.  Doing this in the
1756                # temporary subdir ensures this is the case.
1757                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1758                        cwd=tmpdir)
1759                split_data = tclparser.stdout
1760
1761                top = topdl.topology_from_xml(file=split_data, top="experiment")
1762
1763            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1764            # Find the testbeds to look up
1765            testbeds = set([ a.value for e in top.elements \
1766                    for a in e.attribute \
1767                        if a.attribute == 'testbed'])
1768
1769            tb_hosts = { }
1770            for tb in testbeds:
1771                tb_hosts[tb] = [ e.name for e in top.elements \
1772                        if isinstance(e, topdl.Computer) and \
1773                            e.get_attribute('testbed') and \
1774                            e.get_attribute('testbed') == tb]
1775
1776            masters = { }           # testbeds exporting services
1777            pmasters = { }          # Testbeds exporting services that
1778                                    # need portals
1779            for s in tb_services:
1780                # If this is a service request with the importall field
1781                # set, fill it out.
1782
1783                if s.get('importall', False):
1784                    s['import'] = [ tb for tb in testbeds \
1785                            if tb not in s.get('export',[])]
1786                    del s['importall']
1787
1788                # Add the service to masters
1789                for tb in s.get('export', []):
1790                    if s.get('name', None):
1791                        if tb not in masters:
1792                            masters[tb] = [ ]
1793
1794                        params = { }
1795                        if 'fedAttr' in s:
1796                            for a in s['fedAttr']:
1797                                params[a.get('attribute', '')] = \
1798                                        a.get('value','')
1799
1800                        fser = federated_service(name=s['name'],
1801                                exporter=tb, importers=s.get('import',[]),
1802                                params=params)
1803                        if fser.name == 'hide_hosts' \
1804                                and 'hosts' not in fser.params:
1805                            fser.params['hosts'] = \
1806                                    ",".join(tb_hosts.get(fser.exporter, []))
1807                        masters[tb].append(fser)
1808
1809                        if fser.portal:
1810                            if tb not in pmasters: pmasters[tb] = [ fser ]
1811                            else: pmasters[tb].append(fser)
1812                    else:
1813                        self.log.error('Testbed service does not have name " + \
1814                                "and importers')
1815
1816
1817            allocated = { }         # Testbeds we can access
1818            topo ={ }               # Sub topologies
1819            connInfo = { }          # Connection information
1820
1821            if self.auth_type == 'legacy':
1822                self.get_access_to_testbeds(testbeds, access_user, allocated, 
1823                        tbparams, masters, tbmap)
1824            elif self.auth_type == 'abac':
1825                self.get_abac_access_to_testbeds(testbeds, fid, allocated, 
1826                        tbparams, masters, tbmap, expid, expcert_file)
1827            else:
1828                raise service_error(service_error.internal, 
1829                        "Unknown auth_type %s" % self.auth_type)
1830
1831            self.split_topology(top, topo, testbeds)
1832
1833            # Copy configuration files into the remote file store
1834            # The config urlpath
1835            configpath = "/%s/config" % expid
1836            # The config file system location
1837            configdir ="%s%s" % ( self.repodir, configpath)
1838            try:
1839                os.makedirs(configdir)
1840            except EnvironmentError, e:
1841                raise service_error(service_error.internal,
1842                        "Cannot create config directory: %s" % e)
1843            try:
1844                f = open("%s/hosts" % configdir, "w")
1845                f.write('\n'.join(hosts))
1846                f.close()
1847            except EnvironmentError, e:
1848                raise service_error(service_error.internal, 
1849                        "Cannot write hosts file: %s" % e)
1850            try:
1851                copy_file("%s" % gw_pubkey, "%s/%s" % \
1852                        (configdir, gw_pubkey_base))
1853                copy_file("%s" % gw_secretkey, "%s/%s" % \
1854                        (configdir, gw_secretkey_base))
1855            except EnvironmentError, e:
1856                raise service_error(service_error.internal, 
1857                        "Cannot copy keyfiles: %s" % e)
1858
1859            # Allow the individual testbeds to access the configuration files.
1860            for tb in tbparams.keys():
1861                asignee = tbparams[tb]['allocID']['fedid']
1862                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1863                    self.auth.set_attribute(asignee, "%s/%s" % \
1864                            (configpath, f))
1865
1866            part = experiment_partition(self.auth, self.store_url, tbmap,
1867                    self.muxmax, self.direct_transit)
1868            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
1869                    connInfo, expid)
1870            # Now get access to the dynamic testbeds (those added above)
1871            for tb in [ t for t in topo if t not in allocated]:
1872                #XXX: ABAC
1873                if self.auth_type =='legacy':
1874                    self.get_access(tb, None, tbparams, access_user, 
1875                            masters, tbmap)
1876                elif self.auth_type == 'abac':
1877                    self.get_abac_access(tb, tbparams, fid, masters, tbmap, 
1878                            expid, expcert_file)
1879                else:
1880                    raise service_error(service_error.internal, 
1881                            "Unknown auth_type %s" % self.auth_type)
1882                allocated[tb] = 1
1883                store_keys = topo[tb].get_attribute('store_keys')
1884                # Give the testbed access to keys it exports or imports
1885                if store_keys:
1886                    for sk in store_keys.split(" "):
1887                        self.auth.set_attribute(\
1888                                tbparams[tb]['allocID']['fedid'], sk)
1889            self.auth.save()
1890
1891            self.wrangle_software(expid, top, topo, tbparams)
1892
1893            vtopo = topdl.topology_to_vtopo(top)
1894            vis = self.genviz(vtopo)
1895
1896            # save federant information
1897            for k in allocated.keys():
1898                tbparams[k]['federant'] = {
1899                        'name': [ { 'localname' : eid} ],
1900                        'allocID' : tbparams[k]['allocID'],
1901                        'uri': tbparams[k]['uri'],
1902                    }
1903
1904            self.state_lock.acquire()
1905            self.state[eid]['vtopo'] = vtopo
1906            self.state[eid]['vis'] = vis
1907            self.state[eid]['experimentdescription'] = \
1908                    { 'topdldescription': top.to_dict() }
1909            self.state[eid]['federant'] = \
1910                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1911                        if tbparams[tb].has_key('federant') ]
1912            if self.state_filename: 
1913                self.write_state()
1914            self.state_lock.release()
1915        except service_error, e:
1916            # If something goes wrong in the parse (usually an access error)
1917            # clear the placeholder state.  From here on out the code delays
1918            # exceptions.  Failing at this point returns a fault to the remote
1919            # caller.
1920
1921            self.state_lock.acquire()
1922            del self.state[eid]
1923            del self.state[expid]
1924            if self.state_filename: self.write_state()
1925            self.state_lock.release()
1926            if tmpdir and self.cleanup:
1927                self.remove_dirs(tmpdir)
1928            raise e
1929
1930
1931        # Start the background swapper and return the starting state.  From
1932        # here on out, the state will stick around a while.
1933
1934        # Let users touch the state
1935        self.auth.set_attribute(fid, expid)
1936        self.auth.set_attribute(expid, expid)
1937        # Override fedids can manipulate state as well
1938        for o in self.overrides:
1939            self.auth.set_attribute(o, expid)
1940        self.auth.save()
1941
1942        # Create a logger that logs to the experiment's state object as well as
1943        # to the main log file.
1944        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1945        alloc_collector = self.list_log(self.state[eid]['log'])
1946        h = logging.StreamHandler(alloc_collector)
1947        # XXX: there should be a global one of these rather than repeating the
1948        # code.
1949        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1950                    '%d %b %y %H:%M:%S'))
1951        alloc_log.addHandler(h)
1952       
1953        attrs = [ 
1954                {
1955                    'attribute': 'ssh_pubkey', 
1956                    'value': '%s/%s/config/%s' % \
1957                            (self.repo_url, expid, gw_pubkey_base)
1958                },
1959                {
1960                    'attribute': 'ssh_secretkey', 
1961                    'value': '%s/%s/config/%s' % \
1962                            (self.repo_url, expid, gw_secretkey_base)
1963                },
1964                {
1965                    'attribute': 'hosts', 
1966                    'value': '%s/%s/config/hosts' % \
1967                            (self.repo_url, expid)
1968                },
1969            ]
1970
1971        # transit and disconnected testbeds may not have a connInfo entry.
1972        # Fill in the blanks.
1973        for t in allocated.keys():
1974            if not connInfo.has_key(t):
1975                connInfo[t] = { }
1976
1977        # Start a thread to do the resource allocation
1978        t  = Thread(target=self.allocate_resources,
1979                args=(allocated, masters, eid, expid, tbparams, 
1980                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
1981                    connInfo, tbmap, expcert_file),
1982                name=eid)
1983        t.start()
1984
1985        rv = {
1986                'experimentID': [
1987                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1988                ],
1989                'experimentStatus': 'starting',
1990            }
1991
1992        return rv
1993   
1994    def get_experiment_fedid(self, key):
1995        """
1996        find the fedid associated with the localname key in the state database.
1997        """
1998
1999        rv = None
2000        self.state_lock.acquire()
2001        if self.state.has_key(key):
2002            if isinstance(self.state[key], dict):
2003                try:
2004                    kl = [ f['fedid'] for f in \
2005                            self.state[key]['experimentID']\
2006                                if f.has_key('fedid') ]
2007                except KeyError:
2008                    self.state_lock.release()
2009                    raise service_error(service_error.internal, 
2010                            "No fedid for experiment %s when getting "+\
2011                                    "fedid(!?)" % key)
2012                if len(kl) == 1:
2013                    rv = kl[0]
2014                else:
2015                    self.state_lock.release()
2016                    raise service_error(service_error.internal, 
2017                            "multiple fedids for experiment %s when " +\
2018                                    "getting fedid(!?)" % key)
2019            else:
2020                self.state_lock.release()
2021                raise service_error(service_error.internal, 
2022                        "Unexpected state for %s" % key)
2023        self.state_lock.release()
2024        return rv
2025
2026    def check_experiment_access(self, fid, key):
2027        """
2028        Confirm that the fid has access to the experiment.  Though a request
2029        may be made in terms of a local name, the access attribute is always
2030        the experiment's fedid.
2031        """
2032        if not isinstance(key, fedid):
2033            key = self.get_experiment_fedid(key)
2034
2035        if self.auth.check_attribute(fid, key):
2036            return True
2037        else:
2038            raise service_error(service_error.access, "Access Denied")
2039
2040
2041    def get_handler(self, path, fid):
2042        self.log.info("Get handler %s %s" % (path, fid))
2043        if self.auth.check_attribute(fid, path):
2044            return ("%s/%s" % (self.repodir, path), "application/binary")
2045        else:
2046            return (None, None)
2047
2048    def get_vtopo(self, req, fid):
2049        """
2050        Return the stored virtual topology for this experiment
2051        """
2052        rv = None
2053        state = None
2054
2055        req = req.get('VtopoRequestBody', None)
2056        if not req:
2057            raise service_error(service_error.req,
2058                    "Bad request format (no VtopoRequestBody)")
2059        exp = req.get('experiment', None)
2060        if exp:
2061            if exp.has_key('fedid'):
2062                key = exp['fedid']
2063                keytype = "fedid"
2064            elif exp.has_key('localname'):
2065                key = exp['localname']
2066                keytype = "localname"
2067            else:
2068                raise service_error(service_error.req, "Unknown lookup type")
2069        else:
2070            raise service_error(service_error.req, "No request?")
2071
2072        self.check_experiment_access(fid, key)
2073
2074        self.state_lock.acquire()
2075        if self.state.has_key(key):
2076            if self.state[key].has_key('vtopo'):
2077                rv = { 'experiment' : {keytype: key },\
2078                        'vtopo': self.state[key]['vtopo'],\
2079                    }
2080            else:
2081                state = self.state[key]['experimentStatus']
2082        self.state_lock.release()
2083
2084        if rv: return rv
2085        else: 
2086            if state:
2087                raise service_error(service_error.partial, 
2088                        "Not ready: %s" % state)
2089            else:
2090                raise service_error(service_error.req, "No such experiment")
2091
2092    def get_vis(self, req, fid):
2093        """
2094        Return the stored visualization for this experiment
2095        """
2096        rv = None
2097        state = None
2098
2099        req = req.get('VisRequestBody', None)
2100        if not req:
2101            raise service_error(service_error.req,
2102                    "Bad request format (no VisRequestBody)")
2103        exp = req.get('experiment', None)
2104        if exp:
2105            if exp.has_key('fedid'):
2106                key = exp['fedid']
2107                keytype = "fedid"
2108            elif exp.has_key('localname'):
2109                key = exp['localname']
2110                keytype = "localname"
2111            else:
2112                raise service_error(service_error.req, "Unknown lookup type")
2113        else:
2114            raise service_error(service_error.req, "No request?")
2115
2116        self.check_experiment_access(fid, key)
2117
2118        self.state_lock.acquire()
2119        if self.state.has_key(key):
2120            if self.state[key].has_key('vis'):
2121                rv =  { 'experiment' : {keytype: key },\
2122                        'vis': self.state[key]['vis'],\
2123                        }
2124            else:
2125                state = self.state[key]['experimentStatus']
2126        self.state_lock.release()
2127
2128        if rv: return rv
2129        else:
2130            if state:
2131                raise service_error(service_error.partial, 
2132                        "Not ready: %s" % state)
2133            else:
2134                raise service_error(service_error.req, "No such experiment")
2135
2136    def clean_info_response(self, rv):
2137        """
2138        Remove the information in the experiment's state object that is not in
2139        the info response.
2140        """
2141        # Remove the owner info (should always be there, but...)
2142        if rv.has_key('owner'): del rv['owner']
2143
2144        # Convert the log into the allocationLog parameter and remove the
2145        # log entry (with defensive programming)
2146        if rv.has_key('log'):
2147            rv['allocationLog'] = "".join(rv['log'])
2148            del rv['log']
2149        else:
2150            rv['allocationLog'] = ""
2151
2152        if rv['experimentStatus'] != 'active':
2153            if rv.has_key('federant'): del rv['federant']
2154        else:
2155            # remove the allocationID and uri info from each federant
2156            for f in rv.get('federant', []):
2157                if f.has_key('allocID'): del f['allocID']
2158                if f.has_key('uri'): del f['uri']
2159
2160        return rv
2161
2162    def get_info(self, req, fid):
2163        """
2164        Return all the stored info about this experiment
2165        """
2166        rv = None
2167
2168        req = req.get('InfoRequestBody', None)
2169        if not req:
2170            raise service_error(service_error.req,
2171                    "Bad request format (no InfoRequestBody)")
2172        exp = req.get('experiment', None)
2173        if exp:
2174            if exp.has_key('fedid'):
2175                key = exp['fedid']
2176                keytype = "fedid"
2177            elif exp.has_key('localname'):
2178                key = exp['localname']
2179                keytype = "localname"
2180            else:
2181                raise service_error(service_error.req, "Unknown lookup type")
2182        else:
2183            raise service_error(service_error.req, "No request?")
2184
2185        self.check_experiment_access(fid, key)
2186
2187        # The state may be massaged by the service function that called
2188        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2189        # state.
2190        self.state_lock.acquire()
2191        if self.state.has_key(key):
2192            rv = copy.deepcopy(self.state[key])
2193        self.state_lock.release()
2194
2195        if rv:
2196            return self.clean_info_response(rv)
2197        else:
2198            raise service_error(service_error.req, "No such experiment")
2199
2200    def get_multi_info(self, req, fid):
2201        """
2202        Return all the stored info that this fedid can access
2203        """
2204        rv = { 'info': [ ] }
2205
2206        self.state_lock.acquire()
2207        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2208            try:
2209                self.check_experiment_access(fid, key)
2210            except service_error, e:
2211                if e.code == service_error.access:
2212                    continue
2213                else:
2214                    self.state_lock.release()
2215                    raise e
2216
2217            if self.state.has_key(key):
2218                e = copy.deepcopy(self.state[key])
2219                e = self.clean_info_response(e)
2220                rv['info'].append(e)
2221        self.state_lock.release()
2222        return rv
2223
2224    def remove_dirs(self, dir):
2225        """
2226        Remove the directory tree and all files rooted at dir.  Log any errors,
2227        but continue.
2228        """
2229        self.log.debug("[removedirs]: removing %s" % dir)
2230        try:
2231            for path, dirs, files in os.walk(dir, topdown=False):
2232                for f in files:
2233                    os.remove(os.path.join(path, f))
2234                for d in dirs:
2235                    os.rmdir(os.path.join(path, d))
2236            os.rmdir(dir)
2237        except EnvironmentError, e:
2238            self.log.error("Error deleting directory tree in %s" % e);
2239
2240    @staticmethod
2241    def make_temp_certfile(expcert, tmpdir):
2242        """
2243        make a protected copy of the access certificate so the experiment
2244        controller can act as the experiment principal.  mkstemp is the most
2245        secure way to do that. The directory should be created by
2246        mkdtemp.  Return the filename.
2247        """
2248        if expcert and tmpdir:
2249            try:
2250                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
2251                f = os.fdopen(certf, 'w')
2252                print >> f, expcert
2253                f.close()
2254            except EnvironmentError, e:
2255                raise service_error(service_error.internal, 
2256                        "Cannot create temp cert file?")
2257            return certfn
2258        else:
2259            return None
2260
2261    def terminate_experiment(self, req, fid):
2262        """
2263        Swap this experiment out on the federants and delete the shared
2264        information
2265        """
2266        tbparams = { }
2267        req = req.get('TerminateRequestBody', None)
2268        if not req:
2269            raise service_error(service_error.req,
2270                    "Bad request format (no TerminateRequestBody)")
2271        force = req.get('force', False)
2272        exp = req.get('experiment', None)
2273        if exp:
2274            if exp.has_key('fedid'):
2275                key = exp['fedid']
2276                keytype = "fedid"
2277            elif exp.has_key('localname'):
2278                key = exp['localname']
2279                keytype = "localname"
2280            else:
2281                raise service_error(service_error.req, "Unknown lookup type")
2282        else:
2283            raise service_error(service_error.req, "No request?")
2284
2285        self.check_experiment_access(fid, key)
2286
2287        dealloc_list = [ ]
2288
2289
2290        # Create a logger that logs to the dealloc_list as well as to the main
2291        # log file.
2292        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2293        h = logging.StreamHandler(self.list_log(dealloc_list))
2294        # XXX: there should be a global one of these rather than repeating the
2295        # code.
2296        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2297                    '%d %b %y %H:%M:%S'))
2298        dealloc_log.addHandler(h)
2299
2300        self.state_lock.acquire()
2301        fed_exp = self.state.get(key, None)
2302        repo = None
2303
2304        if fed_exp:
2305            # This branch of the conditional holds the lock to generate a
2306            # consistent temporary tbparams variable to deallocate experiments.
2307            # It releases the lock to do the deallocations and reacquires it to
2308            # remove the experiment state when the termination is complete.
2309
2310            # First make sure that the experiment creation is complete.
2311            status = fed_exp.get('experimentStatus', None)
2312
2313            if status:
2314                if status in ('starting', 'terminating'):
2315                    if not force:
2316                        self.state_lock.release()
2317                        raise service_error(service_error.partial, 
2318                                'Experiment still being created or destroyed')
2319                    else:
2320                        self.log.warning('Experiment in %s state ' % status + \
2321                                'being terminated by force.')
2322            else:
2323                # No status??? trouble
2324                self.state_lock.release()
2325                raise service_error(service_error.internal,
2326                        "Experiment has no status!?")
2327
2328            ids = []
2329            #  experimentID is a list of dicts that are self-describing
2330            #  identifiers.  This finds all the fedids and localnames - the
2331            #  keys of self.state - and puts them into ids.
2332            for id in fed_exp.get('experimentID', []):
2333                if id.has_key('fedid'): 
2334                    ids.append(id['fedid'])
2335                    repo = "%s" % id['fedid']
2336                if id.has_key('localname'): ids.append(id['localname'])
2337
2338            # Get the experimentAccess - the principal for this experiment.  It
2339            # is this principal to which credentials have been delegated, and
2340            # as which the experiment controller must act.
2341            if 'experimentAccess' in self.state[key] and \
2342                    'X509' in self.state[key]['experimentAccess']:
2343                expcert = self.state[key]['experimentAccess']['X509']
2344            else:
2345                expcert = None
2346            # Collect the allocation/segment ids into a dict keyed by the fedid
2347            # of the allocation (or a monotonically increasing integer) that
2348            # contains a tuple of uri, aid (which is a dict...)
2349            for i, fed in enumerate(fed_exp.get('federant', [])):
2350                try:
2351                    uri = fed['uri']
2352                    aid = fed['allocID']
2353                    k = fed['allocID'].get('fedid', i)
2354                except KeyError, e:
2355                    continue
2356                tbparams[k] = (uri, aid)
2357            fed_exp['experimentStatus'] = 'terminating'
2358            if self.state_filename: self.write_state()
2359            self.state_lock.release()
2360
2361            try:
2362                tmpdir = tempfile.mkdtemp(prefix="split-")
2363            except EnvironmentError:
2364                raise service_error(service_error.internal, 
2365                        "Cannot create tmp dir")
2366            # This try block makes sure the tempdir is cleared
2367            try:
2368                # If no expcert, try the deallocation as the experiment
2369                # controller instance.
2370                if expcert and self.auth_type != 'legacy': 
2371                    cert_file = self.make_temp_certfile(expcert, tmpdir)
2372                    pw = None
2373                else: 
2374                    cert_file = self.cert_file
2375                    pw = self.cert_pwd
2376
2377                # Stop everyone.  NB, wait_for_all waits until a thread starts
2378                # and then completes, so we can't wait if nothing starts.  So,
2379                # no tbparams, no start.
2380                if len(tbparams) > 0:
2381                    tp = thread_pool(self.nthreads)
2382                    for k in tbparams.keys():
2383                        # Create and start a thread to stop the segment
2384                        tp.wait_for_slot()
2385                        uri, aid = tbparams[k]
2386                        t  = pooled_thread(\
2387                                target=self.terminate_segment(log=dealloc_log,
2388                                    testbed=uri,
2389                                    cert_file=cert_file, 
2390                                    cert_pwd=pw,
2391                                    trusted_certs=self.trusted_certs,
2392                                    caller=self.call_TerminateSegment),
2393                                args=(uri, aid), name=k,
2394                                pdata=tp, trace_file=self.trace_file)
2395                        t.start()
2396                    # Wait for completions
2397                    tp.wait_for_all_done()
2398
2399                # release the allocations (failed experiments have done this
2400                # already, and starting experiments may be in odd states, so we
2401                # ignore errors releasing those allocations
2402                try: 
2403                    for k in tbparams.keys():
2404                        # This releases access by uri
2405                        uri, aid = tbparams[k]
2406                        self.release_access(None, aid, uri=uri,
2407                                cert_file=cert_file, cert_pwd=pw)
2408                except service_error, e:
2409                    if status != 'failed' and not force:
2410                        raise e
2411
2412            # Clean up the tmpdir no matter what
2413            finally:
2414                self.remove_dirs(tmpdir)
2415
2416            # Remove the terminated experiment
2417            self.state_lock.acquire()
2418            for id in ids:
2419                if self.state.has_key(id): del self.state[id]
2420
2421            if self.state_filename: self.write_state()
2422            self.state_lock.release()
2423
2424            # Delete any synch points associated with this experiment.  All
2425            # synch points begin with the fedid of the experiment.
2426            fedid_keys = set(["fedid:%s" % f for f in ids \
2427                    if isinstance(f, fedid)])
2428            for k in self.synch_store.all_keys():
2429                try:
2430                    if len(k) > 45 and k[0:46] in fedid_keys:
2431                        self.synch_store.del_value(k)
2432                except synch_store.BadDeletionError:
2433                    pass
2434            self.write_store()
2435
2436            # Remove software and other cached stuff from the filesystem.
2437            if repo:
2438                self.remove_dirs("%s/%s" % (self.repodir, repo))
2439               
2440            return { 
2441                    'experiment': exp , 
2442                    'deallocationLog': "".join(dealloc_list),
2443                    }
2444        else:
2445            # Don't forget to release the lock
2446            self.state_lock.release()
2447            raise service_error(service_error.req, "No saved state")
2448
2449
2450    def GetValue(self, req, fid):
2451        """
2452        Get a value from the synchronized store
2453        """
2454        req = req.get('GetValueRequestBody', None)
2455        if not req:
2456            raise service_error(service_error.req,
2457                    "Bad request format (no GetValueRequestBody)")
2458       
2459        name = req['name']
2460        wait = req['wait']
2461        rv = { 'name': name }
2462
2463        if self.auth.check_attribute(fid, name):
2464            self.log.debug("[GetValue] asking for %s " % name)
2465            try:
2466                v = self.synch_store.get_value(name, wait)
2467            except synch_store.RevokedKeyError:
2468                # No more synch on this key
2469                raise service_error(service_error.federant, 
2470                        "Synch key %s revoked" % name)
2471            if v is not None:
2472                rv['value'] = v
2473            self.log.debug("[GetValue] got %s from %s" % (v, name))
2474            return rv
2475        else:
2476            raise service_error(service_error.access, "Access Denied")
2477       
2478
2479    def SetValue(self, req, fid):
2480        """
2481        Set a value in the synchronized store
2482        """
2483        req = req.get('SetValueRequestBody', None)
2484        if not req:
2485            raise service_error(service_error.req,
2486                    "Bad request format (no SetValueRequestBody)")
2487       
2488        name = req['name']
2489        v = req['value']
2490
2491        if self.auth.check_attribute(fid, name):
2492            try:
2493                self.synch_store.set_value(name, v)
2494                self.write_store()
2495                self.log.debug("[SetValue] set %s to %s" % (name, v))
2496            except synch_store.CollisionError:
2497                # Translate into a service_error
2498                raise service_error(service_error.req,
2499                        "Value already set: %s" %name)
2500            except synch_store.RevokedKeyError:
2501                # No more synch on this key
2502                raise service_error(service_error.federant, 
2503                        "Synch key %s revoked" % name)
2504            return { 'name': name, 'value': v }
2505        else:
2506            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.