source: fedd/federation/experiment_control.py @ c573278

axis_examplecompt_changesinfo-ops
Last change on this file since c573278 was c573278, checked in by Ted Faber <faber@…>, 13 years ago

Checkpoint. Still lots to do

  • Property mode set to 100644
File size: 88.6 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22from string import join
23
24from urlparse import urlparse
25from urllib2 import urlopen
26
27from util import *
28from fedid import fedid, generate_fedid
29from remote_service import xmlrpc_handler, soap_handler, service_caller
30from service_error import service_error
31from synch_store import synch_store
32from experiment_partition import experiment_partition
33from authorizer import abac_authorizer
34
35import topdl
36import list_log
37from ip_allocator import ip_allocator
38from ip_addr import ip_addr
39
40
41class nullHandler(logging.Handler):
42    def emit(self, record): pass
43
44fl = logging.getLogger("fedd.experiment_control")
45fl.addHandler(nullHandler())
46
47
48# Right now, no support for composition.
49class federated_service:
50    def __init__(self, name, exporter=None, importers=None, params=None, 
51            reqs=None, portal=None):
52        self.name=name
53        self.exporter=exporter
54        if importers is None: self.importers = []
55        else: self.importers=importers
56        if params is None: self.params = { }
57        else: self.params = params
58        if reqs is None: self.reqs = []
59        else: self.reqs = reqs
60
61        if portal is not None:
62            self.portal = portal
63        else:
64            self.portal = (name in federated_service.needs_portal)
65
66    def __str__(self):
67        return "name %s export %s import %s params %s reqs %s" % \
68                (self.name, self.exporter, self.importers, self.params,
69                        [ (r['name'], r['visibility']) for r in self.reqs] )
70
71    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
72
73class experiment_control_local:
74    """
75    Control of experiments that this system can directly access.
76
77    Includes experiment creation, termination and information dissemination.
78    Thred safe.
79    """
80
81    class ssh_cmd_timeout(RuntimeError): pass
82   
83    class thread_pool:
84        """
85        A class to keep track of a set of threads all invoked for the same
86        task.  Manages the mutual exclusion of the states.
87        """
88        def __init__(self, nthreads):
89            """
90            Start a pool.
91            """
92            self.changed = Condition()
93            self.started = 0
94            self.terminated = 0
95            self.nthreads = nthreads
96
97        def acquire(self):
98            """
99            Get the pool's lock.
100            """
101            self.changed.acquire()
102
103        def release(self):
104            """
105            Release the pool's lock.
106            """
107            self.changed.release()
108
109        def wait(self, timeout = None):
110            """
111            Wait for a pool thread to start or stop.
112            """
113            self.changed.wait(timeout)
114
115        def start(self):
116            """
117            Called by a pool thread to report starting.
118            """
119            self.changed.acquire()
120            self.started += 1
121            self.changed.notifyAll()
122            self.changed.release()
123
124        def terminate(self):
125            """
126            Called by a pool thread to report finishing.
127            """
128            self.changed.acquire()
129            self.terminated += 1
130            self.changed.notifyAll()
131            self.changed.release()
132
133        def clear(self):
134            """
135            Clear all pool data.
136            """
137            self.changed.acquire()
138            self.started = 0
139            self.terminated =0
140            self.changed.notifyAll()
141            self.changed.release()
142
143        def wait_for_slot(self):
144            """
145            Wait until we have a free slot to start another pooled thread
146            """
147            self.acquire()
148            while self.started - self.terminated >= self.nthreads:
149                self.wait()
150            self.release()
151
152        def wait_for_all_done(self, timeout=None):
153            """
154            Wait until all active threads finish (and at least one has
155            started).  If a timeout is given, return after waiting that long
156            for termination.  If all threads are done (and one has started in
157            the since the last clear()) return True, otherwise False.
158            """
159            if timeout:
160                deadline = time.time() + timeout
161            self.acquire()
162            while self.started == 0 or self.started > self.terminated:
163                self.wait(timeout)
164                if timeout:
165                    if time.time() > deadline:
166                        break
167                    timeout = deadline - time.time()
168            self.release()
169            return not (self.started == 0 or self.started > self.terminated)
170
171    class pooled_thread(Thread):
172        """
173        One of a set of threads dedicated to a specific task.  Uses the
174        thread_pool class above for coordination.
175        """
176        def __init__(self, group=None, target=None, name=None, args=(), 
177                kwargs={}, pdata=None, trace_file=None):
178            Thread.__init__(self, group, target, name, args, kwargs)
179            self.rv = None          # Return value of the ops in this thread
180            self.exception = None   # Exception that terminated this thread
181            self.target=target      # Target function to run on start()
182            self.args = args        # Args to pass to target
183            self.kwargs = kwargs    # Additional kw args
184            self.pdata = pdata      # thread_pool for this class
185            # Logger for this thread
186            self.log = logging.getLogger("fedd.experiment_control")
187       
188        def run(self):
189            """
190            Emulate Thread.run, except add pool data manipulation and error
191            logging.
192            """
193            if self.pdata:
194                self.pdata.start()
195
196            if self.target:
197                try:
198                    self.rv = self.target(*self.args, **self.kwargs)
199                except service_error, s:
200                    self.exception = s
201                    self.log.error("Thread exception: %s %s" % \
202                            (s.code_string(), s.desc))
203                except:
204                    self.exception = sys.exc_info()[1]
205                    self.log.error(("Unexpected thread exception: %s" +\
206                            "Trace %s") % (self.exception,\
207                                traceback.format_exc()))
208            if self.pdata:
209                self.pdata.terminate()
210
211    call_RequestAccess = service_caller('RequestAccess')
212    call_ReleaseAccess = service_caller('ReleaseAccess')
213    call_StartSegment = service_caller('StartSegment')
214    call_TerminateSegment = service_caller('TerminateSegment')
215    call_Ns2Topdl = service_caller('Ns2Topdl')
216
217    def __init__(self, config=None, auth=None):
218        """
219        Intialize the various attributes, most from the config object
220        """
221
222        def parse_tarfile_list(tf):
223            """
224            Parse a tarfile list from the configuration.  This is a set of
225            paths and tarfiles separated by spaces.
226            """
227            rv = [ ]
228            if tf is not None:
229                tl = tf.split()
230                while len(tl) > 1:
231                    p, t = tl[0:2]
232                    del tl[0:2]
233                    rv.append((p, t))
234            return rv
235
236        self.thread_with_rv = experiment_control_local.pooled_thread
237        self.thread_pool = experiment_control_local.thread_pool
238        self.list_log = list_log.list_log
239
240        self.cert_file = config.get("experiment_control", "cert_file")
241        if self.cert_file:
242            self.cert_pwd = config.get("experiment_control", "cert_pwd")
243        else:
244            self.cert_file = config.get("globals", "cert_file")
245            self.cert_pwd = config.get("globals", "cert_pwd")
246
247        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
248                or config.get("globals", "trusted_certs")
249
250        self.repodir = config.get("experiment_control", "repodir")
251        self.repo_url = config.get("experiment_control", "repo_url", 
252                "https://users.isi.deterlab.net:23235");
253
254        self.exp_stem = "fed-stem"
255        self.log = logging.getLogger("fedd.experiment_control")
256        set_log_level(config, "experiment_control", self.log)
257        self.muxmax = 2
258        self.nthreads = 10
259        self.randomize_experiments = False
260
261        self.splitter = None
262        self.ssh_keygen = "/usr/bin/ssh-keygen"
263        self.ssh_identity_file = None
264
265
266        self.debug = config.getboolean("experiment_control", "create_debug")
267        self.cleanup = not config.getboolean("experiment_control", 
268                "leave_tmpfiles")
269        self.state_filename = config.get("experiment_control", 
270                "experiment_state")
271        self.store_filename = config.get("experiment_control", 
272                "synch_store")
273        self.store_url = config.get("experiment_control", "store_url")
274        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
275        self.fedkit = parse_tarfile_list(\
276                config.get("experiment_control", "fedkit"))
277        self.gatewaykit = parse_tarfile_list(\
278                config.get("experiment_control", "gatewaykit"))
279        accessdb_file = config.get("experiment_control", "accessdb")
280
281        self.ssh_pubkey_file = config.get("experiment_control", 
282                "ssh_pubkey_file")
283        self.ssh_privkey_file = config.get("experiment_control",
284                "ssh_privkey_file")
285        dt = config.get("experiment_control", "direct_transit")
286        self.auth_type = config.get('experiment_control', 'auth_type') \
287                or 'legacy'
288        self.auth_dir = config.get('experiment_control', 'auth_dir')
289        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
290        else: self.direct_transit = [ ]
291        # NB for internal master/slave ops, not experiment setup
292        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
293
294        self.overrides = set([])
295        ovr = config.get('experiment_control', 'overrides')
296        if ovr:
297            for o in ovr.split(","):
298                o = o.strip()
299                if o.startswith('fedid:'): o = o[len('fedid:'):]
300                self.overrides.add(fedid(hexstr=o))
301
302        self.state = { }
303        self.state_lock = Lock()
304        self.tclsh = "/usr/local/bin/otclsh"
305        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
306                config.get("experiment_control", "tcl_splitter",
307                        "/usr/testbed/lib/ns2ir/parse.tcl")
308        mapdb_file = config.get("experiment_control", "mapdb")
309        self.trace_file = sys.stderr
310
311        self.def_expstart = \
312                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
313                "/tmp/federate";
314        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
315                "FEDDIR/hosts";
316        self.def_gwstart = \
317                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
318                "/tmp/bridge.log";
319        self.def_mgwstart = \
320                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
321                "/tmp/bridge.log";
322        self.def_gwimage = "FBSD61-TUNNEL2";
323        self.def_gwtype = "pc";
324        self.local_access = { }
325
326        if self.auth_type == 'legacy':
327            if auth:
328                self.auth = auth
329            else:
330                self.log.error( "[access]: No authorizer initialized, " +\
331                        "creating local one.")
332                auth = authorizer()
333        elif self.auth_type == 'abac':
334            self.auth = abac_authorizer(load=self.auth_dir)
335        else:
336            raise service_error(service_error.internal, 
337                    "Unknown auth_type: %s" % self.auth_type)
338
339
340        if self.ssh_pubkey_file:
341            try:
342                f = open(self.ssh_pubkey_file, 'r')
343                self.ssh_pubkey = f.read()
344                f.close()
345            except EnvironmentError:
346                raise service_error(service_error.internal,
347                        "Cannot read sshpubkey")
348        else:
349            raise service_error(service_error.internal, 
350                    "No SSH public key file?")
351
352        if not self.ssh_privkey_file:
353            raise service_error(service_error.internal, 
354                    "No SSH public key file?")
355
356
357        if mapdb_file:
358            self.read_mapdb(mapdb_file)
359        else:
360            self.log.warn("[experiment_control] No testbed map, using defaults")
361            self.tbmap = { 
362                    'deter':'https://users.isi.deterlab.net:23235',
363                    'emulab':'https://users.isi.deterlab.net:23236',
364                    'ucb':'https://users.isi.deterlab.net:23237',
365                    }
366
367        if accessdb_file:
368                self.read_accessdb(accessdb_file)
369        else:
370            raise service_error(service_error.internal,
371                    "No accessdb specified in config")
372
373        # Grab saved state.  OK to do this w/o locking because it's read only
374        # and only one thread should be in existence that can see self.state at
375        # this point.
376        if self.state_filename:
377            self.read_state()
378
379        if self.store_filename:
380            self.read_store()
381        else:
382            self.log.warning("No saved synch store")
383            self.synch_store = synch_store
384
385        # Dispatch tables
386        self.soap_services = {\
387                'New': soap_handler('New', self.new_experiment),
388                'Create': soap_handler('Create', self.create_experiment),
389                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
390                'Vis': soap_handler('Vis', self.get_vis),
391                'Info': soap_handler('Info', self.get_info),
392                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
393                'Terminate': soap_handler('Terminate', 
394                    self.terminate_experiment),
395                'GetValue': soap_handler('GetValue', self.GetValue),
396                'SetValue': soap_handler('SetValue', self.SetValue),
397        }
398
399        self.xmlrpc_services = {\
400                'New': xmlrpc_handler('New', self.new_experiment),
401                'Create': xmlrpc_handler('Create', self.create_experiment),
402                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
403                'Vis': xmlrpc_handler('Vis', self.get_vis),
404                'Info': xmlrpc_handler('Info', self.get_info),
405                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
406                'Terminate': xmlrpc_handler('Terminate',
407                    self.terminate_experiment),
408                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
409                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
410        }
411
412    # Call while holding self.state_lock
413    def write_state(self):
414        """
415        Write a new copy of experiment state after copying the existing state
416        to a backup.
417
418        State format is a simple pickling of the state dictionary.
419        """
420        if os.access(self.state_filename, os.W_OK):
421            copy_file(self.state_filename, \
422                    "%s.bak" % self.state_filename)
423        try:
424            f = open(self.state_filename, 'w')
425            pickle.dump(self.state, f)
426        except EnvironmentError, e:
427            self.log.error("Can't write file %s: %s" % \
428                    (self.state_filename, e))
429        except pickle.PicklingError, e:
430            self.log.error("Pickling problem: %s" % e)
431        except TypeError, e:
432            self.log.error("Pickling problem (TypeError): %s" % e)
433
434    @staticmethod
435    def get_alloc_ids(state):
436        """
437        Pull the fedids of the identifiers of each allocation from the
438        state.  Again, a dict dive that's best isolated.
439
440        Used by read_store and read state
441        """
442
443        return [ f['allocID']['fedid'] 
444                for f in state.get('federant',[]) \
445                    if f.has_key('allocID') and \
446                        f['allocID'].has_key('fedid')]
447
448    # Call while holding self.state_lock
449    def read_state(self):
450        """
451        Read a new copy of experiment state.  Old state is overwritten.
452
453        State format is a simple pickling of the state dictionary.
454        """
455       
456        def get_experiment_id(state):
457            """
458            Pull the fedid experimentID out of the saved state.  This is kind
459            of a gross walk through the dict.
460            """
461
462            if state.has_key('experimentID'):
463                for e in state['experimentID']:
464                    if e.has_key('fedid'):
465                        return e['fedid']
466                else:
467                    return None
468            else:
469                return None
470
471        try:
472            f = open(self.state_filename, "r")
473            self.state = pickle.load(f)
474            self.log.debug("[read_state]: Read state from %s" % \
475                    self.state_filename)
476        except EnvironmentError, e:
477            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
478                    % (self.state_filename, e))
479        except pickle.UnpicklingError, e:
480            self.log.warning(("[read_state]: No saved state: " + \
481                    "Unpickling failed: %s") % e)
482       
483        for s in self.state.values():
484            try:
485
486                eid = get_experiment_id(s)
487                if eid : 
488                    if self.auth_type == 'legacy':
489                        # XXX: legacy
490                        # Give the owner rights to the experiment
491                        self.auth.set_attribute(s['owner'], eid)
492                        # And holders of the eid as well
493                        self.auth.set_attribute(eid, eid)
494                        # allow overrides to control experiments as well
495                        for o in self.overrides:
496                            self.auth.set_attribute(o, eid)
497                        # Set permissions to allow reading of the software
498                        # repo, if any, as well.
499                        for a in self.get_alloc_ids(s):
500                            self.auth.set_attribute(a, 'repo/%s' % eid)
501                else:
502                    raise KeyError("No experiment id")
503            except KeyError, e:
504                self.log.warning("[read_state]: State ownership or identity " +\
505                        "misformatted in %s: %s" % (self.state_filename, e))
506
507
508    def read_accessdb(self, accessdb_file):
509        """
510        Read the mapping from fedids that can create experiments to their name
511        in the 3-level access namespace.  All will be asserted from this
512        testbed and can include the local username and porject that will be
513        asserted on their behalf by this fedd.  Each fedid is also added to the
514        authorization system with the "create" attribute.
515        """
516        self.accessdb = {}
517        # These are the regexps for parsing the db
518        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
519        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
520                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
521        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
522                "\s*->\s*(" + name_expr + ")\s*$")
523        lineno = 0
524
525        # Parse the mappings and store in self.authdb, a dict of
526        # fedid -> (proj, user)
527        try:
528            f = open(accessdb_file, "r")
529            for line in f:
530                lineno += 1
531                line = line.strip()
532                if len(line) == 0 or line.startswith('#'):
533                    continue
534                m = project_line.match(line)
535                if m:
536                    fid = fedid(hexstr=m.group(1))
537                    project, user = m.group(2,3)
538                    if not self.accessdb.has_key(fid):
539                        self.accessdb[fid] = []
540                    self.accessdb[fid].append((project, user))
541                    continue
542
543                m = user_line.match(line)
544                if m:
545                    fid = fedid(hexstr=m.group(1))
546                    project = None
547                    user = m.group(2)
548                    if not self.accessdb.has_key(fid):
549                        self.accessdb[fid] = []
550                    self.accessdb[fid].append((project, user))
551                    continue
552                self.log.warn("[experiment_control] Error parsing access " +\
553                        "db %s at line %d" %  (accessdb_file, lineno))
554        except EnvironmentError:
555            raise service_error(service_error.internal,
556                    ("Error opening/reading %s as experiment " +\
557                            "control accessdb") %  accessdb_file)
558        f.close()
559
560        # Initialize the authorization attributes
561        # XXX: legacy
562        if self.auth_type == 'legacy':
563            for fid in self.accessdb.keys():
564                self.auth.set_attribute(fid, 'create')
565                self.auth.set_attribute(fid, 'new')
566
567    def read_mapdb(self, file):
568        """
569        Read a simple colon separated list of mappings for the
570        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
571        """
572
573        self.tbmap = { }
574        lineno =0
575        try:
576            f = open(file, "r")
577            for line in f:
578                lineno += 1
579                line = line.strip()
580                if line.startswith('#') or len(line) == 0:
581                    continue
582                try:
583                    label, url = line.split(':', 1)
584                    self.tbmap[label] = url
585                except ValueError, e:
586                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
587                            "map db: %s %s" % (lineno, line, e))
588        except EnvironmentError, e:
589            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
590                    "open %s: %s" % (file, e))
591        f.close()
592
593    def read_store(self):
594        try:
595            self.synch_store = synch_store()
596            self.synch_store.load(self.store_filename)
597            self.log.debug("[read_store]: Read store from %s" % \
598                    self.store_filename)
599        except EnvironmentError, e:
600            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
601                    % (self.state_filename, e))
602            self.synch_store = synch_store()
603
604        # Set the initial permissions on data in the store.  XXX: This ad hoc
605        # authorization attribute initialization is getting out of hand.
606        # XXX: legacy
607        if self.auth_type == 'legacy':
608            for k in self.synch_store.all_keys():
609                try:
610                    if k.startswith('fedid:'):
611                        fid = fedid(hexstr=k[6:46])
612                        if self.state.has_key(fid):
613                            for a in self.get_alloc_ids(self.state[fid]):
614                                self.auth.set_attribute(a, k)
615                except ValueError, e:
616                    self.log.warn("Cannot deduce permissions for %s" % k)
617
618
619    def write_store(self):
620        """
621        Write a new copy of synch_store after writing current state
622        to a backup.  We use the internal synch_store pickle method to avoid
623        incinsistent data.
624
625        State format is a simple pickling of the store.
626        """
627        if os.access(self.store_filename, os.W_OK):
628            copy_file(self.store_filename, \
629                    "%s.bak" % self.store_filename)
630        try:
631            self.synch_store.save(self.store_filename)
632        except EnvironmentError, e:
633            self.log.error("Can't write file %s: %s" % \
634                    (self.store_filename, e))
635        except TypeError, e:
636            self.log.error("Pickling problem (TypeError): %s" % e)
637
638       
639    def generate_ssh_keys(self, dest, type="rsa" ):
640        """
641        Generate a set of keys for the gateways to use to talk.
642
643        Keys are of type type and are stored in the required dest file.
644        """
645        valid_types = ("rsa", "dsa")
646        t = type.lower();
647        if t not in valid_types: raise ValueError
648        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
649
650        try:
651            trace = open("/dev/null", "w")
652        except EnvironmentError:
653            raise service_error(service_error.internal,
654                    "Cannot open /dev/null??");
655
656        # May raise CalledProcessError
657        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
658        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
659        if rv != 0:
660            raise service_error(service_error.internal, 
661                    "Cannot generate nonce ssh keys.  %s return code %d" \
662                            % (self.ssh_keygen, rv))
663
664    def gentopo(self, str):
665        """
666        Generate the topology dtat structure from the splitter's XML
667        representation of it.
668
669        The topology XML looks like:
670            <experiment>
671                <nodes>
672                    <node><vname></vname><ips>ip1:ip2</ips></node>
673                </nodes>
674                <lans>
675                    <lan>
676                        <vname></vname><vnode></vnode><ip></ip>
677                        <bandwidth></bandwidth><member>node:port</member>
678                    </lan>
679                </lans>
680        """
681        class topo_parse:
682            """
683            Parse the topology XML and create the dats structure.
684            """
685            def __init__(self):
686                # Typing of the subelements for data conversion
687                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
688                self.int_subelements = ( 'bandwidth',)
689                self.float_subelements = ( 'delay',)
690                # The final data structure
691                self.nodes = [ ]
692                self.lans =  [ ]
693                self.topo = { \
694                        'node': self.nodes,\
695                        'lan' : self.lans,\
696                    }
697                self.element = { }  # Current element being created
698                self.chars = ""     # Last text seen
699
700            def end_element(self, name):
701                # After each sub element the contents is added to the current
702                # element or to the appropriate list.
703                if name == 'node':
704                    self.nodes.append(self.element)
705                    self.element = { }
706                elif name == 'lan':
707                    self.lans.append(self.element)
708                    self.element = { }
709                elif name in self.str_subelements:
710                    self.element[name] = self.chars
711                    self.chars = ""
712                elif name in self.int_subelements:
713                    self.element[name] = int(self.chars)
714                    self.chars = ""
715                elif name in self.float_subelements:
716                    self.element[name] = float(self.chars)
717                    self.chars = ""
718
719            def found_chars(self, data):
720                self.chars += data.rstrip()
721
722
723        tp = topo_parse();
724        parser = xml.parsers.expat.ParserCreate()
725        parser.EndElementHandler = tp.end_element
726        parser.CharacterDataHandler = tp.found_chars
727
728        parser.Parse(str)
729
730        return tp.topo
731       
732
733    def genviz(self, topo):
734        """
735        Generate the visualization the virtual topology
736        """
737
738        neato = "/usr/local/bin/neato"
739        # These are used to parse neato output and to create the visualization
740        # file.
741        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
742        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
743                "%s</type></node>"
744
745        try:
746            # Node names
747            nodes = [ n['vname'] for n in topo['node'] ]
748            topo_lans = topo['lan']
749        except KeyError, e:
750            raise service_error(service_error.internal, "Bad topology: %s" %e)
751
752        lans = { }
753        links = { }
754
755        # Walk through the virtual topology, organizing the connections into
756        # 2-node connections (links) and more-than-2-node connections (lans).
757        # When a lan is created, it's added to the list of nodes (there's a
758        # node in the visualization for the lan).
759        for l in topo_lans:
760            if links.has_key(l['vname']):
761                if len(links[l['vname']]) < 2:
762                    links[l['vname']].append(l['vnode'])
763                else:
764                    nodes.append(l['vname'])
765                    lans[l['vname']] = links[l['vname']]
766                    del links[l['vname']]
767                    lans[l['vname']].append(l['vnode'])
768            elif lans.has_key(l['vname']):
769                lans[l['vname']].append(l['vnode'])
770            else:
771                links[l['vname']] = [ l['vnode'] ]
772
773
774        # Open up a temporary file for dot to turn into a visualization
775        try:
776            df, dotname = tempfile.mkstemp()
777            dotfile = os.fdopen(df, 'w')
778        except EnvironmentError:
779            raise service_error(service_error.internal,
780                    "Failed to open file in genviz")
781
782        try:
783            dnull = open('/dev/null', 'w')
784        except EnvironmentError:
785            service_error(service_error.internal,
786                    "Failed to open /dev/null in genviz")
787
788        # Generate a dot/neato input file from the links, nodes and lans
789        try:
790            print >>dotfile, "graph G {"
791            for n in nodes:
792                print >>dotfile, '\t"%s"' % n
793            for l in links.keys():
794                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
795            for l in lans.keys():
796                for n in lans[l]:
797                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
798            print >>dotfile, "}"
799            dotfile.close()
800        except TypeError:
801            raise service_error(service_error.internal,
802                    "Single endpoint link in vtopo")
803        except EnvironmentError:
804            raise service_error(service_error.internal, "Cannot write dot file")
805
806        # Use dot to create a visualization
807        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
808                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
809                close_fds=True)
810        dnull.close()
811
812        # Translate dot to vis format
813        vis_nodes = [ ]
814        vis = { 'node': vis_nodes }
815        for line in dot.stdout:
816            m = vis_re.match(line)
817            if m:
818                vn = m.group(1)
819                vis_node = {'name': vn, \
820                        'x': float(m.group(2)),\
821                        'y' : float(m.group(3)),\
822                    }
823                if vn in links.keys() or vn in lans.keys():
824                    vis_node['type'] = 'lan'
825                else:
826                    vis_node['type'] = 'node'
827                vis_nodes.append(vis_node)
828        rv = dot.wait()
829
830        os.remove(dotname)
831        if rv == 0 : return vis
832        else: return None
833
834    def get_access(self, tb, nodes, tbparam, access_user, masters, tbmap):
835        """
836        Get access to testbed through fedd and set the parameters for that tb
837        """
838        def get_export_project(svcs):
839            """
840            Look through for the list of federated_service for this testbed
841            objects for a project_export service, and extract the project
842            parameter.
843            """
844
845            pe = [s for s in svcs if s.name=='project_export']
846            if len(pe) == 1:
847                return pe[0].params.get('project', None)
848            elif len(pe) == 0:
849                return None
850            else:
851                raise service_error(service_error.req,
852                        "More than one project export is not supported")
853
854        uri = tbmap.get(testbed_base(tb), None)
855        if not uri:
856            raise service_error(service_error.server_config, 
857                    "Unknown testbed: %s" % tb)
858
859        export_svcs = masters.get(tb,[])
860        import_svcs = [ s for m in masters.values() \
861                for s in m \
862                    if tb in s.importers ]
863
864        export_project = get_export_project(export_svcs)
865
866        # Tweak search order so that if there are entries in access_user that
867        # have a project matching the export project, we try them first
868        if export_project: 
869            access_sequence = [ (p, u) for p, u in access_user \
870                    if p == export_project] 
871            access_sequence.extend([(p, u) for p, u in access_user \
872                    if p != export_project]) 
873        else: 
874            access_sequence = access_user
875
876        for p, u in access_sequence: 
877            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
878                    "to %s") %  ((p or "None"), u, uri))
879
880            if p:
881                # Request with user and project specified
882                req = {\
883                        'credential': [ "project: %s" % p, "user: %s"  % u],
884                    }
885            else:
886                # Request with only user specified
887                req = {\
888                        'credential': [ 'user: %s' % u ],
889                    }
890
891            # Make the service request from the services we're importing and
892            # exporting.  Keep track of the export request ids so we can
893            # collect the resulting info from the access response.
894            e_keys = { }
895            if import_svcs or export_svcs:
896                req['service'] = [ ]
897
898                for i, s in enumerate(import_svcs):
899                    idx = 'import%d' % i
900                    sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
901                    if s.params:
902                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
903                                for k, v in s.params.items()]
904                    req['service'].append(sr)
905
906                for i, s in enumerate(export_svcs):
907                    idx = 'export%d' % i
908                    e_keys[idx] = s
909                    sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
910                    if s.params:
911                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
912                                for k, v in s.params.items()]
913                    req['service'].append(sr)
914
915            # node resources if any
916            if nodes != None and len(nodes) > 0:
917                rnodes = [ ]
918                for n in nodes:
919                    rn = { }
920                    image, hw, count = n.split(":")
921                    if image: rn['image'] = [ image ]
922                    if hw: rn['hardware'] = [ hw ]
923                    if count and int(count) >0 : rn['count'] = int(count)
924                    rnodes.append(rn)
925                req['resources']= { }
926                req['resources']['node'] = rnodes
927
928            try:
929                if self.local_access.has_key(uri):
930                    # Local access call
931                    req = { 'RequestAccessRequestBody' : req }
932                    r = self.local_access[uri].RequestAccess(req, 
933                            fedid(file=self.cert_file))
934                    r = { 'RequestAccessResponseBody' : r }
935                else:
936                    r = self.call_RequestAccess(uri, req, 
937                            self.cert_file, self.cert_pwd, self.trusted_certs)
938            except service_error, e:
939                if e.code == service_error.access:
940                    self.log.debug("[get_access] Access denied")
941                    r = None
942                    continue
943                else:
944                    raise e
945
946            if r.has_key('RequestAccessResponseBody'):
947                # Through to here we have a valid response, not a fault.
948                # Access denied is a fault, so something better or worse than
949                # access denied has happened.
950                r = r['RequestAccessResponseBody']
951                self.log.debug("[get_access] Access granted")
952                break
953            else:
954                raise service_error(service_error.protocol,
955                        "Bad proxy response")
956       
957        if not r:
958            raise service_error(service_error.access, 
959                    "Access denied by %s (%s)" % (tb, uri))
960
961        tbparam[tb] = { 
962                "allocID" : r['allocID'],
963                "uri": uri,
964                }
965
966        # Collect the responses corresponding to the services this testbed
967        # exports.  These will be the service requests that we will include in
968        # the start segment requests (with appropriate visibility values) to
969        # import and export the segments.
970        for s in r.get('service', []):
971            id = s.get('id', None)
972            if id and id in e_keys:
973                e_keys[id].reqs.append(s)
974
975        # Add attributes to parameter space.  We don't allow attributes to
976        # overlay any parameters already installed.
977        for a in r.get('fedAttr', []):
978            try:
979                if a['attribute'] and \
980                        isinstance(a['attribute'], basestring)\
981                        and not tbparam[tb].has_key(a['attribute'].lower()):
982                    tbparam[tb][a['attribute'].lower()] = a['value']
983            except KeyError:
984                self.log.error("Bad attribute in response: %s" % a)
985
986    def release_access(self, tb, aid, tbmap=None, uri=None):
987        """
988        Release access to testbed through fedd
989        """
990
991        if not uri and tbmap:
992            uri = tbmap.get(tb, None)
993        if not uri:
994            raise service_error(service_error.server_config, 
995                    "Unknown testbed: %s" % tb)
996
997        if self.local_access.has_key(uri):
998            resp = self.local_access[uri].ReleaseAccess(\
999                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1000                    fedid(file=self.cert_file))
1001            resp = { 'ReleaseAccessResponseBody': resp } 
1002        else:
1003            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1004                    self.cert_file, self.cert_pwd, self.trusted_certs)
1005
1006        # better error coding
1007
1008    def remote_ns2topdl(self, uri, desc):
1009
1010        req = {
1011                'description' : { 'ns2description': desc },
1012            }
1013
1014        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
1015                self.trusted_certs)
1016
1017        if r.has_key('Ns2TopdlResponseBody'):
1018            r = r['Ns2TopdlResponseBody']
1019            ed = r.get('experimentdescription', None)
1020            if ed.has_key('topdldescription'):
1021                return topdl.Topology(**ed['topdldescription'])
1022            else:
1023                raise service_error(service_error.protocol, 
1024                        "Bad splitter response (no output)")
1025        else:
1026            raise service_error(service_error.protocol, "Bad splitter response")
1027
1028    class start_segment:
1029        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1030                cert_pwd=None, trusted_certs=None, caller=None,
1031                log_collector=None):
1032            self.log = log
1033            self.debug = debug
1034            self.cert_file = cert_file
1035            self.cert_pwd = cert_pwd
1036            self.trusted_certs = None
1037            self.caller = caller
1038            self.testbed = testbed
1039            self.log_collector = log_collector
1040            self.response = None
1041            self.node = { }
1042
1043        def make_map(self, resp):
1044            for e in resp.get('embedding', []):
1045                if 'toponame' in e and 'physname' in e:
1046                    self.node[e['toponame']] = e['physname'][0]
1047
1048        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
1049            req = {
1050                    'allocID': { 'fedid' : aid }, 
1051                    'segmentdescription': { 
1052                        'topdldescription': topo.to_dict(),
1053                    },
1054                }
1055
1056            if connInfo:
1057                req['connection'] = connInfo
1058
1059            import_svcs = [ s for m in masters.values() \
1060                    for s in m if self.testbed in s.importers]
1061
1062            if import_svcs or self.testbed in masters:
1063                req['service'] = []
1064
1065            for s in import_svcs:
1066                for r in s.reqs:
1067                    sr = copy.deepcopy(r)
1068                    sr['visibility'] = 'import';
1069                    req['service'].append(sr)
1070
1071            for s in masters.get(self.testbed, []):
1072                for r in s.reqs:
1073                    sr = copy.deepcopy(r)
1074                    sr['visibility'] = 'export';
1075                    req['service'].append(sr)
1076
1077            if attrs:
1078                req['fedAttr'] = attrs
1079
1080            try:
1081                self.log.debug("Calling StartSegment at %s " % uri)
1082                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1083                        self.trusted_certs)
1084                if r.has_key('StartSegmentResponseBody'):
1085                    lval = r['StartSegmentResponseBody'].get('allocationLog',
1086                            None)
1087                    if lval and self.log_collector:
1088                        for line in  lval.splitlines(True):
1089                            self.log_collector.write(line)
1090                    self.make_map(r['StartSegmentResponseBody'])
1091                    self.response = r
1092                else:
1093                    raise service_error(service_error.internal, 
1094                            "Bad response!?: %s" %r)
1095                return True
1096            except service_error, e:
1097                self.log.error("Start segment failed on %s: %s" % \
1098                        (self.testbed, e))
1099                return False
1100
1101
1102
1103    class terminate_segment:
1104        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1105                cert_pwd=None, trusted_certs=None, caller=None):
1106            self.log = log
1107            self.debug = debug
1108            self.cert_file = cert_file
1109            self.cert_pwd = cert_pwd
1110            self.trusted_certs = None
1111            self.caller = caller
1112            self.testbed = testbed
1113
1114        def __call__(self, uri, aid ):
1115            req = {
1116                    'allocID': aid , 
1117                }
1118            try:
1119                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1120                        self.trusted_certs)
1121                return True
1122            except service_error, e:
1123                self.log.error("Terminate segment failed on %s: %s" % \
1124                        (self.testbed, e))
1125                return False
1126   
1127
1128    def allocate_resources(self, allocated, masters, eid, expid, 
1129            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1130            attrs=None, connInfo={}, tbmap=None, expcert=None):
1131
1132        started = { }           # Testbeds where a sub-experiment started
1133                                # successfully
1134
1135        # XXX
1136        fail_soft = False
1137
1138        if tbmap is None: tbmap = { }
1139
1140        log = alloc_log or self.log
1141
1142        thread_pool = self.thread_pool(self.nthreads)
1143        threads = [ ]
1144        starters = [ ]
1145
1146        if expcert:
1147            cert = expcert
1148            pw = None
1149        else:
1150            cert = self.cert_file
1151            pw = self.cert_pw
1152
1153        for tb in allocated.keys():
1154            # Create and start a thread to start the segment, and save it
1155            # to get the return value later
1156            tb_attrs = copy.copy(attrs)
1157            thread_pool.wait_for_slot()
1158            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
1159            base, suffix = split_testbed(tb)
1160            if suffix:
1161                tb_attrs.append({'attribute': 'experiment_name', 
1162                    'value': "%s-%s" % (eid, suffix)})
1163            else:
1164                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1165            if not uri:
1166                raise service_error(service_error.internal, 
1167                        "Unknown testbed %s !?" % tb)
1168
1169            if tbparams[tb].has_key('allocID') and \
1170                    tbparams[tb]['allocID'].has_key('fedid'):
1171                aid = tbparams[tb]['allocID']['fedid']
1172            else:
1173                raise service_error(service_error.internal, 
1174                        "No alloc id for testbed %s !?" % tb)
1175
1176            s = self.start_segment(log=log, debug=self.debug,
1177                    testbed=tb, cert_file=cert,
1178                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1179                    caller=self.call_StartSegment,
1180                    log_collector=log_collector)
1181            starters.append(s)
1182            t  = self.pooled_thread(\
1183                    target=s, name=tb,
1184                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1185                    pdata=thread_pool, trace_file=self.trace_file)
1186            threads.append(t)
1187            t.start()
1188
1189        # Wait until all finish (keep pinging the log, though)
1190        mins = 0
1191        revoked = False
1192        while not thread_pool.wait_for_all_done(60.0):
1193            mins += 1
1194            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1195                    % mins)
1196            if not revoked and \
1197                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1198                # a testbed has failed.  Revoke this experiment's
1199                # synchronizarion values so that sub experiments will not
1200                # deadlock waiting for synchronization that will never happen
1201                self.log.info("A subexperiment has failed to swap in, " + \
1202                        "revoking synch keys")
1203                var_key = "fedid:%s" % expid
1204                for k in self.synch_store.all_keys():
1205                    if len(k) > 45 and k[0:46] == var_key:
1206                        self.synch_store.revoke_key(k)
1207                revoked = True
1208
1209        failed = [ t.getName() for t in threads if not t.rv ]
1210        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1211
1212        # If one failed clean up, unless fail_soft is set
1213        if failed:
1214            if not fail_soft:
1215                thread_pool.clear()
1216                for tb in succeeded:
1217                    # Create and start a thread to stop the segment
1218                    thread_pool.wait_for_slot()
1219                    uri = tbparams[tb]['uri']
1220                    t  = self.pooled_thread(\
1221                            target=self.terminate_segment(log=log,
1222                                testbed=tb,
1223                                cert_file=self.cert_file, 
1224                                cert_pwd=self.cert_pwd,
1225                                trusted_certs=self.trusted_certs,
1226                                caller=self.call_TerminateSegment),
1227                            args=(uri, tbparams[tb]['federant']['allocID']),
1228                            name=tb,
1229                            pdata=thread_pool, trace_file=self.trace_file)
1230                    t.start()
1231                # Wait until all finish (if any are being stopped)
1232                if succeeded:
1233                    thread_pool.wait_for_all_done()
1234
1235                # release the allocations
1236                for tb in tbparams.keys():
1237                    self.release_access(tb, tbparams[tb]['allocID'], 
1238                            tbmap=tbmap, uri=tbparams[tb].get('uri', None))
1239                # Remove the placeholder
1240                self.state_lock.acquire()
1241                self.state[eid]['experimentStatus'] = 'failed'
1242                if self.state_filename: self.write_state()
1243                self.state_lock.release()
1244                # Remove the repo dir
1245                self.remove_dirs("%s/%s" %(self.repodir, expid))
1246                # Walk up tmpdir, deleting as we go
1247                if self.cleanup:
1248                    self.remove_dirs(tmpdir)
1249                else:
1250                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1251
1252
1253                log.error("Swap in failed on %s" % ",".join(failed))
1254                return
1255        else:
1256            # Walk through the successes and gather the virtual to physical
1257            # mapping.
1258            embedding = [ ]
1259            for s in starters:
1260                for k, v in s.node.items():
1261                    embedding.append({
1262                        'toponame': k, 
1263                        'physname': [ v],
1264                        'testbed': s.testbed
1265                        })
1266            log.info("[start_segment]: Experiment %s active" % eid)
1267
1268
1269        # Walk up tmpdir, deleting as we go
1270        if self.cleanup:
1271            self.remove_dirs(tmpdir)
1272        else:
1273            log.debug("[start_experiment]: not removing %s" % tmpdir)
1274
1275        # Insert the experiment into our state and update the disk copy.
1276        self.state_lock.acquire()
1277        self.state[expid]['experimentStatus'] = 'active'
1278        self.state[eid] = self.state[expid]
1279        self.state[eid]['experimentdescription']['topdldescription'] = \
1280                top.to_dict()
1281        self.state[eid]['embedding'] = embedding
1282        if self.state_filename: self.write_state()
1283        self.state_lock.release()
1284        return
1285
1286
1287    def add_kit(self, e, kit):
1288        """
1289        Add a Software object created from the list of (install, location)
1290        tuples passed as kit  to the software attribute of an object e.  We
1291        do this enough to break out the code, but it's kind of a hack to
1292        avoid changing the old tuple rep.
1293        """
1294
1295        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1296
1297        if isinstance(e.software, list): e.software.extend(s)
1298        else: e.software = s
1299
1300
1301    def create_experiment_state(self, fid, req, expid, expcert,
1302            state='starting'):
1303        """
1304        Create the initial entry in the experiment's state.  The expid and
1305        expcert are the experiment's fedid and certifacte that represents that
1306        ID, which are installed in the experiment state.  If the request
1307        includes a suggested local name that is used if possible.  If the local
1308        name is already taken by an experiment owned by this user that has
1309        failed, it is overwritten.  Otherwise new letters are added until a
1310        valid localname is found.  The generated local name is returned.
1311        """
1312
1313        if req.has_key('experimentID') and \
1314                req['experimentID'].has_key('localname'):
1315            overwrite = False
1316            eid = req['experimentID']['localname']
1317            # If there's an old failed experiment here with the same local name
1318            # and accessible by this user, we'll overwrite it, otherwise we'll
1319            # fall through and do the collision avoidance.
1320            old_expid = self.get_experiment_fedid(eid)
1321            if old_expid and self.check_experiment_access(fid, old_expid):
1322                self.state_lock.acquire()
1323                status = self.state[eid].get('experimentStatus', None)
1324                if status and status == 'failed':
1325                    # remove the old access attribute
1326                    self.auth.unset_attribute(fid, old_expid)
1327                    self.auth.save()
1328                    overwrite = True
1329                    del self.state[eid]
1330                    del self.state[old_expid]
1331                self.state_lock.release()
1332            self.state_lock.acquire()
1333            while (self.state.has_key(eid) and not overwrite):
1334                eid += random.choice(string.ascii_letters)
1335            # Initial state
1336            self.state[eid] = {
1337                    'experimentID' : \
1338                            [ { 'localname' : eid }, {'fedid': expid } ],
1339                    'experimentStatus': state,
1340                    'experimentAccess': { 'X509' : expcert },
1341                    'owner': fid,
1342                    'log' : [],
1343                }
1344            self.state[expid] = self.state[eid]
1345            if self.state_filename: self.write_state()
1346            self.state_lock.release()
1347        else:
1348            eid = self.exp_stem
1349            for i in range(0,5):
1350                eid += random.choice(string.ascii_letters)
1351            self.state_lock.acquire()
1352            while (self.state.has_key(eid)):
1353                eid = self.exp_stem
1354                for i in range(0,5):
1355                    eid += random.choice(string.ascii_letters)
1356            # Initial state
1357            self.state[eid] = {
1358                    'experimentID' : \
1359                            [ { 'localname' : eid }, {'fedid': expid } ],
1360                    'experimentStatus': state,
1361                    'experimentAccess': { 'X509' : expcert },
1362                    'owner': fid,
1363                    'log' : [],
1364                }
1365            self.state[expid] = self.state[eid]
1366            if self.state_filename: self.write_state()
1367            self.state_lock.release()
1368
1369        return eid
1370
1371
1372    def allocate_ips_to_topo(self, top):
1373        """
1374        Add an ip4_address attribute to all the hosts in the topology, based on
1375        the shared substrates on which they sit.  An /etc/hosts file is also
1376        created and returned as a list of hostfiles entries.  We also return
1377        the allocator, because we may need to allocate IPs to portals
1378        (specifically DRAGON portals).
1379        """
1380        subs = sorted(top.substrates, 
1381                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1382                reverse=True)
1383        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1384        ifs = { }
1385        hosts = [ ]
1386
1387        for idx, s in enumerate(subs):
1388            net_size = len(s.interfaces)+2
1389
1390            a = ips.allocate(net_size)
1391            if a :
1392                base, num = a
1393                if num < net_size: 
1394                    raise service_error(service_error.internal,
1395                            "Allocator returned wrong number of IPs??")
1396            else:
1397                raise service_error(service_error.req, 
1398                        "Cannot allocate IP addresses")
1399            mask = ips.min_alloc
1400            while mask < net_size:
1401                mask *= 2
1402
1403            netmask = ((2**32-1) ^ (mask-1))
1404
1405            base += 1
1406            for i in s.interfaces:
1407                i.attribute.append(
1408                        topdl.Attribute('ip4_address', 
1409                            "%s" % ip_addr(base)))
1410                i.attribute.append(
1411                        topdl.Attribute('ip4_netmask', 
1412                            "%s" % ip_addr(int(netmask))))
1413
1414                hname = i.element.name
1415                if ifs.has_key(hname):
1416                    hosts.append("%s\t%s-%s %s-%d" % \
1417                            (ip_addr(base), hname, s.name, hname,
1418                                ifs[hname]))
1419                else:
1420                    ifs[hname] = 0
1421                    hosts.append("%s\t%s-%s %s-%d %s" % \
1422                            (ip_addr(base), hname, s.name, hname,
1423                                ifs[hname], hname))
1424
1425                ifs[hname] += 1
1426                base += 1
1427        return hosts, ips
1428
1429    def get_access_to_testbeds(self, testbeds, access_user, allocated, 
1430            tbparams, masters, tbmap):
1431        """
1432        Request access to the various testbeds required for this instantiation
1433        (passed in as testbeds).  User, access_user, expoert_project and master
1434        are used to construct the correct requests.  Per-testbed parameters are
1435        returned in tbparams.
1436        """
1437        for tb in testbeds:
1438            self.get_access(tb, None, tbparams, access_user, masters, tbmap)
1439            allocated[tb] = 1
1440
1441    def get_abac_access_to_testbeds(self, testbeds, fid, allocated, 
1442            tbparams, masters, tbmap, expid=None, expcert=None):
1443        for tb in testbeds:
1444            self.get_abac_access(tb, tbparams, fid, masters, tbmap, expid,
1445                    expcert)
1446            allocated[tb] = 1
1447
1448    def get_abac_access(self, tb, tbparams,fid, masters, tbmap, expid=None, expcert=None):
1449        """
1450        Get access to testbed through fedd and set the parameters for that tb
1451        """
1452        def get_export_project(svcs):
1453            """
1454            Look through for the list of federated_service for this testbed
1455            objects for a project_export service, and extract the project
1456            parameter.
1457            """
1458
1459            pe = [s for s in svcs if s.name=='project_export']
1460            if len(pe) == 1:
1461                return pe[0].params.get('project', None)
1462            elif len(pe) == 0:
1463                return None
1464            else:
1465                raise service_error(service_error.req,
1466                        "More than one project export is not supported")
1467
1468        uri = tbmap.get(testbed_base(tb), None)
1469        if not uri:
1470            raise service_error(service_error.server_config, 
1471                    "Unknown testbed: %s" % tb)
1472
1473        export_svcs = masters.get(tb,[])
1474        import_svcs = [ s for m in masters.values() \
1475                for s in m \
1476                    if tb in s.importers ]
1477
1478        export_project = get_export_project(export_svcs)
1479        # Compose the credential list so that IDs come before attributes
1480        creds = set()
1481        keys = set()
1482        certs = self.auth.get_creds_for_principal(fid)
1483        if expid:
1484            print join([ "%s <- %s" % ( c.head().string(), c.tail().string()) \
1485                    for c in self.auth.get_creds_for_principal(expid)])
1486            certs.update(self.auth.get_creds_for_principal(expid))
1487        for c in certs:
1488            keys.add(c.issuer_cert())
1489            creds.add(c.attribute_cert())
1490        creds = list(keys) + list(creds)
1491
1492        if expcert: cert, pw = expcert, None
1493        else: cert, pw = self.cert_file, self.cert_pw
1494
1495        # Request credentials
1496        req = {
1497                'abac_credential': creds,
1498            }
1499        # Make the service request from the services we're importing and
1500        # exporting.  Keep track of the export request ids so we can
1501        # collect the resulting info from the access response.
1502        e_keys = { }
1503        if import_svcs or export_svcs:
1504            req['service'] = [ ]
1505
1506            for i, s in enumerate(import_svcs):
1507                idx = 'import%d' % i
1508                sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
1509                if s.params:
1510                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1511                            for k, v in s.params.items()]
1512                req['service'].append(sr)
1513
1514            for i, s in enumerate(export_svcs):
1515                idx = 'export%d' % i
1516                e_keys[idx] = s
1517                sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
1518                if s.params:
1519                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
1520                            for k, v in s.params.items()]
1521                req['service'].append(sr)
1522
1523
1524        if self.local_access.has_key(uri):
1525            # Local access call
1526            req = { 'RequestAccessRequestBody' : req }
1527            r = self.local_access[uri].RequestAccess(req, 
1528                    fedid(file=self.cert_file))
1529            r = { 'RequestAccessResponseBody' : r }
1530        else:
1531            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1532
1533        tbparam[tb] = { 
1534                "allocID" : r['allocID'],
1535                "uri": uri,
1536                }
1537
1538        # Collect the responses corresponding to the services this testbed
1539        # exports.  These will be the service requests that we will include in
1540        # the start segment requests (with appropriate visibility values) to
1541        # import and export the segments.
1542        for s in r.get('service', []):
1543            id = s.get('id', None)
1544            if id and id in e_keys:
1545                e_keys[id].reqs.append(s)
1546
1547        # Add attributes to parameter space.  We don't allow attributes to
1548        # overlay any parameters already installed.
1549        for a in r.get('fedAttr', []):
1550            try:
1551                if a['attribute'] and \
1552                        isinstance(a['attribute'], basestring)\
1553                        and not tbparam[tb].has_key(a['attribute'].lower()):
1554                    tbparam[tb][a['attribute'].lower()] = a['value']
1555            except KeyError:
1556                self.log.error("Bad attribute in response: %s" % a)
1557
1558
1559    def split_topology(self, top, topo, testbeds):
1560        """
1561        Create the sub-topologies that are needed for experiment instantiation.
1562        """
1563        for tb in testbeds:
1564            topo[tb] = top.clone()
1565            # copy in for loop allows deletions from the original
1566            for e in [ e for e in topo[tb].elements]:
1567                etb = e.get_attribute('testbed')
1568                # NB: elements without a testbed attribute won't appear in any
1569                # sub topologies. 
1570                if not etb or etb != tb:
1571                    for i in e.interface:
1572                        for s in i.subs:
1573                            try:
1574                                s.interfaces.remove(i)
1575                            except ValueError:
1576                                raise service_error(service_error.internal,
1577                                        "Can't remove interface??")
1578                    topo[tb].elements.remove(e)
1579            topo[tb].make_indices()
1580
1581    def wrangle_software(self, expid, top, topo, tbparams):
1582        """
1583        Copy software out to the repository directory, allocate permissions and
1584        rewrite the segment topologies to look for the software in local
1585        places.
1586        """
1587
1588        # Copy the rpms and tarfiles to a distribution directory from
1589        # which the federants can retrieve them
1590        linkpath = "%s/software" %  expid
1591        softdir ="%s/%s" % ( self.repodir, linkpath)
1592        softmap = { }
1593        # These are in a list of tuples format (each kit).  This comprehension
1594        # unwraps them into a single list of tuples that initilaizes the set of
1595        # tuples.
1596        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1597                for p, t in l ])
1598        pkgs.update([x.location for e in top.elements \
1599                for x in e.software])
1600        try:
1601            os.makedirs(softdir)
1602        except EnvironmentError, e:
1603            raise service_error(
1604                    "Cannot create software directory: %s" % e)
1605        # The actual copying.  Everything's converted into a url for copying.
1606        for pkg in pkgs:
1607            loc = pkg
1608
1609            scheme, host, path = urlparse(loc)[0:3]
1610            dest = os.path.basename(path)
1611            if not scheme:
1612                if not loc.startswith('/'):
1613                    loc = "/%s" % loc
1614                loc = "file://%s" %loc
1615            try:
1616                u = urlopen(loc)
1617            except Exception, e:
1618                raise service_error(service_error.req, 
1619                        "Cannot open %s: %s" % (loc, e))
1620            try:
1621                f = open("%s/%s" % (softdir, dest) , "w")
1622                self.log.debug("Writing %s/%s" % (softdir,dest) )
1623                data = u.read(4096)
1624                while data:
1625                    f.write(data)
1626                    data = u.read(4096)
1627                f.close()
1628                u.close()
1629            except Exception, e:
1630                raise service_error(service_error.internal,
1631                        "Could not copy %s: %s" % (loc, e))
1632            path = re.sub("/tmp", "", linkpath)
1633            # XXX
1634            softmap[pkg] = \
1635                    "%s/%s/%s" %\
1636                    ( self.repo_url, path, dest)
1637
1638            # Allow the individual segments to access the software.
1639            for tb in tbparams.keys():
1640                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1641                        "/%s/%s" % ( path, dest))
1642            self.auth.save()
1643
1644        # Convert the software locations in the segments into the local
1645        # copies on this host
1646        for soft in [ s for tb in topo.values() \
1647                for e in tb.elements \
1648                    if getattr(e, 'software', False) \
1649                        for s in e.software ]:
1650            if softmap.has_key(soft.location):
1651                soft.location = softmap[soft.location]
1652
1653
1654    def new_experiment(self, req, fid):
1655        """
1656        The external interface to empty initial experiment creation called from
1657        the dispatcher.
1658
1659        Creates a working directory, splits the incoming description using the
1660        splitter script and parses out the avrious subsections using the
1661        lcasses above.  Once each sub-experiment is created, use pooled threads
1662        to instantiate them and start it all up.
1663        """
1664        req = req.get('NewRequestBody', None)
1665        if not req:
1666            raise service_error(service_error.req,
1667                    "Bad request format (no NewRequestBody)")
1668
1669        if self.auth.import_credentials(data_list=req.get('credential', [])):
1670            self.auth.save()
1671
1672        if not self.auth.check_attribute(fid, 'new'):
1673            raise service_error(service_error.access, "New access denied")
1674
1675        try:
1676            tmpdir = tempfile.mkdtemp(prefix="split-")
1677        except EnvironmentError:
1678            raise service_error(service_error.internal, "Cannot create tmp dir")
1679
1680        try:
1681            access_user = self.accessdb[fid]
1682        except KeyError:
1683            raise service_error(service_error.internal,
1684                    "Access map and authorizer out of sync in " + \
1685                            "new_experiment for fedid %s"  % fid)
1686
1687        pid = "dummy"
1688        gid = "dummy"
1689
1690        # Generate an ID for the experiment (slice) and a certificate that the
1691        # allocator can use to prove they own it.  We'll ship it back through
1692        # the encrypted connection.  If the requester supplied one, use it.
1693        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1694            expcert = req['experimentAccess']['X509']
1695            expid = fedid(certstr=expcert)
1696            self.state_lock.acquire()
1697            if expid in self.state:
1698                self.state_lock.release()
1699                raise service_error(service_error.req, 
1700                        'fedid %s identifies an existing experiment' % expid)
1701            self.state_lock.release()
1702        else:
1703            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1704
1705        #now we're done with the tmpdir, and it should be empty
1706        if self.cleanup:
1707            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1708            os.rmdir(tmpdir)
1709        else:
1710            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1711
1712        eid = self.create_experiment_state(fid, req, expid, expcert, 
1713                state='empty')
1714
1715        # Let users touch the state
1716        self.auth.set_attribute(fid, expid)
1717        self.auth.set_attribute(expid, expid)
1718        # Override fedids can manipulate state as well
1719        for o in self.overrides:
1720            self.auth.set_attribute(o, expid)
1721        self.auth.save()
1722
1723        rv = {
1724                'experimentID': [
1725                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1726                ],
1727                'experimentStatus': 'empty',
1728                'experimentAccess': { 'X509' : expcert }
1729            }
1730
1731        return rv
1732
1733    def create_experiment(self, req, fid):
1734        """
1735        The external interface to experiment creation called from the
1736        dispatcher.
1737
1738        Creates a working directory, splits the incoming description using the
1739        splitter script and parses out the various subsections using the
1740        classes above.  Once each sub-experiment is created, use pooled threads
1741        to instantiate them and start it all up.
1742        """
1743
1744        req = req.get('CreateRequestBody', None)
1745        if not req:
1746            raise service_error(service_error.req,
1747                    "Bad request format (no CreateRequestBody)")
1748
1749        # Get the experiment access
1750        exp = req.get('experimentID', None)
1751        if exp:
1752            if exp.has_key('fedid'):
1753                key = exp['fedid']
1754                expid = key
1755                eid = None
1756            elif exp.has_key('localname'):
1757                key = exp['localname']
1758                eid = key
1759                expid = None
1760            else:
1761                raise service_error(service_error.req, "Unknown lookup type")
1762        else:
1763            raise service_error(service_error.req, "No request?")
1764
1765        print "%s" % expid
1766        print 'creds ',
1767        print join([ "%s <- %s" % ( c.head().string(), c.tail().string()) \
1768                for c in self.auth.get_creds_for_principal(expid)])
1769        # Import information from the requester
1770        if self.auth.import_credentials(data_list=req.get('credential', [])):
1771            self.auth.save()
1772
1773        print 'creds ',
1774        print join([ "%s <- %s" % ( c.head().string(), c.tail().string()) \
1775                for c in self.auth.get_creds_for_principal(expid)])
1776        self.check_experiment_access(fid, key)
1777
1778        # Install the testbed map entries supplied with the request into a copy
1779        # of the testbed map.
1780        tbmap = dict(self.tbmap)
1781        for m in req.get('testbedmap', []):
1782            if 'testbed' in m and 'uri' in m:
1783                tbmap[m['testbed']] = m['uri']
1784
1785        try:
1786            tmpdir = tempfile.mkdtemp(prefix="split-")
1787            os.mkdir(tmpdir+"/keys")
1788        except EnvironmentError:
1789            raise service_error(service_error.internal, "Cannot create tmp dir")
1790
1791        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1792        gw_secretkey_base = "fed.%s" % self.ssh_type
1793        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1794        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1795        tclfile = tmpdir + "/experiment.tcl"
1796        tbparams = { }
1797        try:
1798            access_user = self.accessdb[fid]
1799        except KeyError:
1800            raise service_error(service_error.internal,
1801                    "Access map and authorizer out of sync in " + \
1802                            "create_experiment for fedid %s"  % fid)
1803
1804        pid = "dummy"
1805        gid = "dummy"
1806
1807        # The tcl parser needs to read a file so put the content into that file
1808        descr=req.get('experimentdescription', None)
1809        if descr:
1810            file_content=descr.get('ns2description', None)
1811            if file_content:
1812                try:
1813                    f = open(tclfile, 'w')
1814                    f.write(file_content)
1815                    f.close()
1816                except EnvironmentError:
1817                    raise service_error(service_error.internal,
1818                            "Cannot write temp experiment description")
1819            else:
1820                raise service_error(service_error.req, 
1821                        "Only ns2descriptions supported")
1822        else:
1823            raise service_error(service_error.req, "No experiment description")
1824
1825        self.state_lock.acquire()
1826        if self.state.has_key(key):
1827            self.state[key]['experimentStatus'] = "starting"
1828            for e in self.state[key].get('experimentID',[]):
1829                if not expid and e.has_key('fedid'):
1830                    expid = e['fedid']
1831                elif not eid and e.has_key('localname'):
1832                    eid = e['localname']
1833            if 'experimentAccess' in self.state[key] and \
1834                    'X509' in self.state[key]['experimentAccess']:
1835                expcert = self.state[key]['experimentAccess']['X509']
1836            else:
1837                expcert = None
1838        self.state_lock.release()
1839
1840        if not (eid and expid):
1841            raise service_error(service_error.internal, 
1842                    "Cannot find local experiment info!?")
1843
1844        # make a protected copy of the access certificate so the experiment
1845        # controller can act as the experiment principal.  mkstemp is the most
1846        # secure way to do that and the file is in a directory created by
1847        # mkdtemp.  expcert enters the if as the contents of the file and
1848        # leaves is as the filename in which the cert is stored.  All this goes
1849        # away when the tempfiles are cleared.
1850        if expcert:
1851            try:
1852                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
1853                f = os.fdopen(certf, 'w')
1854                print >> f, expcert
1855                f.close()
1856                expcert = certfn
1857            except EnvironmentError, e:
1858                raise service_error(service_error.internal, 
1859                        "Cannot create temp cert file?")
1860
1861        try: 
1862            # This catches exceptions to clear the placeholder if necessary
1863            try:
1864                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1865            except ValueError:
1866                raise service_error(service_error.server_config, 
1867                        "Bad key type (%s)" % self.ssh_type)
1868
1869            # Copy the service request
1870            tb_services = [ s for s in req.get('service',[]) ]
1871            # Translate to topdl
1872            if self.splitter_url:
1873                self.log.debug("Calling remote topdl translator at %s" % \
1874                        self.splitter_url)
1875                top = self.remote_ns2topdl(self.splitter_url, file_content)
1876            else:
1877                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1878                    str(self.muxmax), '-m', 'dummy']
1879
1880                tclcmd.extend([pid, gid, eid, tclfile])
1881
1882                self.log.debug("running local splitter %s", " ".join(tclcmd))
1883                # This is just fantastic.  As a side effect the parser copies
1884                # tb_compat.tcl into the current directory, so that directory
1885                # must be writable by the fedd user.  Doing this in the
1886                # temporary subdir ensures this is the case.
1887                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1888                        cwd=tmpdir)
1889                split_data = tclparser.stdout
1890
1891                top = topdl.topology_from_xml(file=split_data, top="experiment")
1892
1893            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1894            # Find the testbeds to look up
1895            testbeds = set([ a.value for e in top.elements \
1896                    for a in e.attribute \
1897                        if a.attribute == 'testbed'])
1898
1899            tb_hosts = { }
1900            for tb in testbeds:
1901                tb_hosts[tb] = [ e.name for e in top.elements \
1902                        if isinstance(e, topdl.Computer) and \
1903                            e.get_attribute('testbed') and \
1904                            e.get_attribute('testbed') == tb]
1905
1906            masters = { }           # testbeds exporting services
1907            pmasters = { }          # Testbeds exporting services that
1908                                    # need portals
1909            for s in tb_services:
1910                # If this is a service request with the importall field
1911                # set, fill it out.
1912
1913                if s.get('importall', False):
1914                    s['import'] = [ tb for tb in testbeds \
1915                            if tb not in s.get('export',[])]
1916                    del s['importall']
1917
1918                # Add the service to masters
1919                for tb in s.get('export', []):
1920                    if s.get('name', None):
1921                        if tb not in masters:
1922                            masters[tb] = [ ]
1923
1924                        params = { }
1925                        if 'fedAttr' in s:
1926                            for a in s['fedAttr']:
1927                                params[a.get('attribute', '')] = \
1928                                        a.get('value','')
1929
1930                        fser = federated_service(name=s['name'],
1931                                exporter=tb, importers=s.get('import',[]),
1932                                params=params)
1933                        if fser.name == 'hide_hosts' \
1934                                and 'hosts' not in fser.params:
1935                            fser.params['hosts'] = \
1936                                    ",".join(tb_hosts.get(fser.exporter, []))
1937                        masters[tb].append(fser)
1938
1939                        if fser.portal:
1940                            if tb not in pmasters: pmasters[tb] = [ fser ]
1941                            else: pmasters[tb].append(fser)
1942                    else:
1943                        self.log.error('Testbed service does not have name " + \
1944                                "and importers')
1945
1946
1947            allocated = { }         # Testbeds we can access
1948            topo ={ }               # Sub topologies
1949            connInfo = { }          # Connection information
1950
1951            if self.auth_type == 'legacy':
1952                self.get_access_to_testbeds(testbeds, access_user, allocated, 
1953                        tbparams, masters, tbmap)
1954            elif self.auth_type == 'abac':
1955                self.get_abac_access_to_testbeds(testbeds, fid, allocated, 
1956                        tbparams, masters, tbmap, expid, expcert)
1957            else:
1958                raise service_error(service_error.internal, 
1959                        "Unknown auth_type %s" % self.auth_type)
1960
1961            self.split_topology(top, topo, testbeds)
1962
1963            # Copy configuration files into the remote file store
1964            # The config urlpath
1965            configpath = "/%s/config" % expid
1966            # The config file system location
1967            configdir ="%s%s" % ( self.repodir, configpath)
1968            try:
1969                os.makedirs(configdir)
1970            except EnvironmentError, e:
1971                raise service_error(service_error.internal,
1972                        "Cannot create config directory: %s" % e)
1973            try:
1974                f = open("%s/hosts" % configdir, "w")
1975                f.write('\n'.join(hosts))
1976                f.close()
1977            except EnvironmentError, e:
1978                raise service_error(service_error.internal, 
1979                        "Cannot write hosts file: %s" % e)
1980            try:
1981                copy_file("%s" % gw_pubkey, "%s/%s" % \
1982                        (configdir, gw_pubkey_base))
1983                copy_file("%s" % gw_secretkey, "%s/%s" % \
1984                        (configdir, gw_secretkey_base))
1985            except EnvironmentError, e:
1986                raise service_error(service_error.internal, 
1987                        "Cannot copy keyfiles: %s" % e)
1988
1989            # Allow the individual testbeds to access the configuration files.
1990            for tb in tbparams.keys():
1991                asignee = tbparams[tb]['allocID']['fedid']
1992                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1993                    self.auth.set_attribute(asignee, "%s/%s" % \
1994                            (configpath, f))
1995
1996            part = experiment_partition(self.auth, self.store_url, tbmap,
1997                    self.muxmax, self.direct_transit)
1998            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
1999                    connInfo, expid)
2000            # Now get access to the dynamic testbeds (those added above)
2001            for tb in [ t for t in topo if t not in allocated]:
2002                #XXX: ABAC
2003                self.get_access(tb, None, tbparams, access_user, masters, tbmap)
2004                allocated[tb] = 1
2005                store_keys = topo[tb].get_attribute('store_keys')
2006                # Give the testbed access to keys it exports or imports
2007                if store_keys:
2008                    for sk in store_keys.split(" "):
2009                        self.auth.set_attribute(\
2010                                tbparams[tb]['allocID']['fedid'], sk)
2011            self.auth.save()
2012
2013            self.wrangle_software(expid, top, topo, tbparams)
2014
2015            vtopo = topdl.topology_to_vtopo(top)
2016            vis = self.genviz(vtopo)
2017
2018            # save federant information
2019            for k in allocated.keys():
2020                tbparams[k]['federant'] = {
2021                        'name': [ { 'localname' : eid} ],
2022                        'allocID' : tbparams[k]['allocID'],
2023                        'uri': tbparams[k]['uri'],
2024                    }
2025
2026            self.state_lock.acquire()
2027            self.state[eid]['vtopo'] = vtopo
2028            self.state[eid]['vis'] = vis
2029            self.state[eid]['experimentdescription'] = \
2030                    { 'topdldescription': top.to_dict() }
2031            self.state[eid]['federant'] = \
2032                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2033                        if tbparams[tb].has_key('federant') ]
2034            if self.state_filename: 
2035                self.write_state()
2036            self.state_lock.release()
2037        except service_error, e:
2038            # If something goes wrong in the parse (usually an access error)
2039            # clear the placeholder state.  From here on out the code delays
2040            # exceptions.  Failing at this point returns a fault to the remote
2041            # caller.
2042
2043            self.state_lock.acquire()
2044            del self.state[eid]
2045            del self.state[expid]
2046            if self.state_filename: self.write_state()
2047            self.state_lock.release()
2048            raise e
2049
2050
2051        # Start the background swapper and return the starting state.  From
2052        # here on out, the state will stick around a while.
2053
2054        # Let users touch the state
2055        self.auth.set_attribute(fid, expid)
2056        self.auth.set_attribute(expid, expid)
2057        # Override fedids can manipulate state as well
2058        for o in self.overrides:
2059            self.auth.set_attribute(o, expid)
2060        self.auth.save()
2061
2062        # Create a logger that logs to the experiment's state object as well as
2063        # to the main log file.
2064        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2065        alloc_collector = self.list_log(self.state[eid]['log'])
2066        h = logging.StreamHandler(alloc_collector)
2067        # XXX: there should be a global one of these rather than repeating the
2068        # code.
2069        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2070                    '%d %b %y %H:%M:%S'))
2071        alloc_log.addHandler(h)
2072       
2073        attrs = [ 
2074                {
2075                    'attribute': 'ssh_pubkey', 
2076                    'value': '%s/%s/config/%s' % \
2077                            (self.repo_url, expid, gw_pubkey_base)
2078                },
2079                {
2080                    'attribute': 'ssh_secretkey', 
2081                    'value': '%s/%s/config/%s' % \
2082                            (self.repo_url, expid, gw_secretkey_base)
2083                },
2084                {
2085                    'attribute': 'hosts', 
2086                    'value': '%s/%s/config/hosts' % \
2087                            (self.repo_url, expid)
2088                },
2089            ]
2090
2091        # transit and disconnected testbeds may not have a connInfo entry.
2092        # Fill in the blanks.
2093        for t in allocated.keys():
2094            if not connInfo.has_key(t):
2095                connInfo[t] = { }
2096
2097        # Start a thread to do the resource allocation
2098        t  = Thread(target=self.allocate_resources,
2099                args=(allocated, masters, eid, expid, tbparams, 
2100                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2101                    connInfo, tbmap, expcert),
2102                name=eid)
2103        t.start()
2104
2105        rv = {
2106                'experimentID': [
2107                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2108                ],
2109                'experimentStatus': 'starting',
2110            }
2111
2112        return rv
2113   
2114    def get_experiment_fedid(self, key):
2115        """
2116        find the fedid associated with the localname key in the state database.
2117        """
2118
2119        rv = None
2120        self.state_lock.acquire()
2121        if self.state.has_key(key):
2122            if isinstance(self.state[key], dict):
2123                try:
2124                    kl = [ f['fedid'] for f in \
2125                            self.state[key]['experimentID']\
2126                                if f.has_key('fedid') ]
2127                except KeyError:
2128                    self.state_lock.release()
2129                    raise service_error(service_error.internal, 
2130                            "No fedid for experiment %s when getting "+\
2131                                    "fedid(!?)" % key)
2132                if len(kl) == 1:
2133                    rv = kl[0]
2134                else:
2135                    self.state_lock.release()
2136                    raise service_error(service_error.internal, 
2137                            "multiple fedids for experiment %s when " +\
2138                                    "getting fedid(!?)" % key)
2139            else:
2140                self.state_lock.release()
2141                raise service_error(service_error.internal, 
2142                        "Unexpected state for %s" % key)
2143        self.state_lock.release()
2144        return rv
2145
2146    def check_experiment_access(self, fid, key):
2147        """
2148        Confirm that the fid has access to the experiment.  Though a request
2149        may be made in terms of a local name, the access attribute is always
2150        the experiment's fedid.
2151        """
2152        if not isinstance(key, fedid):
2153            key = self.get_experiment_fedid(key)
2154
2155        if self.auth.check_attribute(fid, key):
2156            return True
2157        else:
2158            raise service_error(service_error.access, "Access Denied")
2159
2160
2161    def get_handler(self, path, fid):
2162        self.log.info("Get handler %s %s" % (path, fid))
2163        if self.auth.check_attribute(fid, path):
2164            return ("%s/%s" % (self.repodir, path), "application/binary")
2165        else:
2166            return (None, None)
2167
2168    def get_vtopo(self, req, fid):
2169        """
2170        Return the stored virtual topology for this experiment
2171        """
2172        rv = None
2173        state = None
2174
2175        req = req.get('VtopoRequestBody', None)
2176        if not req:
2177            raise service_error(service_error.req,
2178                    "Bad request format (no VtopoRequestBody)")
2179        exp = req.get('experiment', None)
2180        if exp:
2181            if exp.has_key('fedid'):
2182                key = exp['fedid']
2183                keytype = "fedid"
2184            elif exp.has_key('localname'):
2185                key = exp['localname']
2186                keytype = "localname"
2187            else:
2188                raise service_error(service_error.req, "Unknown lookup type")
2189        else:
2190            raise service_error(service_error.req, "No request?")
2191
2192        self.check_experiment_access(fid, key)
2193
2194        self.state_lock.acquire()
2195        if self.state.has_key(key):
2196            if self.state[key].has_key('vtopo'):
2197                rv = { 'experiment' : {keytype: key },\
2198                        'vtopo': self.state[key]['vtopo'],\
2199                    }
2200            else:
2201                state = self.state[key]['experimentStatus']
2202        self.state_lock.release()
2203
2204        if rv: return rv
2205        else: 
2206            if state:
2207                raise service_error(service_error.partial, 
2208                        "Not ready: %s" % state)
2209            else:
2210                raise service_error(service_error.req, "No such experiment")
2211
2212    def get_vis(self, req, fid):
2213        """
2214        Return the stored visualization for this experiment
2215        """
2216        rv = None
2217        state = None
2218
2219        req = req.get('VisRequestBody', None)
2220        if not req:
2221            raise service_error(service_error.req,
2222                    "Bad request format (no VisRequestBody)")
2223        exp = req.get('experiment', None)
2224        if exp:
2225            if exp.has_key('fedid'):
2226                key = exp['fedid']
2227                keytype = "fedid"
2228            elif exp.has_key('localname'):
2229                key = exp['localname']
2230                keytype = "localname"
2231            else:
2232                raise service_error(service_error.req, "Unknown lookup type")
2233        else:
2234            raise service_error(service_error.req, "No request?")
2235
2236        self.check_experiment_access(fid, key)
2237
2238        self.state_lock.acquire()
2239        if self.state.has_key(key):
2240            if self.state[key].has_key('vis'):
2241                rv =  { 'experiment' : {keytype: key },\
2242                        'vis': self.state[key]['vis'],\
2243                        }
2244            else:
2245                state = self.state[key]['experimentStatus']
2246        self.state_lock.release()
2247
2248        if rv: return rv
2249        else:
2250            if state:
2251                raise service_error(service_error.partial, 
2252                        "Not ready: %s" % state)
2253            else:
2254                raise service_error(service_error.req, "No such experiment")
2255
2256    def clean_info_response(self, rv):
2257        """
2258        Remove the information in the experiment's state object that is not in
2259        the info response.
2260        """
2261        # Remove the owner info (should always be there, but...)
2262        if rv.has_key('owner'): del rv['owner']
2263
2264        # Convert the log into the allocationLog parameter and remove the
2265        # log entry (with defensive programming)
2266        if rv.has_key('log'):
2267            rv['allocationLog'] = "".join(rv['log'])
2268            del rv['log']
2269        else:
2270            rv['allocationLog'] = ""
2271
2272        if rv['experimentStatus'] != 'active':
2273            if rv.has_key('federant'): del rv['federant']
2274        else:
2275            # remove the allocationID and uri info from each federant
2276            for f in rv.get('federant', []):
2277                if f.has_key('allocID'): del f['allocID']
2278                if f.has_key('uri'): del f['uri']
2279
2280        return rv
2281
2282    def get_info(self, req, fid):
2283        """
2284        Return all the stored info about this experiment
2285        """
2286        rv = None
2287
2288        req = req.get('InfoRequestBody', None)
2289        if not req:
2290            raise service_error(service_error.req,
2291                    "Bad request format (no InfoRequestBody)")
2292        exp = req.get('experiment', None)
2293        if exp:
2294            if exp.has_key('fedid'):
2295                key = exp['fedid']
2296                keytype = "fedid"
2297            elif exp.has_key('localname'):
2298                key = exp['localname']
2299                keytype = "localname"
2300            else:
2301                raise service_error(service_error.req, "Unknown lookup type")
2302        else:
2303            raise service_error(service_error.req, "No request?")
2304
2305        self.check_experiment_access(fid, key)
2306
2307        # The state may be massaged by the service function that called
2308        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2309        # state.
2310        self.state_lock.acquire()
2311        if self.state.has_key(key):
2312            rv = copy.deepcopy(self.state[key])
2313        self.state_lock.release()
2314
2315        if rv:
2316            return self.clean_info_response(rv)
2317        else:
2318            raise service_error(service_error.req, "No such experiment")
2319
2320    def get_multi_info(self, req, fid):
2321        """
2322        Return all the stored info that this fedid can access
2323        """
2324        rv = { 'info': [ ] }
2325
2326        self.state_lock.acquire()
2327        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2328            try:
2329                self.check_experiment_access(fid, key)
2330            except service_error, e:
2331                if e.code == service_error.access:
2332                    continue
2333                else:
2334                    self.state_lock.release()
2335                    raise e
2336
2337            if self.state.has_key(key):
2338                e = copy.deepcopy(self.state[key])
2339                e = self.clean_info_response(e)
2340                rv['info'].append(e)
2341        self.state_lock.release()
2342        return rv
2343
2344    def remove_dirs(self, dir):
2345        """
2346        Remove the directory tree and all files rooted at dir.  Log any errors,
2347        but continue.
2348        """
2349        self.log.debug("[removedirs]: removing %s" % dir)
2350        try:
2351            for path, dirs, files in os.walk(dir, topdown=False):
2352                for f in files:
2353                    os.remove(os.path.join(path, f))
2354                for d in dirs:
2355                    os.rmdir(os.path.join(path, d))
2356            os.rmdir(dir)
2357        except EnvironmentError, e:
2358            self.log.error("Error deleting directory tree in %s" % e);
2359
2360    def terminate_experiment(self, req, fid):
2361        """
2362        Swap this experiment out on the federants and delete the shared
2363        information
2364        """
2365        tbparams = { }
2366        req = req.get('TerminateRequestBody', None)
2367        if not req:
2368            raise service_error(service_error.req,
2369                    "Bad request format (no TerminateRequestBody)")
2370        force = req.get('force', False)
2371        exp = req.get('experiment', None)
2372        if exp:
2373            if exp.has_key('fedid'):
2374                key = exp['fedid']
2375                keytype = "fedid"
2376            elif exp.has_key('localname'):
2377                key = exp['localname']
2378                keytype = "localname"
2379            else:
2380                raise service_error(service_error.req, "Unknown lookup type")
2381        else:
2382            raise service_error(service_error.req, "No request?")
2383
2384        self.check_experiment_access(fid, key)
2385
2386        dealloc_list = [ ]
2387
2388
2389        # Create a logger that logs to the dealloc_list as well as to the main
2390        # log file.
2391        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2392        h = logging.StreamHandler(self.list_log(dealloc_list))
2393        # XXX: there should be a global one of these rather than repeating the
2394        # code.
2395        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2396                    '%d %b %y %H:%M:%S'))
2397        dealloc_log.addHandler(h)
2398
2399        self.state_lock.acquire()
2400        fed_exp = self.state.get(key, None)
2401        repo = None
2402
2403        if fed_exp:
2404            # This branch of the conditional holds the lock to generate a
2405            # consistent temporary tbparams variable to deallocate experiments.
2406            # It releases the lock to do the deallocations and reacquires it to
2407            # remove the experiment state when the termination is complete.
2408
2409            # First make sure that the experiment creation is complete.
2410            status = fed_exp.get('experimentStatus', None)
2411
2412            if status:
2413                if status in ('starting', 'terminating'):
2414                    if not force:
2415                        self.state_lock.release()
2416                        raise service_error(service_error.partial, 
2417                                'Experiment still being created or destroyed')
2418                    else:
2419                        self.log.warning('Experiment in %s state ' % status + \
2420                                'being terminated by force.')
2421            else:
2422                # No status??? trouble
2423                self.state_lock.release()
2424                raise service_error(service_error.internal,
2425                        "Experiment has no status!?")
2426
2427            ids = []
2428            #  experimentID is a list of dicts that are self-describing
2429            #  identifiers.  This finds all the fedids and localnames - the
2430            #  keys of self.state - and puts them into ids.
2431            for id in fed_exp.get('experimentID', []):
2432                if id.has_key('fedid'): 
2433                    ids.append(id['fedid'])
2434                    repo = "%s" % id['fedid']
2435                if id.has_key('localname'): ids.append(id['localname'])
2436
2437            # Collect the allocation/segment ids into a dict keyed by the fedid
2438            # of the allocation (or a monotonically increasing integer) that
2439            # contains a tuple of uri, aid (which is a dict...)
2440            for i, fed in enumerate(fed_exp.get('federant', [])):
2441                try:
2442                    uri = fed['uri']
2443                    aid = fed['allocID']
2444                    k = fed['allocID'].get('fedid', i)
2445                except KeyError, e:
2446                    continue
2447                tbparams[k] = (uri, aid)
2448            fed_exp['experimentStatus'] = 'terminating'
2449            if self.state_filename: self.write_state()
2450            self.state_lock.release()
2451
2452            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2453            # then completes, so we can't wait if nothing starts.  So, no
2454            # tbparams, no start.
2455            if len(tbparams) > 0:
2456                thread_pool = self.thread_pool(self.nthreads)
2457                for k in tbparams.keys():
2458                    # Create and start a thread to stop the segment
2459                    thread_pool.wait_for_slot()
2460                    uri, aid = tbparams[k]
2461                    t  = self.pooled_thread(\
2462                            target=self.terminate_segment(log=dealloc_log,
2463                                testbed=uri,
2464                                cert_file=self.cert_file, 
2465                                cert_pwd=self.cert_pwd,
2466                                trusted_certs=self.trusted_certs,
2467                                caller=self.call_TerminateSegment),
2468                            args=(uri, aid), name=k,
2469                            pdata=thread_pool, trace_file=self.trace_file)
2470                    t.start()
2471                # Wait for completions
2472                thread_pool.wait_for_all_done()
2473
2474            # release the allocations (failed experiments have done this
2475            # already, and starting experiments may be in odd states, so we
2476            # ignore errors releasing those allocations
2477            try: 
2478                for k in tbparams.keys():
2479                    # This releases access by uri
2480                    uri, aid = tbparams[k]
2481                    self.release_access(None, aid, uri=uri)
2482            except service_error, e:
2483                if status != 'failed' and not force:
2484                    raise e
2485
2486            # Remove the terminated experiment
2487            self.state_lock.acquire()
2488            for id in ids:
2489                if self.state.has_key(id): del self.state[id]
2490
2491            if self.state_filename: self.write_state()
2492            self.state_lock.release()
2493
2494            # Delete any synch points associated with this experiment.  All
2495            # synch points begin with the fedid of the experiment.
2496            fedid_keys = set(["fedid:%s" % f for f in ids \
2497                    if isinstance(f, fedid)])
2498            for k in self.synch_store.all_keys():
2499                try:
2500                    if len(k) > 45 and k[0:46] in fedid_keys:
2501                        self.synch_store.del_value(k)
2502                except synch_store.BadDeletionError:
2503                    pass
2504            self.write_store()
2505
2506            # Remove software and other cached stuff from the filesystem.
2507            if repo:
2508                self.remove_dirs("%s/%s" % (self.repodir, repo))
2509               
2510            return { 
2511                    'experiment': exp , 
2512                    'deallocationLog': "".join(dealloc_list),
2513                    }
2514        else:
2515            # Don't forget to release the lock
2516            self.state_lock.release()
2517            raise service_error(service_error.req, "No saved state")
2518
2519
2520    def GetValue(self, req, fid):
2521        """
2522        Get a value from the synchronized store
2523        """
2524        req = req.get('GetValueRequestBody', None)
2525        if not req:
2526            raise service_error(service_error.req,
2527                    "Bad request format (no GetValueRequestBody)")
2528       
2529        name = req['name']
2530        wait = req['wait']
2531        rv = { 'name': name }
2532
2533        if self.auth.check_attribute(fid, name):
2534            self.log.debug("[GetValue] asking for %s " % name)
2535            try:
2536                v = self.synch_store.get_value(name, wait)
2537            except synch_store.RevokedKeyError:
2538                # No more synch on this key
2539                raise service_error(service_error.federant, 
2540                        "Synch key %s revoked" % name)
2541            if v is not None:
2542                rv['value'] = v
2543            self.log.debug("[GetValue] got %s from %s" % (v, name))
2544            return rv
2545        else:
2546            raise service_error(service_error.access, "Access Denied")
2547       
2548
2549    def SetValue(self, req, fid):
2550        """
2551        Set a value in the synchronized store
2552        """
2553        req = req.get('SetValueRequestBody', None)
2554        if not req:
2555            raise service_error(service_error.req,
2556                    "Bad request format (no SetValueRequestBody)")
2557       
2558        name = req['name']
2559        v = req['value']
2560
2561        if self.auth.check_attribute(fid, name):
2562            try:
2563                self.synch_store.set_value(name, v)
2564                self.write_store()
2565                self.log.debug("[SetValue] set %s to %s" % (name, v))
2566            except synch_store.CollisionError:
2567                # Translate into a service_error
2568                raise service_error(service_error.req,
2569                        "Value already set: %s" %name)
2570            except synch_store.RevokedKeyError:
2571                # No more synch on this key
2572                raise service_error(service_error.federant, 
2573                        "Synch key %s revoked" % name)
2574            return { 'name': name, 'value': v }
2575        else:
2576            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.