source: fedd/federation/experiment_control.py @ 4c65f67

axis_examplecompt_changesinfo-ops
Last change on this file since 4c65f67 was dee164e, checked in by Ted Faber <faber@…>, 14 years ago

Looks like internal works now.

Had to add default entries to the access list to accomodate that, and discovered that ABAC requires strings - not unicode.

Moved lookup_access into the aceess class as most should be able to use it directly now.

  • Property mode set to 100644
File size: 90.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22from string import join
23
24from urlparse import urlparse
25from urllib2 import urlopen
26
27from util import *
28from fedid import fedid, generate_fedid
29from remote_service import xmlrpc_handler, soap_handler, service_caller
30from service_error import service_error
31from synch_store import synch_store
32from experiment_partition import experiment_partition
33from authorizer import abac_authorizer
34
35import topdl
36import list_log
37from ip_allocator import ip_allocator
38from ip_addr import ip_addr
39
40
41class nullHandler(logging.Handler):
42    def emit(self, record): pass
43
44fl = logging.getLogger("fedd.experiment_control")
45fl.addHandler(nullHandler())
46
47
48# Right now, no support for composition.
49class federated_service:
50    def __init__(self, name, exporter=None, importers=None, params=None, 
51            reqs=None, portal=None):
52        self.name=name
53        self.exporter=exporter
54        if importers is None: self.importers = []
55        else: self.importers=importers
56        if params is None: self.params = { }
57        else: self.params = params
58        if reqs is None: self.reqs = []
59        else: self.reqs = reqs
60
61        if portal is not None:
62            self.portal = portal
63        else:
64            self.portal = (name in federated_service.needs_portal)
65
66    def __str__(self):
67        return "name %s export %s import %s params %s reqs %s" % \
68                (self.name, self.exporter, self.importers, self.params,
69                        [ (r['name'], r['visibility']) for r in self.reqs] )
70
71    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
72
73class experiment_control_local:
74    """
75    Control of experiments that this system can directly access.
76
77    Includes experiment creation, termination and information dissemination.
78    Thred safe.
79    """
80
81    class ssh_cmd_timeout(RuntimeError): pass
82   
83    class thread_pool:
84        """
85        A class to keep track of a set of threads all invoked for the same
86        task.  Manages the mutual exclusion of the states.
87        """
88        def __init__(self, nthreads):
89            """
90            Start a pool.
91            """
92            self.changed = Condition()
93            self.started = 0
94            self.terminated = 0
95            self.nthreads = nthreads
96
97        def acquire(self):
98            """
99            Get the pool's lock.
100            """
101            self.changed.acquire()
102
103        def release(self):
104            """
105            Release the pool's lock.
106            """
107            self.changed.release()
108
109        def wait(self, timeout = None):
110            """
111            Wait for a pool thread to start or stop.
112            """
113            self.changed.wait(timeout)
114
115        def start(self):
116            """
117            Called by a pool thread to report starting.
118            """
119            self.changed.acquire()
120            self.started += 1
121            self.changed.notifyAll()
122            self.changed.release()
123
124        def terminate(self):
125            """
126            Called by a pool thread to report finishing.
127            """
128            self.changed.acquire()
129            self.terminated += 1
130            self.changed.notifyAll()
131            self.changed.release()
132
133        def clear(self):
134            """
135            Clear all pool data.
136            """
137            self.changed.acquire()
138            self.started = 0
139            self.terminated =0
140            self.changed.notifyAll()
141            self.changed.release()
142
143        def wait_for_slot(self):
144            """
145            Wait until we have a free slot to start another pooled thread
146            """
147            self.acquire()
148            while self.started - self.terminated >= self.nthreads:
149                self.wait()
150            self.release()
151
152        def wait_for_all_done(self, timeout=None):
153            """
154            Wait until all active threads finish (and at least one has
155            started).  If a timeout is given, return after waiting that long
156            for termination.  If all threads are done (and one has started in
157            the since the last clear()) return True, otherwise False.
158            """
159            if timeout:
160                deadline = time.time() + timeout
161            self.acquire()
162            while self.started == 0 or self.started > self.terminated:
163                self.wait(timeout)
164                if timeout:
165                    if time.time() > deadline:
166                        break
167                    timeout = deadline - time.time()
168            self.release()
169            return not (self.started == 0 or self.started > self.terminated)
170
171    class pooled_thread(Thread):
172        """
173        One of a set of threads dedicated to a specific task.  Uses the
174        thread_pool class above for coordination.
175        """
176        def __init__(self, group=None, target=None, name=None, args=(), 
177                kwargs={}, pdata=None, trace_file=None):
178            Thread.__init__(self, group, target, name, args, kwargs)
179            self.rv = None          # Return value of the ops in this thread
180            self.exception = None   # Exception that terminated this thread
181            self.target=target      # Target function to run on start()
182            self.args = args        # Args to pass to target
183            self.kwargs = kwargs    # Additional kw args
184            self.pdata = pdata      # thread_pool for this class
185            # Logger for this thread
186            self.log = logging.getLogger("fedd.experiment_control")
187       
188        def run(self):
189            """
190            Emulate Thread.run, except add pool data manipulation and error
191            logging.
192            """
193            if self.pdata:
194                self.pdata.start()
195
196            if self.target:
197                try:
198                    self.rv = self.target(*self.args, **self.kwargs)
199                except service_error, s:
200                    self.exception = s
201                    self.log.error("Thread exception: %s %s" % \
202                            (s.code_string(), s.desc))
203                except:
204                    self.exception = sys.exc_info()[1]
205                    self.log.error(("Unexpected thread exception: %s" +\
206                            "Trace %s") % (self.exception,\
207                                traceback.format_exc()))
208            if self.pdata:
209                self.pdata.terminate()
210
211    call_RequestAccess = service_caller('RequestAccess')
212    call_ReleaseAccess = service_caller('ReleaseAccess')
213    call_StartSegment = service_caller('StartSegment')
214    call_TerminateSegment = service_caller('TerminateSegment')
215    call_Ns2Topdl = service_caller('Ns2Topdl')
216
217    def __init__(self, config=None, auth=None):
218        """
219        Intialize the various attributes, most from the config object
220        """
221
222        def parse_tarfile_list(tf):
223            """
224            Parse a tarfile list from the configuration.  This is a set of
225            paths and tarfiles separated by spaces.
226            """
227            rv = [ ]
228            if tf is not None:
229                tl = tf.split()
230                while len(tl) > 1:
231                    p, t = tl[0:2]
232                    del tl[0:2]
233                    rv.append((p, t))
234            return rv
235
236        self.thread_with_rv = experiment_control_local.pooled_thread
237        self.thread_pool = experiment_control_local.thread_pool
238        self.list_log = list_log.list_log
239
240        self.cert_file = config.get("experiment_control", "cert_file")
241        if self.cert_file:
242            self.cert_pwd = config.get("experiment_control", "cert_pwd")
243        else:
244            self.cert_file = config.get("globals", "cert_file")
245            self.cert_pwd = config.get("globals", "cert_pwd")
246
247        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
248                or config.get("globals", "trusted_certs")
249
250        self.repodir = config.get("experiment_control", "repodir")
251        self.repo_url = config.get("experiment_control", "repo_url", 
252                "https://users.isi.deterlab.net:23235");
253
254        self.exp_stem = "fed-stem"
255        self.log = logging.getLogger("fedd.experiment_control")
256        set_log_level(config, "experiment_control", self.log)
257        self.muxmax = 2
258        self.nthreads = 10
259        self.randomize_experiments = False
260
261        self.splitter = None
262        self.ssh_keygen = "/usr/bin/ssh-keygen"
263        self.ssh_identity_file = None
264
265
266        self.debug = config.getboolean("experiment_control", "create_debug")
267        self.cleanup = not config.getboolean("experiment_control", 
268                "leave_tmpfiles")
269        self.state_filename = config.get("experiment_control", 
270                "experiment_state")
271        self.store_filename = config.get("experiment_control", 
272                "synch_store")
273        self.store_url = config.get("experiment_control", "store_url")
274        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
275        self.fedkit = parse_tarfile_list(\
276                config.get("experiment_control", "fedkit"))
277        self.gatewaykit = parse_tarfile_list(\
278                config.get("experiment_control", "gatewaykit"))
279        accessdb_file = config.get("experiment_control", "accessdb")
280
281        self.ssh_pubkey_file = config.get("experiment_control", 
282                "ssh_pubkey_file")
283        self.ssh_privkey_file = config.get("experiment_control",
284                "ssh_privkey_file")
285        dt = config.get("experiment_control", "direct_transit")
286        self.auth_type = config.get('experiment_control', 'auth_type') \
287                or 'legacy'
288        self.auth_dir = config.get('experiment_control', 'auth_dir')
289        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
290        else: self.direct_transit = [ ]
291        # NB for internal master/slave ops, not experiment setup
292        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
293
294        self.overrides = set([])
295        ovr = config.get('experiment_control', 'overrides')
296        if ovr:
297            for o in ovr.split(","):
298                o = o.strip()
299                if o.startswith('fedid:'): o = o[len('fedid:'):]
300                self.overrides.add(fedid(hexstr=o))
301
302        self.state = { }
303        self.state_lock = Lock()
304        self.tclsh = "/usr/local/bin/otclsh"
305        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
306                config.get("experiment_control", "tcl_splitter",
307                        "/usr/testbed/lib/ns2ir/parse.tcl")
308        mapdb_file = config.get("experiment_control", "mapdb")
309        self.trace_file = sys.stderr
310
311        self.def_expstart = \
312                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
313                "/tmp/federate";
314        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
315                "FEDDIR/hosts";
316        self.def_gwstart = \
317                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
318                "/tmp/bridge.log";
319        self.def_mgwstart = \
320                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
321                "/tmp/bridge.log";
322        self.def_gwimage = "FBSD61-TUNNEL2";
323        self.def_gwtype = "pc";
324        self.local_access = { }
325
326        if self.auth_type == 'legacy':
327            if auth:
328                self.auth = auth
329            else:
330                self.log.error( "[access]: No authorizer initialized, " +\
331                        "creating local one.")
332                auth = authorizer()
333        elif self.auth_type == 'abac':
334            self.auth = abac_authorizer(load=self.auth_dir)
335        else:
336            raise service_error(service_error.internal, 
337                    "Unknown auth_type: %s" % self.auth_type)
338
339
340        if self.ssh_pubkey_file:
341            try:
342                f = open(self.ssh_pubkey_file, 'r')
343                self.ssh_pubkey = f.read()
344                f.close()
345            except EnvironmentError:
346                raise service_error(service_error.internal,
347                        "Cannot read sshpubkey")
348        else:
349            raise service_error(service_error.internal, 
350                    "No SSH public key file?")
351
352        if not self.ssh_privkey_file:
353            raise service_error(service_error.internal, 
354                    "No SSH public key file?")
355
356
357        if mapdb_file:
358            self.read_mapdb(mapdb_file)
359        else:
360            self.log.warn("[experiment_control] No testbed map, using defaults")
361            self.tbmap = { 
362                    'deter':'https://users.isi.deterlab.net:23235',
363                    'emulab':'https://users.isi.deterlab.net:23236',
364                    'ucb':'https://users.isi.deterlab.net:23237',
365                    }
366
367        if accessdb_file:
368                self.read_accessdb(accessdb_file)
369        else:
370            raise service_error(service_error.internal,
371                    "No accessdb specified in config")
372
373        # Grab saved state.  OK to do this w/o locking because it's read only
374        # and only one thread should be in existence that can see self.state at
375        # this point.
376        if self.state_filename:
377            self.read_state()
378
379        if self.store_filename:
380            self.read_store()
381        else:
382            self.log.warning("No saved synch store")
383            self.synch_store = synch_store
384
385        # Dispatch tables
386        self.soap_services = {\
387                'New': soap_handler('New', self.new_experiment),
388                'Create': soap_handler('Create', self.create_experiment),
389                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
390                'Vis': soap_handler('Vis', self.get_vis),
391                'Info': soap_handler('Info', self.get_info),
392                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
393                'Terminate': soap_handler('Terminate', 
394                    self.terminate_experiment),
395                'GetValue': soap_handler('GetValue', self.GetValue),
396                'SetValue': soap_handler('SetValue', self.SetValue),
397        }
398
399        self.xmlrpc_services = {\
400                'New': xmlrpc_handler('New', self.new_experiment),
401                'Create': xmlrpc_handler('Create', self.create_experiment),
402                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
403                'Vis': xmlrpc_handler('Vis', self.get_vis),
404                'Info': xmlrpc_handler('Info', self.get_info),
405                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
406                'Terminate': xmlrpc_handler('Terminate',
407                    self.terminate_experiment),
408                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
409                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
410        }
411
412    # Call while holding self.state_lock
413    def write_state(self):
414        """
415        Write a new copy of experiment state after copying the existing state
416        to a backup.
417
418        State format is a simple pickling of the state dictionary.
419        """
420        if os.access(self.state_filename, os.W_OK):
421            copy_file(self.state_filename, \
422                    "%s.bak" % self.state_filename)
423        try:
424            f = open(self.state_filename, 'w')
425            pickle.dump(self.state, f)
426        except EnvironmentError, e:
427            self.log.error("Can't write file %s: %s" % \
428                    (self.state_filename, e))
429        except pickle.PicklingError, e:
430            self.log.error("Pickling problem: %s" % e)
431        except TypeError, e:
432            self.log.error("Pickling problem (TypeError): %s" % e)
433
434    @staticmethod
435    def get_alloc_ids(state):
436        """
437        Pull the fedids of the identifiers of each allocation from the
438        state.  Again, a dict dive that's best isolated.
439
440        Used by read_store and read state
441        """
442
443        return [ f['allocID']['fedid'] 
444                for f in state.get('federant',[]) \
445                    if f.has_key('allocID') and \
446                        f['allocID'].has_key('fedid')]
447
448    # Call while holding self.state_lock
449    def read_state(self):
450        """
451        Read a new copy of experiment state.  Old state is overwritten.
452
453        State format is a simple pickling of the state dictionary.
454        """
455       
456        def get_experiment_id(state):
457            """
458            Pull the fedid experimentID out of the saved state.  This is kind
459            of a gross walk through the dict.
460            """
461
462            if state.has_key('experimentID'):
463                for e in state['experimentID']:
464                    if e.has_key('fedid'):
465                        return e['fedid']
466                else:
467                    return None
468            else:
469                return None
470
471        try:
472            f = open(self.state_filename, "r")
473            self.state = pickle.load(f)
474            self.log.debug("[read_state]: Read state from %s" % \
475                    self.state_filename)
476        except EnvironmentError, e:
477            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
478                    % (self.state_filename, e))
479        except pickle.UnpicklingError, e:
480            self.log.warning(("[read_state]: No saved state: " + \
481                    "Unpickling failed: %s") % e)
482       
483        for s in self.state.values():
484            try:
485
486                eid = get_experiment_id(s)
487                if eid : 
488                    if self.auth_type == 'legacy':
489                        # XXX: legacy
490                        # Give the owner rights to the experiment
491                        self.auth.set_attribute(s['owner'], eid)
492                        # And holders of the eid as well
493                        self.auth.set_attribute(eid, eid)
494                        # allow overrides to control experiments as well
495                        for o in self.overrides:
496                            self.auth.set_attribute(o, eid)
497                        # Set permissions to allow reading of the software
498                        # repo, if any, as well.
499                        for a in self.get_alloc_ids(s):
500                            self.auth.set_attribute(a, 'repo/%s' % eid)
501                else:
502                    raise KeyError("No experiment id")
503            except KeyError, e:
504                self.log.warning("[read_state]: State ownership or identity " +\
505                        "misformatted in %s: %s" % (self.state_filename, e))
506
507
508    def read_accessdb(self, accessdb_file):
509        """
510        Read the mapping from fedids that can create experiments to their name
511        in the 3-level access namespace.  All will be asserted from this
512        testbed and can include the local username and porject that will be
513        asserted on their behalf by this fedd.  Each fedid is also added to the
514        authorization system with the "create" attribute.
515        """
516        self.accessdb = {}
517        # These are the regexps for parsing the db
518        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
519        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
520                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
521        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
522                "\s*->\s*(" + name_expr + ")\s*$")
523        lineno = 0
524
525        # Parse the mappings and store in self.authdb, a dict of
526        # fedid -> (proj, user)
527        try:
528            f = open(accessdb_file, "r")
529            for line in f:
530                lineno += 1
531                line = line.strip()
532                if len(line) == 0 or line.startswith('#'):
533                    continue
534                m = project_line.match(line)
535                if m:
536                    fid = fedid(hexstr=m.group(1))
537                    project, user = m.group(2,3)
538                    if not self.accessdb.has_key(fid):
539                        self.accessdb[fid] = []
540                    self.accessdb[fid].append((project, user))
541                    continue
542
543                m = user_line.match(line)
544                if m:
545                    fid = fedid(hexstr=m.group(1))
546                    project = None
547                    user = m.group(2)
548                    if not self.accessdb.has_key(fid):
549                        self.accessdb[fid] = []
550                    self.accessdb[fid].append((project, user))
551                    continue
552                self.log.warn("[experiment_control] Error parsing access " +\
553                        "db %s at line %d" %  (accessdb_file, lineno))
554        except EnvironmentError:
555            raise service_error(service_error.internal,
556                    ("Error opening/reading %s as experiment " +\
557                            "control accessdb") %  accessdb_file)
558        f.close()
559
560        # Initialize the authorization attributes
561        # XXX: legacy
562        if self.auth_type == 'legacy':
563            for fid in self.accessdb.keys():
564                self.auth.set_attribute(fid, 'create')
565                self.auth.set_attribute(fid, 'new')
566
567    def read_mapdb(self, file):
568        """
569        Read a simple colon separated list of mappings for the
570        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
571        """
572
573        self.tbmap = { }
574        lineno =0
575        try:
576            f = open(file, "r")
577            for line in f:
578                lineno += 1
579                line = line.strip()
580                if line.startswith('#') or len(line) == 0:
581                    continue
582                try:
583                    label, url = line.split(':', 1)
584                    self.tbmap[label] = url
585                except ValueError, e:
586                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
587                            "map db: %s %s" % (lineno, line, e))
588        except EnvironmentError, e:
589            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
590                    "open %s: %s" % (file, e))
591        f.close()
592
593    def read_store(self):
594        try:
595            self.synch_store = synch_store()
596            self.synch_store.load(self.store_filename)
597            self.log.debug("[read_store]: Read store from %s" % \
598                    self.store_filename)
599        except EnvironmentError, e:
600            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
601                    % (self.state_filename, e))
602            self.synch_store = synch_store()
603
604        # Set the initial permissions on data in the store.  XXX: This ad hoc
605        # authorization attribute initialization is getting out of hand.
606        # XXX: legacy
607        if self.auth_type == 'legacy':
608            for k in self.synch_store.all_keys():
609                try:
610                    if k.startswith('fedid:'):
611                        fid = fedid(hexstr=k[6:46])
612                        if self.state.has_key(fid):
613                            for a in self.get_alloc_ids(self.state[fid]):
614                                self.auth.set_attribute(a, k)
615                except ValueError, e:
616                    self.log.warn("Cannot deduce permissions for %s" % k)
617
618
619    def write_store(self):
620        """
621        Write a new copy of synch_store after writing current state
622        to a backup.  We use the internal synch_store pickle method to avoid
623        incinsistent data.
624
625        State format is a simple pickling of the store.
626        """
627        if os.access(self.store_filename, os.W_OK):
628            copy_file(self.store_filename, \
629                    "%s.bak" % self.store_filename)
630        try:
631            self.synch_store.save(self.store_filename)
632        except EnvironmentError, e:
633            self.log.error("Can't write file %s: %s" % \
634                    (self.store_filename, e))
635        except TypeError, e:
636            self.log.error("Pickling problem (TypeError): %s" % e)
637
638       
639    def generate_ssh_keys(self, dest, type="rsa" ):
640        """
641        Generate a set of keys for the gateways to use to talk.
642
643        Keys are of type type and are stored in the required dest file.
644        """
645        valid_types = ("rsa", "dsa")
646        t = type.lower();
647        if t not in valid_types: raise ValueError
648        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
649
650        try:
651            trace = open("/dev/null", "w")
652        except EnvironmentError:
653            raise service_error(service_error.internal,
654                    "Cannot open /dev/null??");
655
656        # May raise CalledProcessError
657        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
658        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
659        if rv != 0:
660            raise service_error(service_error.internal, 
661                    "Cannot generate nonce ssh keys.  %s return code %d" \
662                            % (self.ssh_keygen, rv))
663
664    def gentopo(self, str):
665        """
666        Generate the topology dtat structure from the splitter's XML
667        representation of it.
668
669        The topology XML looks like:
670            <experiment>
671                <nodes>
672                    <node><vname></vname><ips>ip1:ip2</ips></node>
673                </nodes>
674                <lans>
675                    <lan>
676                        <vname></vname><vnode></vnode><ip></ip>
677                        <bandwidth></bandwidth><member>node:port</member>
678                    </lan>
679                </lans>
680        """
681        class topo_parse:
682            """
683            Parse the topology XML and create the dats structure.
684            """
685            def __init__(self):
686                # Typing of the subelements for data conversion
687                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
688                self.int_subelements = ( 'bandwidth',)
689                self.float_subelements = ( 'delay',)
690                # The final data structure
691                self.nodes = [ ]
692                self.lans =  [ ]
693                self.topo = { \
694                        'node': self.nodes,\
695                        'lan' : self.lans,\
696                    }
697                self.element = { }  # Current element being created
698                self.chars = ""     # Last text seen
699
700            def end_element(self, name):
701                # After each sub element the contents is added to the current
702                # element or to the appropriate list.
703                if name == 'node':
704                    self.nodes.append(self.element)
705                    self.element = { }
706                elif name == 'lan':
707                    self.lans.append(self.element)
708                    self.element = { }
709                elif name in self.str_subelements:
710                    self.element[name] = self.chars
711                    self.chars = ""
712                elif name in self.int_subelements:
713                    self.element[name] = int(self.chars)
714                    self.chars = ""
715                elif name in self.float_subelements:
716                    self.element[name] = float(self.chars)
717                    self.chars = ""
718
719            def found_chars(self, data):
720                self.chars += data.rstrip()
721
722
723        tp = topo_parse();
724        parser = xml.parsers.expat.ParserCreate()
725        parser.EndElementHandler = tp.end_element
726        parser.CharacterDataHandler = tp.found_chars
727
728        parser.Parse(str)
729
730        return tp.topo
731       
732
733    def genviz(self, topo):
734        """
735        Generate the visualization the virtual topology
736        """
737
738        neato = "/usr/local/bin/neato"
739        # These are used to parse neato output and to create the visualization
740        # file.
741        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
742        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
743                "%s</type></node>"
744
745        try:
746            # Node names
747            nodes = [ n['vname'] for n in topo['node'] ]
748            topo_lans = topo['lan']
749        except KeyError, e:
750            raise service_error(service_error.internal, "Bad topology: %s" %e)
751
752        lans = { }
753        links = { }
754
755        # Walk through the virtual topology, organizing the connections into
756        # 2-node connections (links) and more-than-2-node connections (lans).
757        # When a lan is created, it's added to the list of nodes (there's a
758        # node in the visualization for the lan).
759        for l in topo_lans:
760            if links.has_key(l['vname']):
761                if len(links[l['vname']]) < 2:
762                    links[l['vname']].append(l['vnode'])
763                else:
764                    nodes.append(l['vname'])
765                    lans[l['vname']] = links[l['vname']]
766                    del links[l['vname']]
767                    lans[l['vname']].append(l['vnode'])
768            elif lans.has_key(l['vname']):
769                lans[l['vname']].append(l['vnode'])
770            else:
771                links[l['vname']] = [ l['vnode'] ]
772
773
774        # Open up a temporary file for dot to turn into a visualization
775        try:
776            df, dotname = tempfile.mkstemp()
777            dotfile = os.fdopen(df, 'w')
778        except EnvironmentError:
779            raise service_error(service_error.internal,
780                    "Failed to open file in genviz")
781
782        try:
783            dnull = open('/dev/null', 'w')
784        except EnvironmentError:
785            service_error(service_error.internal,
786                    "Failed to open /dev/null in genviz")
787
788        # Generate a dot/neato input file from the links, nodes and lans
789        try:
790            print >>dotfile, "graph G {"
791            for n in nodes:
792                print >>dotfile, '\t"%s"' % n
793            for l in links.keys():
794                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
795            for l in lans.keys():
796                for n in lans[l]:
797                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
798            print >>dotfile, "}"
799            dotfile.close()
800        except TypeError:
801            raise service_error(service_error.internal,
802                    "Single endpoint link in vtopo")
803        except EnvironmentError:
804            raise service_error(service_error.internal, "Cannot write dot file")
805
806        # Use dot to create a visualization
807        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
808                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
809                close_fds=True)
810        dnull.close()
811
812        # Translate dot to vis format
813        vis_nodes = [ ]
814        vis = { 'node': vis_nodes }
815        for line in dot.stdout:
816            m = vis_re.match(line)
817            if m:
818                vn = m.group(1)
819                vis_node = {'name': vn, \
820                        'x': float(m.group(2)),\
821                        'y' : float(m.group(3)),\
822                    }
823                if vn in links.keys() or vn in lans.keys():
824                    vis_node['type'] = 'lan'
825                else:
826                    vis_node['type'] = 'node'
827                vis_nodes.append(vis_node)
828        rv = dot.wait()
829
830        os.remove(dotname)
831        if rv == 0 : return vis
832        else: return None
833
834    def get_access(self, tb, nodes, tbparam, access_user, masters, tbmap):
835        """
836        Get access to testbed through fedd and set the parameters for that tb
837        """
838        def get_export_project(svcs):
839            """
840            Look through for the list of federated_service for this testbed
841            objects for a project_export service, and extract the project
842            parameter.
843            """
844
845            pe = [s for s in svcs if s.name=='project_export']
846            if len(pe) == 1:
847                return pe[0].params.get('project', None)
848            elif len(pe) == 0:
849                return None
850            else:
851                raise service_error(service_error.req,
852                        "More than one project export is not supported")
853
854        uri = tbmap.get(testbed_base(tb), None)
855        if not uri:
856            raise service_error(service_error.server_config, 
857                    "Unknown testbed: %s" % tb)
858
859        export_svcs = masters.get(tb,[])
860        import_svcs = [ s for m in masters.values() \
861                for s in m \
862                    if tb in s.importers ]
863
864        export_project = get_export_project(export_svcs)
865
866        # Tweak search order so that if there are entries in access_user that
867        # have a project matching the export project, we try them first
868        if export_project: 
869            access_sequence = [ (p, u) for p, u in access_user \
870                    if p == export_project] 
871            access_sequence.extend([(p, u) for p, u in access_user \
872                    if p != export_project]) 
873        else: 
874            access_sequence = access_user
875
876        for p, u in access_sequence: 
877            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
878                    "to %s") %  ((p or "None"), u, uri))
879
880            if p:
881                # Request with user and project specified
882                req = {\
883                        'credential': [ "project: %s" % p, "user: %s"  % u],
884                    }
885            else:
886                # Request with only user specified
887                req = {\
888                        'credential': [ 'user: %s' % u ],
889                    }
890
891            # Make the service request from the services we're importing and
892            # exporting.  Keep track of the export request ids so we can
893            # collect the resulting info from the access response.
894            e_keys = { }
895            if import_svcs or export_svcs:
896                req['service'] = [ ]
897
898                for i, s in enumerate(import_svcs):
899                    idx = 'import%d' % i
900                    sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
901                    if s.params:
902                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
903                                for k, v in s.params.items()]
904                    req['service'].append(sr)
905
906                for i, s in enumerate(export_svcs):
907                    idx = 'export%d' % i
908                    e_keys[idx] = s
909                    sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
910                    if s.params:
911                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
912                                for k, v in s.params.items()]
913                    req['service'].append(sr)
914
915            # node resources if any
916            if nodes != None and len(nodes) > 0:
917                rnodes = [ ]
918                for n in nodes:
919                    rn = { }
920                    image, hw, count = n.split(":")
921                    if image: rn['image'] = [ image ]
922                    if hw: rn['hardware'] = [ hw ]
923                    if count and int(count) >0 : rn['count'] = int(count)
924                    rnodes.append(rn)
925                req['resources']= { }
926                req['resources']['node'] = rnodes
927
928            try:
929                if self.local_access.has_key(uri):
930                    # Local access call
931                    req = { 'RequestAccessRequestBody' : req }
932                    r = self.local_access[uri].RequestAccess(req, 
933                            fedid(file=self.cert_file))
934                    r = { 'RequestAccessResponseBody' : r }
935                else:
936                    r = self.call_RequestAccess(uri, req, 
937                            self.cert_file, self.cert_pwd, self.trusted_certs)
938            except service_error, e:
939                if e.code == service_error.access:
940                    self.log.debug("[get_access] Access denied")
941                    r = None
942                    continue
943                else:
944                    raise e
945
946            if r.has_key('RequestAccessResponseBody'):
947                # Through to here we have a valid response, not a fault.
948                # Access denied is a fault, so something better or worse than
949                # access denied has happened.
950                r = r['RequestAccessResponseBody']
951                self.log.debug("[get_access] Access granted")
952                break
953            else:
954                raise service_error(service_error.protocol,
955                        "Bad proxy response")
956       
957        if not r:
958            raise service_error(service_error.access, 
959                    "Access denied by %s (%s)" % (tb, uri))
960
961        tbparam[tb] = { 
962                "allocID" : r['allocID'],
963                "uri": uri,
964                }
965
966        # Collect the responses corresponding to the services this testbed
967        # exports.  These will be the service requests that we will include in
968        # the start segment requests (with appropriate visibility values) to
969        # import and export the segments.
970        for s in r.get('service', []):
971            id = s.get('id', None)
972            if id and id in e_keys:
973                e_keys[id].reqs.append(s)
974
975        # Add attributes to parameter space.  We don't allow attributes to
976        # overlay any parameters already installed.
977        for a in r.get('fedAttr', []):
978            try:
979                if a['attribute'] and \
980                        isinstance(a['attribute'], basestring)\
981                        and not tbparam[tb].has_key(a['attribute'].lower()):
982                    tbparam[tb][a['attribute'].lower()] = a['value']
983            except KeyError:
984                self.log.error("Bad attribute in response: %s" % a)
985
986    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
987            cert_pwd=None):
988        """
989        Release access to testbed through fedd
990        """
991
992        if not uri and tbmap:
993            uri = tbmap.get(tb, None)
994        if not uri:
995            raise service_error(service_error.server_config, 
996                    "Unknown testbed: %s" % tb)
997
998        if self.local_access.has_key(uri):
999            resp = self.local_access[uri].ReleaseAccess(\
1000                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1001                    fedid(file=cert_file))
1002            resp = { 'ReleaseAccessResponseBody': resp } 
1003        else:
1004            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1005                    cert_file, cert_pwd, self.trusted_certs)
1006
1007        # better error coding
1008
1009    def remote_ns2topdl(self, uri, desc):
1010
1011        req = {
1012                'description' : { 'ns2description': desc },
1013            }
1014
1015        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
1016                self.trusted_certs)
1017
1018        if r.has_key('Ns2TopdlResponseBody'):
1019            r = r['Ns2TopdlResponseBody']
1020            ed = r.get('experimentdescription', None)
1021            if ed.has_key('topdldescription'):
1022                return topdl.Topology(**ed['topdldescription'])
1023            else:
1024                raise service_error(service_error.protocol, 
1025                        "Bad splitter response (no output)")
1026        else:
1027            raise service_error(service_error.protocol, "Bad splitter response")
1028
1029    class start_segment:
1030        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1031                cert_pwd=None, trusted_certs=None, caller=None,
1032                log_collector=None):
1033            self.log = log
1034            self.debug = debug
1035            self.cert_file = cert_file
1036            self.cert_pwd = cert_pwd
1037            self.trusted_certs = None
1038            self.caller = caller
1039            self.testbed = testbed
1040            self.log_collector = log_collector
1041            self.response = None
1042            self.node = { }
1043
1044        def make_map(self, resp):
1045            for e in resp.get('embedding', []):
1046                if 'toponame' in e and 'physname' in e:
1047                    self.node[e['toponame']] = e['physname'][0]
1048
1049        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
1050            req = {
1051                    'allocID': { 'fedid' : aid }, 
1052                    'segmentdescription': { 
1053                        'topdldescription': topo.to_dict(),
1054                    },
1055                }
1056
1057            if connInfo:
1058                req['connection'] = connInfo
1059
1060            import_svcs = [ s for m in masters.values() \
1061                    for s in m if self.testbed in s.importers]
1062
1063            if import_svcs or self.testbed in masters:
1064                req['service'] = []
1065
1066            for s in import_svcs:
1067                for r in s.reqs:
1068                    sr = copy.deepcopy(r)
1069                    sr['visibility'] = 'import';
1070                    req['service'].append(sr)
1071
1072            for s in masters.get(self.testbed, []):
1073                for r in s.reqs:
1074                    sr = copy.deepcopy(r)
1075                    sr['visibility'] = 'export';
1076                    req['service'].append(sr)
1077
1078            if attrs:
1079                req['fedAttr'] = attrs
1080
1081            try:
1082                self.log.debug("Calling StartSegment at %s " % uri)
1083                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1084                        self.trusted_certs)
1085                if r.has_key('StartSegmentResponseBody'):
1086                    lval = r['StartSegmentResponseBody'].get('allocationLog',
1087                            None)
1088                    if lval and self.log_collector:
1089                        for line in  lval.splitlines(True):
1090                            self.log_collector.write(line)
1091                    self.make_map(r['StartSegmentResponseBody'])
1092                    self.response = r
1093                else:
1094                    raise service_error(service_error.internal, 
1095                            "Bad response!?: %s" %r)
1096                return True
1097            except service_error, e:
1098                self.log.error("Start segment failed on %s: %s" % \
1099                        (self.testbed, e))
1100                return False
1101
1102
1103
1104    class terminate_segment:
1105        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1106                cert_pwd=None, trusted_certs=None, caller=None):
1107            self.log = log
1108            self.debug = debug
1109            self.cert_file = cert_file
1110            self.cert_pwd = cert_pwd
1111            self.trusted_certs = None
1112            self.caller = caller
1113            self.testbed = testbed
1114
1115        def __call__(self, uri, aid ):
1116            req = {
1117                    'allocID': aid , 
1118                }
1119            try:
1120                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1121                        self.trusted_certs)
1122                return True
1123            except service_error, e:
1124                self.log.error("Terminate segment failed on %s: %s" % \
1125                        (self.testbed, e))
1126                return False
1127   
1128
1129    def allocate_resources(self, allocated, masters, eid, expid, 
1130            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1131            attrs=None, connInfo={}, tbmap=None, expcert=None):
1132
1133        started = { }           # Testbeds where a sub-experiment started
1134                                # successfully
1135
1136        # XXX
1137        fail_soft = False
1138
1139        if tbmap is None: tbmap = { }
1140
1141        log = alloc_log or self.log
1142
1143        thread_pool = self.thread_pool(self.nthreads)
1144        threads = [ ]
1145        starters = [ ]
1146
1147        if expcert:
1148            cert = expcert
1149            pw = None
1150        else:
1151            cert = self.cert_file
1152            pw = self.cert_pwd
1153
1154        for tb in allocated.keys():
1155            # Create and start a thread to start the segment, and save it
1156            # to get the return value later
1157            tb_attrs = copy.copy(attrs)
1158            thread_pool.wait_for_slot()
1159            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
1160            base, suffix = split_testbed(tb)
1161            if suffix:
1162                tb_attrs.append({'attribute': 'experiment_name', 
1163                    'value': "%s-%s" % (eid, suffix)})
1164            else:
1165                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1166            if not uri:
1167                raise service_error(service_error.internal, 
1168                        "Unknown testbed %s !?" % tb)
1169
1170            if tbparams[tb].has_key('allocID') and \
1171                    tbparams[tb]['allocID'].has_key('fedid'):
1172                aid = tbparams[tb]['allocID']['fedid']
1173            else:
1174                raise service_error(service_error.internal, 
1175                        "No alloc id for testbed %s !?" % tb)
1176
1177            s = self.start_segment(log=log, debug=self.debug,
1178                    testbed=tb, cert_file=cert,
1179                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1180                    caller=self.call_StartSegment,
1181                    log_collector=log_collector)
1182            starters.append(s)
1183            t  = self.pooled_thread(\
1184                    target=s, name=tb,
1185                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1186                    pdata=thread_pool, trace_file=self.trace_file)
1187            threads.append(t)
1188            t.start()
1189
1190        # Wait until all finish (keep pinging the log, though)
1191        mins = 0
1192        revoked = False
1193        while not thread_pool.wait_for_all_done(60.0):
1194            mins += 1
1195            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1196                    % mins)
1197            if not revoked and \
1198                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1199                # a testbed has failed.  Revoke this experiment's
1200                # synchronizarion values so that sub experiments will not
1201                # deadlock waiting for synchronization that will never happen
1202                self.log.info("A subexperiment has failed to swap in, " + \
1203                        "revoking synch keys")
1204                var_key = "fedid:%s" % expid
1205                for k in self.synch_store.all_keys():
1206                    if len(k) > 45 and k[0:46] == var_key:
1207                        self.synch_store.revoke_key(k)
1208                revoked = True
1209
1210        failed = [ t.getName() for t in threads if not t.rv ]
1211        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1212
1213        # If one failed clean up, unless fail_soft is set
1214        if failed:
1215            if not fail_soft:
1216                thread_pool.clear()
1217                for tb in succeeded:
1218                    # Create and start a thread to stop the segment
1219                    thread_pool.wait_for_slot()
1220                    uri = tbparams[tb]['uri']
1221                    t  = self.pooled_thread(\
1222                            target=self.terminate_segment(log=log,
1223                                testbed=tb,
1224                                cert_file=self.cert_file, 
1225                                cert_pwd=self.cert_pwd,
1226                                trusted_certs=self.trusted_certs,
1227                                caller=self.call_TerminateSegment),
1228                            args=(uri, tbparams[tb]['federant']['allocID']),
1229                            name=tb,
1230                            pdata=thread_pool, trace_file=self.trace_file)
1231                    t.start()
1232                # Wait until all finish (if any are being stopped)
1233                if succeeded:
1234                    thread_pool.wait_for_all_done()
1235
1236                # release the allocations
1237                for tb in tbparams.keys():
1238                    try:
1239                        self.release_access(tb, tbparams[tb]['allocID'], 
1240                                tbmap=tbmap, uri=tbparams[tb].get('uri', None))
1241                    except service_error, e:
1242                        self.log.warn("Error releasing access: %s" % e.desc)
1243                # Remove the placeholder
1244                self.state_lock.acquire()
1245                self.state[eid]['experimentStatus'] = 'failed'
1246                if self.state_filename: self.write_state()
1247                self.state_lock.release()
1248                # Remove the repo dir
1249                self.remove_dirs("%s/%s" %(self.repodir, expid))
1250                # Walk up tmpdir, deleting as we go
1251                if self.cleanup:
1252                    self.remove_dirs(tmpdir)
1253                else:
1254                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1255
1256
1257                log.error("Swap in failed on %s" % ",".join(failed))
1258                return
1259        else:
1260            # Walk through the successes and gather the virtual to physical
1261            # mapping.
1262            embedding = [ ]
1263            for s in starters:
1264                for k, v in s.node.items():
1265                    embedding.append({
1266                        'toponame': k, 
1267                        'physname': [ v],
1268                        'testbed': s.testbed
1269                        })
1270            log.info("[start_segment]: Experiment %s active" % eid)
1271
1272
1273        # Walk up tmpdir, deleting as we go
1274        if self.cleanup:
1275            self.remove_dirs(tmpdir)
1276        else:
1277            log.debug("[start_experiment]: not removing %s" % tmpdir)
1278
1279        # Insert the experiment into our state and update the disk copy.
1280        self.state_lock.acquire()
1281        self.state[expid]['experimentStatus'] = 'active'
1282        self.state[eid] = self.state[expid]
1283        self.state[eid]['experimentdescription']['topdldescription'] = \
1284                top.to_dict()
1285        self.state[eid]['embedding'] = embedding
1286        if self.state_filename: self.write_state()
1287        self.state_lock.release()
1288        return
1289
1290
1291    def add_kit(self, e, kit):
1292        """
1293        Add a Software object created from the list of (install, location)
1294        tuples passed as kit  to the software attribute of an object e.  We
1295        do this enough to break out the code, but it's kind of a hack to
1296        avoid changing the old tuple rep.
1297        """
1298
1299        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1300
1301        if isinstance(e.software, list): e.software.extend(s)
1302        else: e.software = s
1303
1304
1305    def create_experiment_state(self, fid, req, expid, expcert,
1306            state='starting'):
1307        """
1308        Create the initial entry in the experiment's state.  The expid and
1309        expcert are the experiment's fedid and certifacte that represents that
1310        ID, which are installed in the experiment state.  If the request
1311        includes a suggested local name that is used if possible.  If the local
1312        name is already taken by an experiment owned by this user that has
1313        failed, it is overwritten.  Otherwise new letters are added until a
1314        valid localname is found.  The generated local name is returned.
1315        """
1316
1317        if req.has_key('experimentID') and \
1318                req['experimentID'].has_key('localname'):
1319            overwrite = False
1320            eid = req['experimentID']['localname']
1321            # If there's an old failed experiment here with the same local name
1322            # and accessible by this user, we'll overwrite it, otherwise we'll
1323            # fall through and do the collision avoidance.
1324            old_expid = self.get_experiment_fedid(eid)
1325            if old_expid and self.check_experiment_access(fid, old_expid):
1326                self.state_lock.acquire()
1327                status = self.state[eid].get('experimentStatus', None)
1328                if status and status == 'failed':
1329                    # remove the old access attribute
1330                    self.auth.unset_attribute(fid, old_expid)
1331                    self.auth.save()
1332                    overwrite = True
1333                    del self.state[eid]
1334                    del self.state[old_expid]
1335                self.state_lock.release()
1336            self.state_lock.acquire()
1337            while (self.state.has_key(eid) and not overwrite):
1338                eid += random.choice(string.ascii_letters)
1339            # Initial state
1340            self.state[eid] = {
1341                    'experimentID' : \
1342                            [ { 'localname' : eid }, {'fedid': expid } ],
1343                    'experimentStatus': state,
1344                    'experimentAccess': { 'X509' : expcert },
1345                    'owner': fid,
1346                    'log' : [],
1347                }
1348            self.state[expid] = self.state[eid]
1349            if self.state_filename: self.write_state()
1350            self.state_lock.release()
1351        else:
1352            eid = self.exp_stem
1353            for i in range(0,5):
1354                eid += random.choice(string.ascii_letters)
1355            self.state_lock.acquire()
1356            while (self.state.has_key(eid)):
1357                eid = self.exp_stem
1358                for i in range(0,5):
1359                    eid += random.choice(string.ascii_letters)
1360            # Initial state
1361            self.state[eid] = {
1362                    'experimentID' : \
1363                            [ { 'localname' : eid }, {'fedid': expid } ],
1364                    'experimentStatus': state,
1365                    'experimentAccess': { 'X509' : expcert },
1366                    'owner': fid,
1367                    'log' : [],
1368                }
1369            self.state[expid] = self.state[eid]
1370            if self.state_filename: self.write_state()
1371            self.state_lock.release()
1372
1373        return eid
1374
1375
1376    def allocate_ips_to_topo(self, top):
1377        """
1378        Add an ip4_address attribute to all the hosts in the topology, based on
1379        the shared substrates on which they sit.  An /etc/hosts file is also
1380        created and returned as a list of hostfiles entries.  We also return
1381        the allocator, because we may need to allocate IPs to portals
1382        (specifically DRAGON portals).
1383        """
1384        subs = sorted(top.substrates, 
1385                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1386                reverse=True)
1387        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1388        ifs = { }
1389        hosts = [ ]
1390
1391        for idx, s in enumerate(subs):
1392            net_size = len(s.interfaces)+2
1393
1394            a = ips.allocate(net_size)
1395            if a :
1396                base, num = a
1397                if num < net_size: 
1398                    raise service_error(service_error.internal,
1399                            "Allocator returned wrong number of IPs??")
1400            else:
1401                raise service_error(service_error.req, 
1402                        "Cannot allocate IP addresses")
1403            mask = ips.min_alloc
1404            while mask < net_size:
1405                mask *= 2
1406
1407            netmask = ((2**32-1) ^ (mask-1))
1408
1409            base += 1
1410            for i in s.interfaces:
1411                i.attribute.append(
1412                        topdl.Attribute('ip4_address', 
1413                            "%s" % ip_addr(base)))
1414                i.attribute.append(
1415                        topdl.Attribute('ip4_netmask', 
1416                            "%s" % ip_addr(int(netmask))))
1417
1418                hname = i.element.name
1419                if ifs.has_key(hname):
1420                    hosts.append("%s\t%s-%s %s-%d" % \
1421                            (ip_addr(base), hname, s.name, hname,
1422                                ifs[hname]))
1423                else:
1424                    ifs[hname] = 0
1425                    hosts.append("%s\t%s-%s %s-%d %s" % \
1426                            (ip_addr(base), hname, s.name, hname,
1427                                ifs[hname], hname))
1428
1429                ifs[hname] += 1
1430                base += 1
1431        return hosts, ips
1432
1433    def get_access_to_testbeds(self, testbeds, access_user, allocated, 
1434            tbparams, masters, tbmap):
1435        """
1436        Request access to the various testbeds required for this instantiation
1437        (passed in as testbeds).  User, access_user, expoert_project and master
1438        are used to construct the correct requests.  Per-testbed parameters are
1439        returned in tbparams.
1440        """
1441        for tb in testbeds:
1442            self.get_access(tb, None, tbparams, access_user, masters, tbmap)
1443            allocated[tb] = 1
1444
1445    def get_abac_access_to_testbeds(self, testbeds, fid, allocated, 
1446            tbparam, masters, tbmap, expid=None, expcert=None):
1447        for tb in testbeds:
1448            self.get_abac_access(tb, tbparam, fid, masters, tbmap, expid,
1449                    expcert)
1450            allocated[tb] = 1
1451
1452    def get_abac_access(self, tb, tbparam,fid, masters, tbmap, expid=None, expcert=None):
1453        """
1454        Get access to testbed through fedd and set the parameters for that tb
1455        """
1456        def get_export_project(svcs):
1457            """
1458            Look through for the list of federated_service for this testbed
1459            objects for a project_export service, and extract the project
1460            parameter.
1461            """
1462
1463            pe = [s for s in svcs if s.name=='project_export']
1464            if len(pe) == 1:
1465                return pe[0].params.get('project', None)
1466            elif len(pe) == 0:
1467                return None
1468            else:
1469                raise service_error(service_error.req,
1470                        "More than one project export is not supported")
1471
1472        uri = tbmap.get(testbed_base(tb), None)
1473        if not uri:
1474            raise service_error(service_error.server_config, 
1475                    "Unknown testbed: %s" % tb)
1476
1477        export_svcs = masters.get(tb,[])
1478        import_svcs = [ s for m in masters.values() \
1479                for s in m \
1480                    if tb in s.importers ]
1481
1482        export_project = get_export_project(export_svcs)
1483        # Compose the credential list so that IDs come before attributes
1484        creds = set()
1485        keys = set()
1486        certs = self.auth.get_creds_for_principal(fid)
1487        if expid:
1488            certs.update(self.auth.get_creds_for_principal(expid))
1489        for c in certs:
1490            keys.add(c.issuer_cert())
1491            creds.add(c.attribute_cert())
1492        creds = list(keys) + list(creds)
1493
1494        if expcert: cert, pw = expcert, None
1495        else: cert, pw = self.cert_file, self.cert_pw
1496
1497        # Request credentials
1498        req = {
1499                'abac_credential': creds,
1500            }
1501        # Make the service request from the services we're importing and
1502        # exporting.  Keep track of the export request ids so we can
1503        # collect the resulting info from the access response.
1504        e_keys = { }
1505        if import_svcs or export_svcs:
1506            req['service'] = [ ]
1507
1508            for i, s in enumerate(import_svcs):
1509                idx = 'import%d' % i
1510                sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
1511                if s.params:
1512                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1513                            for k, v in s.params.items()]
1514                req['service'].append(sr)
1515
1516            for i, s in enumerate(export_svcs):
1517                idx = 'export%d' % i
1518                e_keys[idx] = s
1519                sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
1520                if s.params:
1521                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
1522                            for k, v in s.params.items()]
1523                req['service'].append(sr)
1524
1525
1526        if self.local_access.has_key(uri):
1527            # Local access call
1528            req = { 'RequestAccessRequestBody' : req }
1529            r = self.local_access[uri].RequestAccess(req, 
1530                    fedid(file=self.cert_file))
1531            r = { 'RequestAccessResponseBody' : r }
1532        else:
1533            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1534
1535        if r.has_key('RequestAccessResponseBody'):
1536            # Through to here we have a valid response, not a fault.
1537            # Access denied is a fault, so something better or worse than
1538            # access denied has happened.
1539            r = r['RequestAccessResponseBody']
1540            self.log.debug("[get_access] Access granted")
1541        else:
1542            raise service_error(service_error.protocol,
1543                        "Bad proxy response")
1544       
1545        tbparam[tb] = { 
1546                "allocID" : r['allocID'],
1547                "uri": uri,
1548                }
1549
1550        # Collect the responses corresponding to the services this testbed
1551        # exports.  These will be the service requests that we will include in
1552        # the start segment requests (with appropriate visibility values) to
1553        # import and export the segments.
1554        for s in r.get('service', []):
1555            id = s.get('id', None)
1556            if id and id in e_keys:
1557                e_keys[id].reqs.append(s)
1558
1559        # Add attributes to parameter space.  We don't allow attributes to
1560        # overlay any parameters already installed.
1561        for a in r.get('fedAttr', []):
1562            try:
1563                if a['attribute'] and \
1564                        isinstance(a['attribute'], basestring)\
1565                        and not tbparam[tb].has_key(a['attribute'].lower()):
1566                    tbparam[tb][a['attribute'].lower()] = a['value']
1567            except KeyError:
1568                self.log.error("Bad attribute in response: %s" % a)
1569
1570
1571    def split_topology(self, top, topo, testbeds):
1572        """
1573        Create the sub-topologies that are needed for experiment instantiation.
1574        """
1575        for tb in testbeds:
1576            topo[tb] = top.clone()
1577            # copy in for loop allows deletions from the original
1578            for e in [ e for e in topo[tb].elements]:
1579                etb = e.get_attribute('testbed')
1580                # NB: elements without a testbed attribute won't appear in any
1581                # sub topologies. 
1582                if not etb or etb != tb:
1583                    for i in e.interface:
1584                        for s in i.subs:
1585                            try:
1586                                s.interfaces.remove(i)
1587                            except ValueError:
1588                                raise service_error(service_error.internal,
1589                                        "Can't remove interface??")
1590                    topo[tb].elements.remove(e)
1591            topo[tb].make_indices()
1592
1593    def wrangle_software(self, expid, top, topo, tbparams):
1594        """
1595        Copy software out to the repository directory, allocate permissions and
1596        rewrite the segment topologies to look for the software in local
1597        places.
1598        """
1599
1600        # Copy the rpms and tarfiles to a distribution directory from
1601        # which the federants can retrieve them
1602        linkpath = "%s/software" %  expid
1603        softdir ="%s/%s" % ( self.repodir, linkpath)
1604        softmap = { }
1605        # These are in a list of tuples format (each kit).  This comprehension
1606        # unwraps them into a single list of tuples that initilaizes the set of
1607        # tuples.
1608        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1609                for p, t in l ])
1610        pkgs.update([x.location for e in top.elements \
1611                for x in e.software])
1612        try:
1613            os.makedirs(softdir)
1614        except EnvironmentError, e:
1615            raise service_error(
1616                    "Cannot create software directory: %s" % e)
1617        # The actual copying.  Everything's converted into a url for copying.
1618        for pkg in pkgs:
1619            loc = pkg
1620
1621            scheme, host, path = urlparse(loc)[0:3]
1622            dest = os.path.basename(path)
1623            if not scheme:
1624                if not loc.startswith('/'):
1625                    loc = "/%s" % loc
1626                loc = "file://%s" %loc
1627            try:
1628                u = urlopen(loc)
1629            except Exception, e:
1630                raise service_error(service_error.req, 
1631                        "Cannot open %s: %s" % (loc, e))
1632            try:
1633                f = open("%s/%s" % (softdir, dest) , "w")
1634                self.log.debug("Writing %s/%s" % (softdir,dest) )
1635                data = u.read(4096)
1636                while data:
1637                    f.write(data)
1638                    data = u.read(4096)
1639                f.close()
1640                u.close()
1641            except Exception, e:
1642                raise service_error(service_error.internal,
1643                        "Could not copy %s: %s" % (loc, e))
1644            path = re.sub("/tmp", "", linkpath)
1645            # XXX
1646            softmap[pkg] = \
1647                    "%s/%s/%s" %\
1648                    ( self.repo_url, path, dest)
1649
1650            # Allow the individual segments to access the software.
1651            for tb in tbparams.keys():
1652                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1653                        "/%s/%s" % ( path, dest))
1654            self.auth.save()
1655
1656        # Convert the software locations in the segments into the local
1657        # copies on this host
1658        for soft in [ s for tb in topo.values() \
1659                for e in tb.elements \
1660                    if getattr(e, 'software', False) \
1661                        for s in e.software ]:
1662            if softmap.has_key(soft.location):
1663                soft.location = softmap[soft.location]
1664
1665
1666    def new_experiment(self, req, fid):
1667        """
1668        The external interface to empty initial experiment creation called from
1669        the dispatcher.
1670
1671        Creates a working directory, splits the incoming description using the
1672        splitter script and parses out the avrious subsections using the
1673        lcasses above.  Once each sub-experiment is created, use pooled threads
1674        to instantiate them and start it all up.
1675        """
1676        req = req.get('NewRequestBody', None)
1677        if not req:
1678            raise service_error(service_error.req,
1679                    "Bad request format (no NewRequestBody)")
1680
1681        if self.auth.import_credentials(data_list=req.get('credential', [])):
1682            self.auth.save()
1683
1684        if not self.auth.check_attribute(fid, 'new'):
1685            raise service_error(service_error.access, "New access denied")
1686
1687        try:
1688            tmpdir = tempfile.mkdtemp(prefix="split-")
1689        except EnvironmentError:
1690            raise service_error(service_error.internal, "Cannot create tmp dir")
1691
1692        try:
1693            access_user = self.accessdb[fid]
1694        except KeyError:
1695            raise service_error(service_error.internal,
1696                    "Access map and authorizer out of sync in " + \
1697                            "new_experiment for fedid %s"  % fid)
1698
1699        pid = "dummy"
1700        gid = "dummy"
1701
1702        # Generate an ID for the experiment (slice) and a certificate that the
1703        # allocator can use to prove they own it.  We'll ship it back through
1704        # the encrypted connection.  If the requester supplied one, use it.
1705        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1706            expcert = req['experimentAccess']['X509']
1707            expid = fedid(certstr=expcert)
1708            self.state_lock.acquire()
1709            if expid in self.state:
1710                self.state_lock.release()
1711                raise service_error(service_error.req, 
1712                        'fedid %s identifies an existing experiment' % expid)
1713            self.state_lock.release()
1714        else:
1715            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1716
1717        #now we're done with the tmpdir, and it should be empty
1718        if self.cleanup:
1719            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1720            os.rmdir(tmpdir)
1721        else:
1722            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1723
1724        eid = self.create_experiment_state(fid, req, expid, expcert, 
1725                state='empty')
1726
1727        # Let users touch the state
1728        self.auth.set_attribute(fid, expid)
1729        self.auth.set_attribute(expid, expid)
1730        # Override fedids can manipulate state as well
1731        for o in self.overrides:
1732            self.auth.set_attribute(o, expid)
1733        self.auth.save()
1734
1735        rv = {
1736                'experimentID': [
1737                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1738                ],
1739                'experimentStatus': 'empty',
1740                'experimentAccess': { 'X509' : expcert }
1741            }
1742
1743        return rv
1744
1745    def create_experiment(self, req, fid):
1746        """
1747        The external interface to experiment creation called from the
1748        dispatcher.
1749
1750        Creates a working directory, splits the incoming description using the
1751        splitter script and parses out the various subsections using the
1752        classes above.  Once each sub-experiment is created, use pooled threads
1753        to instantiate them and start it all up.
1754        """
1755
1756        req = req.get('CreateRequestBody', None)
1757        if not req:
1758            raise service_error(service_error.req,
1759                    "Bad request format (no CreateRequestBody)")
1760
1761        # Get the experiment access
1762        exp = req.get('experimentID', None)
1763        if exp:
1764            if exp.has_key('fedid'):
1765                key = exp['fedid']
1766                expid = key
1767                eid = None
1768            elif exp.has_key('localname'):
1769                key = exp['localname']
1770                eid = key
1771                expid = None
1772            else:
1773                raise service_error(service_error.req, "Unknown lookup type")
1774        else:
1775            raise service_error(service_error.req, "No request?")
1776
1777        # Import information from the requester
1778        if self.auth.import_credentials(data_list=req.get('credential', [])):
1779            self.auth.save()
1780
1781        self.check_experiment_access(fid, key)
1782
1783        # Install the testbed map entries supplied with the request into a copy
1784        # of the testbed map.
1785        tbmap = dict(self.tbmap)
1786        for m in req.get('testbedmap', []):
1787            if 'testbed' in m and 'uri' in m:
1788                tbmap[m['testbed']] = m['uri']
1789
1790        try:
1791            tmpdir = tempfile.mkdtemp(prefix="split-")
1792            os.mkdir(tmpdir+"/keys")
1793        except EnvironmentError:
1794            raise service_error(service_error.internal, "Cannot create tmp dir")
1795
1796        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1797        gw_secretkey_base = "fed.%s" % self.ssh_type
1798        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1799        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1800        tclfile = tmpdir + "/experiment.tcl"
1801        tbparams = { }
1802        try:
1803            access_user = self.accessdb[fid]
1804        except KeyError:
1805            raise service_error(service_error.internal,
1806                    "Access map and authorizer out of sync in " + \
1807                            "create_experiment for fedid %s"  % fid)
1808
1809        pid = "dummy"
1810        gid = "dummy"
1811
1812        # The tcl parser needs to read a file so put the content into that file
1813        descr=req.get('experimentdescription', None)
1814        if descr:
1815            file_content=descr.get('ns2description', None)
1816            if file_content:
1817                try:
1818                    f = open(tclfile, 'w')
1819                    f.write(file_content)
1820                    f.close()
1821                except EnvironmentError:
1822                    raise service_error(service_error.internal,
1823                            "Cannot write temp experiment description")
1824            else:
1825                raise service_error(service_error.req, 
1826                        "Only ns2descriptions supported")
1827        else:
1828            raise service_error(service_error.req, "No experiment description")
1829
1830        self.state_lock.acquire()
1831        if self.state.has_key(key):
1832            self.state[key]['experimentStatus'] = "starting"
1833            for e in self.state[key].get('experimentID',[]):
1834                if not expid and e.has_key('fedid'):
1835                    expid = e['fedid']
1836                elif not eid and e.has_key('localname'):
1837                    eid = e['localname']
1838            if 'experimentAccess' in self.state[key] and \
1839                    'X509' in self.state[key]['experimentAccess']:
1840                expcert = self.state[key]['experimentAccess']['X509']
1841            else:
1842                expcert = None
1843        self.state_lock.release()
1844
1845        if not (eid and expid):
1846            raise service_error(service_error.internal, 
1847                    "Cannot find local experiment info!?")
1848
1849        # make a protected copy of the access certificate so the experiment
1850        # controller can act as the experiment principal.
1851        if expcert and self.auth_type != 'legacy':
1852            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1853            if not expcert_file:
1854                raise service_error(service_error.internal, 
1855                        "Cannot create temp cert file?")
1856        else:
1857            expcert_file = None
1858
1859        try: 
1860            # This catches exceptions to clear the placeholder if necessary
1861            try:
1862                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1863            except ValueError:
1864                raise service_error(service_error.server_config, 
1865                        "Bad key type (%s)" % self.ssh_type)
1866
1867            # Copy the service request
1868            tb_services = [ s for s in req.get('service',[]) ]
1869            # Translate to topdl
1870            if self.splitter_url:
1871                self.log.debug("Calling remote topdl translator at %s" % \
1872                        self.splitter_url)
1873                top = self.remote_ns2topdl(self.splitter_url, file_content)
1874            else:
1875                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1876                    str(self.muxmax), '-m', 'dummy']
1877
1878                tclcmd.extend([pid, gid, eid, tclfile])
1879
1880                self.log.debug("running local splitter %s", " ".join(tclcmd))
1881                # This is just fantastic.  As a side effect the parser copies
1882                # tb_compat.tcl into the current directory, so that directory
1883                # must be writable by the fedd user.  Doing this in the
1884                # temporary subdir ensures this is the case.
1885                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1886                        cwd=tmpdir)
1887                split_data = tclparser.stdout
1888
1889                top = topdl.topology_from_xml(file=split_data, top="experiment")
1890
1891            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1892            # Find the testbeds to look up
1893            testbeds = set([ a.value for e in top.elements \
1894                    for a in e.attribute \
1895                        if a.attribute == 'testbed'])
1896
1897            tb_hosts = { }
1898            for tb in testbeds:
1899                tb_hosts[tb] = [ e.name for e in top.elements \
1900                        if isinstance(e, topdl.Computer) and \
1901                            e.get_attribute('testbed') and \
1902                            e.get_attribute('testbed') == tb]
1903
1904            masters = { }           # testbeds exporting services
1905            pmasters = { }          # Testbeds exporting services that
1906                                    # need portals
1907            for s in tb_services:
1908                # If this is a service request with the importall field
1909                # set, fill it out.
1910
1911                if s.get('importall', False):
1912                    s['import'] = [ tb for tb in testbeds \
1913                            if tb not in s.get('export',[])]
1914                    del s['importall']
1915
1916                # Add the service to masters
1917                for tb in s.get('export', []):
1918                    if s.get('name', None):
1919                        if tb not in masters:
1920                            masters[tb] = [ ]
1921
1922                        params = { }
1923                        if 'fedAttr' in s:
1924                            for a in s['fedAttr']:
1925                                params[a.get('attribute', '')] = \
1926                                        a.get('value','')
1927
1928                        fser = federated_service(name=s['name'],
1929                                exporter=tb, importers=s.get('import',[]),
1930                                params=params)
1931                        if fser.name == 'hide_hosts' \
1932                                and 'hosts' not in fser.params:
1933                            fser.params['hosts'] = \
1934                                    ",".join(tb_hosts.get(fser.exporter, []))
1935                        masters[tb].append(fser)
1936
1937                        if fser.portal:
1938                            if tb not in pmasters: pmasters[tb] = [ fser ]
1939                            else: pmasters[tb].append(fser)
1940                    else:
1941                        self.log.error('Testbed service does not have name " + \
1942                                "and importers')
1943
1944
1945            allocated = { }         # Testbeds we can access
1946            topo ={ }               # Sub topologies
1947            connInfo = { }          # Connection information
1948
1949            if self.auth_type == 'legacy':
1950                self.get_access_to_testbeds(testbeds, access_user, allocated, 
1951                        tbparams, masters, tbmap)
1952            elif self.auth_type == 'abac':
1953                self.get_abac_access_to_testbeds(testbeds, fid, allocated, 
1954                        tbparams, masters, tbmap, expid, expcert_file)
1955            else:
1956                raise service_error(service_error.internal, 
1957                        "Unknown auth_type %s" % self.auth_type)
1958
1959            self.split_topology(top, topo, testbeds)
1960
1961            # Copy configuration files into the remote file store
1962            # The config urlpath
1963            configpath = "/%s/config" % expid
1964            # The config file system location
1965            configdir ="%s%s" % ( self.repodir, configpath)
1966            try:
1967                os.makedirs(configdir)
1968            except EnvironmentError, e:
1969                raise service_error(service_error.internal,
1970                        "Cannot create config directory: %s" % e)
1971            try:
1972                f = open("%s/hosts" % configdir, "w")
1973                f.write('\n'.join(hosts))
1974                f.close()
1975            except EnvironmentError, e:
1976                raise service_error(service_error.internal, 
1977                        "Cannot write hosts file: %s" % e)
1978            try:
1979                copy_file("%s" % gw_pubkey, "%s/%s" % \
1980                        (configdir, gw_pubkey_base))
1981                copy_file("%s" % gw_secretkey, "%s/%s" % \
1982                        (configdir, gw_secretkey_base))
1983            except EnvironmentError, e:
1984                raise service_error(service_error.internal, 
1985                        "Cannot copy keyfiles: %s" % e)
1986
1987            # Allow the individual testbeds to access the configuration files.
1988            for tb in tbparams.keys():
1989                asignee = tbparams[tb]['allocID']['fedid']
1990                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1991                    self.auth.set_attribute(asignee, "%s/%s" % \
1992                            (configpath, f))
1993
1994            part = experiment_partition(self.auth, self.store_url, tbmap,
1995                    self.muxmax, self.direct_transit)
1996            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
1997                    connInfo, expid)
1998            # Now get access to the dynamic testbeds (those added above)
1999            for tb in [ t for t in topo if t not in allocated]:
2000                #XXX: ABAC
2001                if self.auth_type =='legacy':
2002                    self.get_access(tb, None, tbparams, access_user, 
2003                            masters, tbmap)
2004                elif self.auth_type == 'abac':
2005                    self.get_abac_access(tb, tbparams, fid, masters, tbmap, 
2006                            expid, expcert_file)
2007                else:
2008                    raise service_error(service_error.internal, 
2009                            "Unknown auth_type %s" % self.auth_type)
2010                allocated[tb] = 1
2011                store_keys = topo[tb].get_attribute('store_keys')
2012                # Give the testbed access to keys it exports or imports
2013                if store_keys:
2014                    for sk in store_keys.split(" "):
2015                        self.auth.set_attribute(\
2016                                tbparams[tb]['allocID']['fedid'], sk)
2017            self.auth.save()
2018
2019            self.wrangle_software(expid, top, topo, tbparams)
2020
2021            vtopo = topdl.topology_to_vtopo(top)
2022            vis = self.genviz(vtopo)
2023
2024            # save federant information
2025            for k in allocated.keys():
2026                tbparams[k]['federant'] = {
2027                        'name': [ { 'localname' : eid} ],
2028                        'allocID' : tbparams[k]['allocID'],
2029                        'uri': tbparams[k]['uri'],
2030                    }
2031
2032            self.state_lock.acquire()
2033            self.state[eid]['vtopo'] = vtopo
2034            self.state[eid]['vis'] = vis
2035            self.state[eid]['experimentdescription'] = \
2036                    { 'topdldescription': top.to_dict() }
2037            self.state[eid]['federant'] = \
2038                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2039                        if tbparams[tb].has_key('federant') ]
2040            if self.state_filename: 
2041                self.write_state()
2042            self.state_lock.release()
2043        except service_error, e:
2044            # If something goes wrong in the parse (usually an access error)
2045            # clear the placeholder state.  From here on out the code delays
2046            # exceptions.  Failing at this point returns a fault to the remote
2047            # caller.
2048
2049            self.state_lock.acquire()
2050            del self.state[eid]
2051            del self.state[expid]
2052            if self.state_filename: self.write_state()
2053            self.state_lock.release()
2054            if tmpdir and self.cleanup:
2055                self.remove_dirs(tmpdir)
2056            raise e
2057
2058
2059        # Start the background swapper and return the starting state.  From
2060        # here on out, the state will stick around a while.
2061
2062        # Let users touch the state
2063        self.auth.set_attribute(fid, expid)
2064        self.auth.set_attribute(expid, expid)
2065        # Override fedids can manipulate state as well
2066        for o in self.overrides:
2067            self.auth.set_attribute(o, expid)
2068        self.auth.save()
2069
2070        # Create a logger that logs to the experiment's state object as well as
2071        # to the main log file.
2072        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2073        alloc_collector = self.list_log(self.state[eid]['log'])
2074        h = logging.StreamHandler(alloc_collector)
2075        # XXX: there should be a global one of these rather than repeating the
2076        # code.
2077        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2078                    '%d %b %y %H:%M:%S'))
2079        alloc_log.addHandler(h)
2080       
2081        attrs = [ 
2082                {
2083                    'attribute': 'ssh_pubkey', 
2084                    'value': '%s/%s/config/%s' % \
2085                            (self.repo_url, expid, gw_pubkey_base)
2086                },
2087                {
2088                    'attribute': 'ssh_secretkey', 
2089                    'value': '%s/%s/config/%s' % \
2090                            (self.repo_url, expid, gw_secretkey_base)
2091                },
2092                {
2093                    'attribute': 'hosts', 
2094                    'value': '%s/%s/config/hosts' % \
2095                            (self.repo_url, expid)
2096                },
2097            ]
2098
2099        # transit and disconnected testbeds may not have a connInfo entry.
2100        # Fill in the blanks.
2101        for t in allocated.keys():
2102            if not connInfo.has_key(t):
2103                connInfo[t] = { }
2104
2105        # Start a thread to do the resource allocation
2106        t  = Thread(target=self.allocate_resources,
2107                args=(allocated, masters, eid, expid, tbparams, 
2108                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2109                    connInfo, tbmap, expcert_file),
2110                name=eid)
2111        t.start()
2112
2113        rv = {
2114                'experimentID': [
2115                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2116                ],
2117                'experimentStatus': 'starting',
2118            }
2119
2120        return rv
2121   
2122    def get_experiment_fedid(self, key):
2123        """
2124        find the fedid associated with the localname key in the state database.
2125        """
2126
2127        rv = None
2128        self.state_lock.acquire()
2129        if self.state.has_key(key):
2130            if isinstance(self.state[key], dict):
2131                try:
2132                    kl = [ f['fedid'] for f in \
2133                            self.state[key]['experimentID']\
2134                                if f.has_key('fedid') ]
2135                except KeyError:
2136                    self.state_lock.release()
2137                    raise service_error(service_error.internal, 
2138                            "No fedid for experiment %s when getting "+\
2139                                    "fedid(!?)" % key)
2140                if len(kl) == 1:
2141                    rv = kl[0]
2142                else:
2143                    self.state_lock.release()
2144                    raise service_error(service_error.internal, 
2145                            "multiple fedids for experiment %s when " +\
2146                                    "getting fedid(!?)" % key)
2147            else:
2148                self.state_lock.release()
2149                raise service_error(service_error.internal, 
2150                        "Unexpected state for %s" % key)
2151        self.state_lock.release()
2152        return rv
2153
2154    def check_experiment_access(self, fid, key):
2155        """
2156        Confirm that the fid has access to the experiment.  Though a request
2157        may be made in terms of a local name, the access attribute is always
2158        the experiment's fedid.
2159        """
2160        if not isinstance(key, fedid):
2161            key = self.get_experiment_fedid(key)
2162
2163        if self.auth.check_attribute(fid, key):
2164            return True
2165        else:
2166            raise service_error(service_error.access, "Access Denied")
2167
2168
2169    def get_handler(self, path, fid):
2170        self.log.info("Get handler %s %s" % (path, fid))
2171        if self.auth.check_attribute(fid, path):
2172            return ("%s/%s" % (self.repodir, path), "application/binary")
2173        else:
2174            return (None, None)
2175
2176    def get_vtopo(self, req, fid):
2177        """
2178        Return the stored virtual topology for this experiment
2179        """
2180        rv = None
2181        state = None
2182
2183        req = req.get('VtopoRequestBody', None)
2184        if not req:
2185            raise service_error(service_error.req,
2186                    "Bad request format (no VtopoRequestBody)")
2187        exp = req.get('experiment', None)
2188        if exp:
2189            if exp.has_key('fedid'):
2190                key = exp['fedid']
2191                keytype = "fedid"
2192            elif exp.has_key('localname'):
2193                key = exp['localname']
2194                keytype = "localname"
2195            else:
2196                raise service_error(service_error.req, "Unknown lookup type")
2197        else:
2198            raise service_error(service_error.req, "No request?")
2199
2200        self.check_experiment_access(fid, key)
2201
2202        self.state_lock.acquire()
2203        if self.state.has_key(key):
2204            if self.state[key].has_key('vtopo'):
2205                rv = { 'experiment' : {keytype: key },\
2206                        'vtopo': self.state[key]['vtopo'],\
2207                    }
2208            else:
2209                state = self.state[key]['experimentStatus']
2210        self.state_lock.release()
2211
2212        if rv: return rv
2213        else: 
2214            if state:
2215                raise service_error(service_error.partial, 
2216                        "Not ready: %s" % state)
2217            else:
2218                raise service_error(service_error.req, "No such experiment")
2219
2220    def get_vis(self, req, fid):
2221        """
2222        Return the stored visualization for this experiment
2223        """
2224        rv = None
2225        state = None
2226
2227        req = req.get('VisRequestBody', None)
2228        if not req:
2229            raise service_error(service_error.req,
2230                    "Bad request format (no VisRequestBody)")
2231        exp = req.get('experiment', None)
2232        if exp:
2233            if exp.has_key('fedid'):
2234                key = exp['fedid']
2235                keytype = "fedid"
2236            elif exp.has_key('localname'):
2237                key = exp['localname']
2238                keytype = "localname"
2239            else:
2240                raise service_error(service_error.req, "Unknown lookup type")
2241        else:
2242            raise service_error(service_error.req, "No request?")
2243
2244        self.check_experiment_access(fid, key)
2245
2246        self.state_lock.acquire()
2247        if self.state.has_key(key):
2248            if self.state[key].has_key('vis'):
2249                rv =  { 'experiment' : {keytype: key },\
2250                        'vis': self.state[key]['vis'],\
2251                        }
2252            else:
2253                state = self.state[key]['experimentStatus']
2254        self.state_lock.release()
2255
2256        if rv: return rv
2257        else:
2258            if state:
2259                raise service_error(service_error.partial, 
2260                        "Not ready: %s" % state)
2261            else:
2262                raise service_error(service_error.req, "No such experiment")
2263
2264    def clean_info_response(self, rv):
2265        """
2266        Remove the information in the experiment's state object that is not in
2267        the info response.
2268        """
2269        # Remove the owner info (should always be there, but...)
2270        if rv.has_key('owner'): del rv['owner']
2271
2272        # Convert the log into the allocationLog parameter and remove the
2273        # log entry (with defensive programming)
2274        if rv.has_key('log'):
2275            rv['allocationLog'] = "".join(rv['log'])
2276            del rv['log']
2277        else:
2278            rv['allocationLog'] = ""
2279
2280        if rv['experimentStatus'] != 'active':
2281            if rv.has_key('federant'): del rv['federant']
2282        else:
2283            # remove the allocationID and uri info from each federant
2284            for f in rv.get('federant', []):
2285                if f.has_key('allocID'): del f['allocID']
2286                if f.has_key('uri'): del f['uri']
2287
2288        return rv
2289
2290    def get_info(self, req, fid):
2291        """
2292        Return all the stored info about this experiment
2293        """
2294        rv = None
2295
2296        req = req.get('InfoRequestBody', None)
2297        if not req:
2298            raise service_error(service_error.req,
2299                    "Bad request format (no InfoRequestBody)")
2300        exp = req.get('experiment', None)
2301        if exp:
2302            if exp.has_key('fedid'):
2303                key = exp['fedid']
2304                keytype = "fedid"
2305            elif exp.has_key('localname'):
2306                key = exp['localname']
2307                keytype = "localname"
2308            else:
2309                raise service_error(service_error.req, "Unknown lookup type")
2310        else:
2311            raise service_error(service_error.req, "No request?")
2312
2313        self.check_experiment_access(fid, key)
2314
2315        # The state may be massaged by the service function that called
2316        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2317        # state.
2318        self.state_lock.acquire()
2319        if self.state.has_key(key):
2320            rv = copy.deepcopy(self.state[key])
2321        self.state_lock.release()
2322
2323        if rv:
2324            return self.clean_info_response(rv)
2325        else:
2326            raise service_error(service_error.req, "No such experiment")
2327
2328    def get_multi_info(self, req, fid):
2329        """
2330        Return all the stored info that this fedid can access
2331        """
2332        rv = { 'info': [ ] }
2333
2334        self.state_lock.acquire()
2335        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2336            try:
2337                self.check_experiment_access(fid, key)
2338            except service_error, e:
2339                if e.code == service_error.access:
2340                    continue
2341                else:
2342                    self.state_lock.release()
2343                    raise e
2344
2345            if self.state.has_key(key):
2346                e = copy.deepcopy(self.state[key])
2347                e = self.clean_info_response(e)
2348                rv['info'].append(e)
2349        self.state_lock.release()
2350        return rv
2351
2352    def remove_dirs(self, dir):
2353        """
2354        Remove the directory tree and all files rooted at dir.  Log any errors,
2355        but continue.
2356        """
2357        self.log.debug("[removedirs]: removing %s" % dir)
2358        try:
2359            for path, dirs, files in os.walk(dir, topdown=False):
2360                for f in files:
2361                    os.remove(os.path.join(path, f))
2362                for d in dirs:
2363                    os.rmdir(os.path.join(path, d))
2364            os.rmdir(dir)
2365        except EnvironmentError, e:
2366            self.log.error("Error deleting directory tree in %s" % e);
2367
2368    @staticmethod
2369    def make_temp_certfile(expcert, tmpdir):
2370        """
2371        make a protected copy of the access certificate so the experiment
2372        controller can act as the experiment principal.  mkstemp is the most
2373        secure way to do that. The directory should be created by
2374        mkdtemp.  Return the filename.
2375        """
2376        if expcert and tmpdir:
2377            try:
2378                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
2379                f = os.fdopen(certf, 'w')
2380                print >> f, expcert
2381                f.close()
2382            except EnvironmentError, e:
2383                raise service_error(service_error.internal, 
2384                        "Cannot create temp cert file?")
2385            return certfn
2386        else:
2387            return None
2388
2389    def terminate_experiment(self, req, fid):
2390        """
2391        Swap this experiment out on the federants and delete the shared
2392        information
2393        """
2394        tbparams = { }
2395        req = req.get('TerminateRequestBody', None)
2396        if not req:
2397            raise service_error(service_error.req,
2398                    "Bad request format (no TerminateRequestBody)")
2399        force = req.get('force', False)
2400        exp = req.get('experiment', None)
2401        if exp:
2402            if exp.has_key('fedid'):
2403                key = exp['fedid']
2404                keytype = "fedid"
2405            elif exp.has_key('localname'):
2406                key = exp['localname']
2407                keytype = "localname"
2408            else:
2409                raise service_error(service_error.req, "Unknown lookup type")
2410        else:
2411            raise service_error(service_error.req, "No request?")
2412
2413        self.check_experiment_access(fid, key)
2414
2415        dealloc_list = [ ]
2416
2417
2418        # Create a logger that logs to the dealloc_list as well as to the main
2419        # log file.
2420        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2421        h = logging.StreamHandler(self.list_log(dealloc_list))
2422        # XXX: there should be a global one of these rather than repeating the
2423        # code.
2424        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2425                    '%d %b %y %H:%M:%S'))
2426        dealloc_log.addHandler(h)
2427
2428        self.state_lock.acquire()
2429        fed_exp = self.state.get(key, None)
2430        repo = None
2431
2432        if fed_exp:
2433            # This branch of the conditional holds the lock to generate a
2434            # consistent temporary tbparams variable to deallocate experiments.
2435            # It releases the lock to do the deallocations and reacquires it to
2436            # remove the experiment state when the termination is complete.
2437
2438            # First make sure that the experiment creation is complete.
2439            status = fed_exp.get('experimentStatus', None)
2440
2441            if status:
2442                if status in ('starting', 'terminating'):
2443                    if not force:
2444                        self.state_lock.release()
2445                        raise service_error(service_error.partial, 
2446                                'Experiment still being created or destroyed')
2447                    else:
2448                        self.log.warning('Experiment in %s state ' % status + \
2449                                'being terminated by force.')
2450            else:
2451                # No status??? trouble
2452                self.state_lock.release()
2453                raise service_error(service_error.internal,
2454                        "Experiment has no status!?")
2455
2456            ids = []
2457            #  experimentID is a list of dicts that are self-describing
2458            #  identifiers.  This finds all the fedids and localnames - the
2459            #  keys of self.state - and puts them into ids.
2460            for id in fed_exp.get('experimentID', []):
2461                if id.has_key('fedid'): 
2462                    ids.append(id['fedid'])
2463                    repo = "%s" % id['fedid']
2464                if id.has_key('localname'): ids.append(id['localname'])
2465
2466            # Get the experimentAccess - the principal for this experiment.  It
2467            # is this principal to which credentials have been delegated, and
2468            # as which the experiment controller must act.
2469            if 'experimentAccess' in self.state[key] and \
2470                    'X509' in self.state[key]['experimentAccess']:
2471                expcert = self.state[key]['experimentAccess']['X509']
2472            else:
2473                expcert = None
2474            # Collect the allocation/segment ids into a dict keyed by the fedid
2475            # of the allocation (or a monotonically increasing integer) that
2476            # contains a tuple of uri, aid (which is a dict...)
2477            for i, fed in enumerate(fed_exp.get('federant', [])):
2478                try:
2479                    uri = fed['uri']
2480                    aid = fed['allocID']
2481                    k = fed['allocID'].get('fedid', i)
2482                except KeyError, e:
2483                    continue
2484                tbparams[k] = (uri, aid)
2485            fed_exp['experimentStatus'] = 'terminating'
2486            if self.state_filename: self.write_state()
2487            self.state_lock.release()
2488
2489            try:
2490                tmpdir = tempfile.mkdtemp(prefix="split-")
2491            except EnvironmentError:
2492                raise service_error(service_error.internal, 
2493                        "Cannot create tmp dir")
2494            # This try block makes sure the tempdir is cleared
2495            try:
2496                # If no expcert, try the deallocation as the experiment
2497                # controller instance.
2498                if expcert and self.auth_type != 'legacy': 
2499                    cert_file = self.make_temp_certfile(expcert, tmpdir)
2500                    pw = None
2501                else: 
2502                    cert_file = self.cert_file
2503                    pw = self.cert_pwd
2504
2505                # Stop everyone.  NB, wait_for_all waits until a thread starts
2506                # and then completes, so we can't wait if nothing starts.  So,
2507                # no tbparams, no start.
2508                if len(tbparams) > 0:
2509                    thread_pool = self.thread_pool(self.nthreads)
2510                    for k in tbparams.keys():
2511                        # Create and start a thread to stop the segment
2512                        thread_pool.wait_for_slot()
2513                        uri, aid = tbparams[k]
2514                        t  = self.pooled_thread(\
2515                                target=self.terminate_segment(log=dealloc_log,
2516                                    testbed=uri,
2517                                    cert_file=cert_file, 
2518                                    cert_pwd=pw,
2519                                    trusted_certs=self.trusted_certs,
2520                                    caller=self.call_TerminateSegment),
2521                                args=(uri, aid), name=k,
2522                                pdata=thread_pool, trace_file=self.trace_file)
2523                        t.start()
2524                    # Wait for completions
2525                    thread_pool.wait_for_all_done()
2526
2527                # release the allocations (failed experiments have done this
2528                # already, and starting experiments may be in odd states, so we
2529                # ignore errors releasing those allocations
2530                try: 
2531                    for k in tbparams.keys():
2532                        # This releases access by uri
2533                        uri, aid = tbparams[k]
2534                        self.release_access(None, aid, uri=uri,
2535                                cert_file=cert_file, cert_pwd=pw)
2536                except service_error, e:
2537                    if status != 'failed' and not force:
2538                        raise e
2539
2540            # Clean up the tmpdir no matter what
2541            finally:
2542                self.remove_dirs(tmpdir)
2543
2544            # Remove the terminated experiment
2545            self.state_lock.acquire()
2546            for id in ids:
2547                if self.state.has_key(id): del self.state[id]
2548
2549            if self.state_filename: self.write_state()
2550            self.state_lock.release()
2551
2552            # Delete any synch points associated with this experiment.  All
2553            # synch points begin with the fedid of the experiment.
2554            fedid_keys = set(["fedid:%s" % f for f in ids \
2555                    if isinstance(f, fedid)])
2556            for k in self.synch_store.all_keys():
2557                try:
2558                    if len(k) > 45 and k[0:46] in fedid_keys:
2559                        self.synch_store.del_value(k)
2560                except synch_store.BadDeletionError:
2561                    pass
2562            self.write_store()
2563
2564            # Remove software and other cached stuff from the filesystem.
2565            if repo:
2566                self.remove_dirs("%s/%s" % (self.repodir, repo))
2567               
2568            return { 
2569                    'experiment': exp , 
2570                    'deallocationLog': "".join(dealloc_list),
2571                    }
2572        else:
2573            # Don't forget to release the lock
2574            self.state_lock.release()
2575            raise service_error(service_error.req, "No saved state")
2576
2577
2578    def GetValue(self, req, fid):
2579        """
2580        Get a value from the synchronized store
2581        """
2582        req = req.get('GetValueRequestBody', None)
2583        if not req:
2584            raise service_error(service_error.req,
2585                    "Bad request format (no GetValueRequestBody)")
2586       
2587        name = req['name']
2588        wait = req['wait']
2589        rv = { 'name': name }
2590
2591        if self.auth.check_attribute(fid, name):
2592            self.log.debug("[GetValue] asking for %s " % name)
2593            try:
2594                v = self.synch_store.get_value(name, wait)
2595            except synch_store.RevokedKeyError:
2596                # No more synch on this key
2597                raise service_error(service_error.federant, 
2598                        "Synch key %s revoked" % name)
2599            if v is not None:
2600                rv['value'] = v
2601            self.log.debug("[GetValue] got %s from %s" % (v, name))
2602            return rv
2603        else:
2604            raise service_error(service_error.access, "Access Denied")
2605       
2606
2607    def SetValue(self, req, fid):
2608        """
2609        Set a value in the synchronized store
2610        """
2611        req = req.get('SetValueRequestBody', None)
2612        if not req:
2613            raise service_error(service_error.req,
2614                    "Bad request format (no SetValueRequestBody)")
2615       
2616        name = req['name']
2617        v = req['value']
2618
2619        if self.auth.check_attribute(fid, name):
2620            try:
2621                self.synch_store.set_value(name, v)
2622                self.write_store()
2623                self.log.debug("[SetValue] set %s to %s" % (name, v))
2624            except synch_store.CollisionError:
2625                # Translate into a service_error
2626                raise service_error(service_error.req,
2627                        "Value already set: %s" %name)
2628            except synch_store.RevokedKeyError:
2629                # No more synch on this key
2630                raise service_error(service_error.federant, 
2631                        "Synch key %s revoked" % name)
2632            return { 'name': name, 'value': v }
2633        else:
2634            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.