source: fedd/federation/experiment_control.py @ 6e63513

axis_examplecompt_changesinfo-ops
Last change on this file since 6e63513 was 6e63513, checked in by Ted Faber <faber@…>, 13 years ago

Checkpoint

  • Property mode set to 100644
File size: 87.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31from experiment_partition import experiment_partition
32from authorizer import abac_authorizer
33
34import topdl
35import list_log
36from ip_allocator import ip_allocator
37from ip_addr import ip_addr
38
39
40class nullHandler(logging.Handler):
41    def emit(self, record): pass
42
43fl = logging.getLogger("fedd.experiment_control")
44fl.addHandler(nullHandler())
45
46
47# Right now, no support for composition.
48class federated_service:
49    def __init__(self, name, exporter=None, importers=None, params=None, 
50            reqs=None, portal=None):
51        self.name=name
52        self.exporter=exporter
53        if importers is None: self.importers = []
54        else: self.importers=importers
55        if params is None: self.params = { }
56        else: self.params = params
57        if reqs is None: self.reqs = []
58        else: self.reqs = reqs
59
60        if portal is not None:
61            self.portal = portal
62        else:
63            self.portal = (name in federated_service.needs_portal)
64
65    def __str__(self):
66        return "name %s export %s import %s params %s reqs %s" % \
67                (self.name, self.exporter, self.importers, self.params,
68                        [ (r['name'], r['visibility']) for r in self.reqs] )
69
70    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
71
72class experiment_control_local:
73    """
74    Control of experiments that this system can directly access.
75
76    Includes experiment creation, termination and information dissemination.
77    Thred safe.
78    """
79
80    class ssh_cmd_timeout(RuntimeError): pass
81   
82    class thread_pool:
83        """
84        A class to keep track of a set of threads all invoked for the same
85        task.  Manages the mutual exclusion of the states.
86        """
87        def __init__(self, nthreads):
88            """
89            Start a pool.
90            """
91            self.changed = Condition()
92            self.started = 0
93            self.terminated = 0
94            self.nthreads = nthreads
95
96        def acquire(self):
97            """
98            Get the pool's lock.
99            """
100            self.changed.acquire()
101
102        def release(self):
103            """
104            Release the pool's lock.
105            """
106            self.changed.release()
107
108        def wait(self, timeout = None):
109            """
110            Wait for a pool thread to start or stop.
111            """
112            self.changed.wait(timeout)
113
114        def start(self):
115            """
116            Called by a pool thread to report starting.
117            """
118            self.changed.acquire()
119            self.started += 1
120            self.changed.notifyAll()
121            self.changed.release()
122
123        def terminate(self):
124            """
125            Called by a pool thread to report finishing.
126            """
127            self.changed.acquire()
128            self.terminated += 1
129            self.changed.notifyAll()
130            self.changed.release()
131
132        def clear(self):
133            """
134            Clear all pool data.
135            """
136            self.changed.acquire()
137            self.started = 0
138            self.terminated =0
139            self.changed.notifyAll()
140            self.changed.release()
141
142        def wait_for_slot(self):
143            """
144            Wait until we have a free slot to start another pooled thread
145            """
146            self.acquire()
147            while self.started - self.terminated >= self.nthreads:
148                self.wait()
149            self.release()
150
151        def wait_for_all_done(self, timeout=None):
152            """
153            Wait until all active threads finish (and at least one has
154            started).  If a timeout is given, return after waiting that long
155            for termination.  If all threads are done (and one has started in
156            the since the last clear()) return True, otherwise False.
157            """
158            if timeout:
159                deadline = time.time() + timeout
160            self.acquire()
161            while self.started == 0 or self.started > self.terminated:
162                self.wait(timeout)
163                if timeout:
164                    if time.time() > deadline:
165                        break
166                    timeout = deadline - time.time()
167            self.release()
168            return not (self.started == 0 or self.started > self.terminated)
169
170    class pooled_thread(Thread):
171        """
172        One of a set of threads dedicated to a specific task.  Uses the
173        thread_pool class above for coordination.
174        """
175        def __init__(self, group=None, target=None, name=None, args=(), 
176                kwargs={}, pdata=None, trace_file=None):
177            Thread.__init__(self, group, target, name, args, kwargs)
178            self.rv = None          # Return value of the ops in this thread
179            self.exception = None   # Exception that terminated this thread
180            self.target=target      # Target function to run on start()
181            self.args = args        # Args to pass to target
182            self.kwargs = kwargs    # Additional kw args
183            self.pdata = pdata      # thread_pool for this class
184            # Logger for this thread
185            self.log = logging.getLogger("fedd.experiment_control")
186       
187        def run(self):
188            """
189            Emulate Thread.run, except add pool data manipulation and error
190            logging.
191            """
192            if self.pdata:
193                self.pdata.start()
194
195            if self.target:
196                try:
197                    self.rv = self.target(*self.args, **self.kwargs)
198                except service_error, s:
199                    self.exception = s
200                    self.log.error("Thread exception: %s %s" % \
201                            (s.code_string(), s.desc))
202                except:
203                    self.exception = sys.exc_info()[1]
204                    self.log.error(("Unexpected thread exception: %s" +\
205                            "Trace %s") % (self.exception,\
206                                traceback.format_exc()))
207            if self.pdata:
208                self.pdata.terminate()
209
210    call_RequestAccess = service_caller('RequestAccess')
211    call_ReleaseAccess = service_caller('ReleaseAccess')
212    call_StartSegment = service_caller('StartSegment')
213    call_TerminateSegment = service_caller('TerminateSegment')
214    call_Ns2Topdl = service_caller('Ns2Topdl')
215
216    def __init__(self, config=None, auth=None):
217        """
218        Intialize the various attributes, most from the config object
219        """
220
221        def parse_tarfile_list(tf):
222            """
223            Parse a tarfile list from the configuration.  This is a set of
224            paths and tarfiles separated by spaces.
225            """
226            rv = [ ]
227            if tf is not None:
228                tl = tf.split()
229                while len(tl) > 1:
230                    p, t = tl[0:2]
231                    del tl[0:2]
232                    rv.append((p, t))
233            return rv
234
235        self.thread_with_rv = experiment_control_local.pooled_thread
236        self.thread_pool = experiment_control_local.thread_pool
237        self.list_log = list_log.list_log
238
239        self.cert_file = config.get("experiment_control", "cert_file")
240        if self.cert_file:
241            self.cert_pwd = config.get("experiment_control", "cert_pwd")
242        else:
243            self.cert_file = config.get("globals", "cert_file")
244            self.cert_pwd = config.get("globals", "cert_pwd")
245
246        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
247                or config.get("globals", "trusted_certs")
248
249        self.repodir = config.get("experiment_control", "repodir")
250        self.repo_url = config.get("experiment_control", "repo_url", 
251                "https://users.isi.deterlab.net:23235");
252
253        self.exp_stem = "fed-stem"
254        self.log = logging.getLogger("fedd.experiment_control")
255        set_log_level(config, "experiment_control", self.log)
256        self.muxmax = 2
257        self.nthreads = 10
258        self.randomize_experiments = False
259
260        self.splitter = None
261        self.ssh_keygen = "/usr/bin/ssh-keygen"
262        self.ssh_identity_file = None
263
264
265        self.debug = config.getboolean("experiment_control", "create_debug")
266        self.cleanup = not config.getboolean("experiment_control", 
267                "leave_tmpfiles")
268        self.state_filename = config.get("experiment_control", 
269                "experiment_state")
270        self.store_filename = config.get("experiment_control", 
271                "synch_store")
272        self.store_url = config.get("experiment_control", "store_url")
273        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
274        self.fedkit = parse_tarfile_list(\
275                config.get("experiment_control", "fedkit"))
276        self.gatewaykit = parse_tarfile_list(\
277                config.get("experiment_control", "gatewaykit"))
278        accessdb_file = config.get("experiment_control", "accessdb")
279
280        self.ssh_pubkey_file = config.get("experiment_control", 
281                "ssh_pubkey_file")
282        self.ssh_privkey_file = config.get("experiment_control",
283                "ssh_privkey_file")
284        dt = config.get("experiment_control", "direct_transit")
285        self.auth_type = config.get('experiment_control', 'auth_type') \
286                or 'legacy'
287        self.auth_dir = config.get('experiment_control', 'auth_dir')
288        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
289        else: self.direct_transit = [ ]
290        # NB for internal master/slave ops, not experiment setup
291        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
292
293        self.overrides = set([])
294        ovr = config.get('experiment_control', 'overrides')
295        if ovr:
296            for o in ovr.split(","):
297                o = o.strip()
298                if o.startswith('fedid:'): o = o[len('fedid:'):]
299                self.overrides.add(fedid(hexstr=o))
300
301        self.state = { }
302        self.state_lock = Lock()
303        self.tclsh = "/usr/local/bin/otclsh"
304        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
305                config.get("experiment_control", "tcl_splitter",
306                        "/usr/testbed/lib/ns2ir/parse.tcl")
307        mapdb_file = config.get("experiment_control", "mapdb")
308        self.trace_file = sys.stderr
309
310        self.def_expstart = \
311                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
312                "/tmp/federate";
313        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
314                "FEDDIR/hosts";
315        self.def_gwstart = \
316                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
317                "/tmp/bridge.log";
318        self.def_mgwstart = \
319                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
320                "/tmp/bridge.log";
321        self.def_gwimage = "FBSD61-TUNNEL2";
322        self.def_gwtype = "pc";
323        self.local_access = { }
324
325        if self.auth_type == 'legacy':
326            if auth:
327                self.auth = auth
328            else:
329                self.log.error( "[access]: No authorizer initialized, " +\
330                        "creating local one.")
331                auth = authorizer()
332        elif self.auth_type == 'abac':
333            self.auth = abac_authorizer(load=self.auth_dir)
334        else:
335            raise service_error(service_error.internal, 
336                    "Unknown auth_type: %s" % self.auth_type)
337
338
339        if self.ssh_pubkey_file:
340            try:
341                f = open(self.ssh_pubkey_file, 'r')
342                self.ssh_pubkey = f.read()
343                f.close()
344            except EnvironmentError:
345                raise service_error(service_error.internal,
346                        "Cannot read sshpubkey")
347        else:
348            raise service_error(service_error.internal, 
349                    "No SSH public key file?")
350
351        if not self.ssh_privkey_file:
352            raise service_error(service_error.internal, 
353                    "No SSH public key file?")
354
355
356        if mapdb_file:
357            self.read_mapdb(mapdb_file)
358        else:
359            self.log.warn("[experiment_control] No testbed map, using defaults")
360            self.tbmap = { 
361                    'deter':'https://users.isi.deterlab.net:23235',
362                    'emulab':'https://users.isi.deterlab.net:23236',
363                    'ucb':'https://users.isi.deterlab.net:23237',
364                    }
365
366        if accessdb_file:
367                self.read_accessdb(accessdb_file)
368        else:
369            raise service_error(service_error.internal,
370                    "No accessdb specified in config")
371
372        # Grab saved state.  OK to do this w/o locking because it's read only
373        # and only one thread should be in existence that can see self.state at
374        # this point.
375        if self.state_filename:
376            self.read_state()
377
378        if self.store_filename:
379            self.read_store()
380        else:
381            self.log.warning("No saved synch store")
382            self.synch_store = synch_store
383
384        # Dispatch tables
385        self.soap_services = {\
386                'New': soap_handler('New', self.new_experiment),
387                'Create': soap_handler('Create', self.create_experiment),
388                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
389                'Vis': soap_handler('Vis', self.get_vis),
390                'Info': soap_handler('Info', self.get_info),
391                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
392                'Terminate': soap_handler('Terminate', 
393                    self.terminate_experiment),
394                'GetValue': soap_handler('GetValue', self.GetValue),
395                'SetValue': soap_handler('SetValue', self.SetValue),
396        }
397
398        self.xmlrpc_services = {\
399                'New': xmlrpc_handler('New', self.new_experiment),
400                'Create': xmlrpc_handler('Create', self.create_experiment),
401                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
402                'Vis': xmlrpc_handler('Vis', self.get_vis),
403                'Info': xmlrpc_handler('Info', self.get_info),
404                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
405                'Terminate': xmlrpc_handler('Terminate',
406                    self.terminate_experiment),
407                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
408                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
409        }
410
411    # Call while holding self.state_lock
412    def write_state(self):
413        """
414        Write a new copy of experiment state after copying the existing state
415        to a backup.
416
417        State format is a simple pickling of the state dictionary.
418        """
419        if os.access(self.state_filename, os.W_OK):
420            copy_file(self.state_filename, \
421                    "%s.bak" % self.state_filename)
422        try:
423            f = open(self.state_filename, 'w')
424            pickle.dump(self.state, f)
425        except EnvironmentError, e:
426            self.log.error("Can't write file %s: %s" % \
427                    (self.state_filename, e))
428        except pickle.PicklingError, e:
429            self.log.error("Pickling problem: %s" % e)
430        except TypeError, e:
431            self.log.error("Pickling problem (TypeError): %s" % e)
432
433    @staticmethod
434    def get_alloc_ids(state):
435        """
436        Pull the fedids of the identifiers of each allocation from the
437        state.  Again, a dict dive that's best isolated.
438
439        Used by read_store and read state
440        """
441
442        return [ f['allocID']['fedid'] 
443                for f in state.get('federant',[]) \
444                    if f.has_key('allocID') and \
445                        f['allocID'].has_key('fedid')]
446
447    # Call while holding self.state_lock
448    def read_state(self):
449        """
450        Read a new copy of experiment state.  Old state is overwritten.
451
452        State format is a simple pickling of the state dictionary.
453        """
454       
455        def get_experiment_id(state):
456            """
457            Pull the fedid experimentID out of the saved state.  This is kind
458            of a gross walk through the dict.
459            """
460
461            if state.has_key('experimentID'):
462                for e in state['experimentID']:
463                    if e.has_key('fedid'):
464                        return e['fedid']
465                else:
466                    return None
467            else:
468                return None
469
470        try:
471            f = open(self.state_filename, "r")
472            self.state = pickle.load(f)
473            self.log.debug("[read_state]: Read state from %s" % \
474                    self.state_filename)
475        except EnvironmentError, e:
476            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
477                    % (self.state_filename, e))
478        except pickle.UnpicklingError, e:
479            self.log.warning(("[read_state]: No saved state: " + \
480                    "Unpickling failed: %s") % e)
481       
482        for s in self.state.values():
483            try:
484
485                eid = get_experiment_id(s)
486                if eid : 
487                    if self.auth_type == 'legacy':
488                        # XXX: legacy
489                        # Give the owner rights to the experiment
490                        self.auth.set_attribute(s['owner'], eid)
491                        # And holders of the eid as well
492                        self.auth.set_attribute(eid, eid)
493                        # allow overrides to control experiments as well
494                        for o in self.overrides:
495                            self.auth.set_attribute(o, eid)
496                        # Set permissions to allow reading of the software
497                        # repo, if any, as well.
498                        for a in self.get_alloc_ids(s):
499                            self.auth.set_attribute(a, 'repo/%s' % eid)
500                else:
501                    raise KeyError("No experiment id")
502            except KeyError, e:
503                self.log.warning("[read_state]: State ownership or identity " +\
504                        "misformatted in %s: %s" % (self.state_filename, e))
505
506
507    def read_accessdb(self, accessdb_file):
508        """
509        Read the mapping from fedids that can create experiments to their name
510        in the 3-level access namespace.  All will be asserted from this
511        testbed and can include the local username and porject that will be
512        asserted on their behalf by this fedd.  Each fedid is also added to the
513        authorization system with the "create" attribute.
514        """
515        self.accessdb = {}
516        # These are the regexps for parsing the db
517        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
518        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
519                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
520        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
521                "\s*->\s*(" + name_expr + ")\s*$")
522        lineno = 0
523
524        # Parse the mappings and store in self.authdb, a dict of
525        # fedid -> (proj, user)
526        try:
527            f = open(accessdb_file, "r")
528            for line in f:
529                lineno += 1
530                line = line.strip()
531                if len(line) == 0 or line.startswith('#'):
532                    continue
533                m = project_line.match(line)
534                if m:
535                    fid = fedid(hexstr=m.group(1))
536                    project, user = m.group(2,3)
537                    if not self.accessdb.has_key(fid):
538                        self.accessdb[fid] = []
539                    self.accessdb[fid].append((project, user))
540                    continue
541
542                m = user_line.match(line)
543                if m:
544                    fid = fedid(hexstr=m.group(1))
545                    project = None
546                    user = m.group(2)
547                    if not self.accessdb.has_key(fid):
548                        self.accessdb[fid] = []
549                    self.accessdb[fid].append((project, user))
550                    continue
551                self.log.warn("[experiment_control] Error parsing access " +\
552                        "db %s at line %d" %  (accessdb_file, lineno))
553        except EnvironmentError:
554            raise service_error(service_error.internal,
555                    ("Error opening/reading %s as experiment " +\
556                            "control accessdb") %  accessdb_file)
557        f.close()
558
559        # Initialize the authorization attributes
560        # XXX: legacy
561        if self.auth_type == 'legacy':
562            for fid in self.accessdb.keys():
563                self.auth.set_attribute(fid, 'create')
564                self.auth.set_attribute(fid, 'new')
565
566    def read_mapdb(self, file):
567        """
568        Read a simple colon separated list of mappings for the
569        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
570        """
571
572        self.tbmap = { }
573        lineno =0
574        try:
575            f = open(file, "r")
576            for line in f:
577                lineno += 1
578                line = line.strip()
579                if line.startswith('#') or len(line) == 0:
580                    continue
581                try:
582                    label, url = line.split(':', 1)
583                    self.tbmap[label] = url
584                except ValueError, e:
585                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
586                            "map db: %s %s" % (lineno, line, e))
587        except EnvironmentError, e:
588            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
589                    "open %s: %s" % (file, e))
590        f.close()
591
592    def read_store(self):
593        try:
594            self.synch_store = synch_store()
595            self.synch_store.load(self.store_filename)
596            self.log.debug("[read_store]: Read store from %s" % \
597                    self.store_filename)
598        except EnvironmentError, e:
599            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
600                    % (self.state_filename, e))
601            self.synch_store = synch_store()
602
603        # Set the initial permissions on data in the store.  XXX: This ad hoc
604        # authorization attribute initialization is getting out of hand.
605        # XXX: legacy
606        if self.auth_type == 'legacy':
607            for k in self.synch_store.all_keys():
608                try:
609                    if k.startswith('fedid:'):
610                        fid = fedid(hexstr=k[6:46])
611                        if self.state.has_key(fid):
612                            for a in self.get_alloc_ids(self.state[fid]):
613                                self.auth.set_attribute(a, k)
614                except ValueError, e:
615                    self.log.warn("Cannot deduce permissions for %s" % k)
616
617
618    def write_store(self):
619        """
620        Write a new copy of synch_store after writing current state
621        to a backup.  We use the internal synch_store pickle method to avoid
622        incinsistent data.
623
624        State format is a simple pickling of the store.
625        """
626        if os.access(self.store_filename, os.W_OK):
627            copy_file(self.store_filename, \
628                    "%s.bak" % self.store_filename)
629        try:
630            self.synch_store.save(self.store_filename)
631        except EnvironmentError, e:
632            self.log.error("Can't write file %s: %s" % \
633                    (self.store_filename, e))
634        except TypeError, e:
635            self.log.error("Pickling problem (TypeError): %s" % e)
636
637       
638    def generate_ssh_keys(self, dest, type="rsa" ):
639        """
640        Generate a set of keys for the gateways to use to talk.
641
642        Keys are of type type and are stored in the required dest file.
643        """
644        valid_types = ("rsa", "dsa")
645        t = type.lower();
646        if t not in valid_types: raise ValueError
647        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
648
649        try:
650            trace = open("/dev/null", "w")
651        except EnvironmentError:
652            raise service_error(service_error.internal,
653                    "Cannot open /dev/null??");
654
655        # May raise CalledProcessError
656        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
657        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
658        if rv != 0:
659            raise service_error(service_error.internal, 
660                    "Cannot generate nonce ssh keys.  %s return code %d" \
661                            % (self.ssh_keygen, rv))
662
663    def gentopo(self, str):
664        """
665        Generate the topology dtat structure from the splitter's XML
666        representation of it.
667
668        The topology XML looks like:
669            <experiment>
670                <nodes>
671                    <node><vname></vname><ips>ip1:ip2</ips></node>
672                </nodes>
673                <lans>
674                    <lan>
675                        <vname></vname><vnode></vnode><ip></ip>
676                        <bandwidth></bandwidth><member>node:port</member>
677                    </lan>
678                </lans>
679        """
680        class topo_parse:
681            """
682            Parse the topology XML and create the dats structure.
683            """
684            def __init__(self):
685                # Typing of the subelements for data conversion
686                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
687                self.int_subelements = ( 'bandwidth',)
688                self.float_subelements = ( 'delay',)
689                # The final data structure
690                self.nodes = [ ]
691                self.lans =  [ ]
692                self.topo = { \
693                        'node': self.nodes,\
694                        'lan' : self.lans,\
695                    }
696                self.element = { }  # Current element being created
697                self.chars = ""     # Last text seen
698
699            def end_element(self, name):
700                # After each sub element the contents is added to the current
701                # element or to the appropriate list.
702                if name == 'node':
703                    self.nodes.append(self.element)
704                    self.element = { }
705                elif name == 'lan':
706                    self.lans.append(self.element)
707                    self.element = { }
708                elif name in self.str_subelements:
709                    self.element[name] = self.chars
710                    self.chars = ""
711                elif name in self.int_subelements:
712                    self.element[name] = int(self.chars)
713                    self.chars = ""
714                elif name in self.float_subelements:
715                    self.element[name] = float(self.chars)
716                    self.chars = ""
717
718            def found_chars(self, data):
719                self.chars += data.rstrip()
720
721
722        tp = topo_parse();
723        parser = xml.parsers.expat.ParserCreate()
724        parser.EndElementHandler = tp.end_element
725        parser.CharacterDataHandler = tp.found_chars
726
727        parser.Parse(str)
728
729        return tp.topo
730       
731
732    def genviz(self, topo):
733        """
734        Generate the visualization the virtual topology
735        """
736
737        neato = "/usr/local/bin/neato"
738        # These are used to parse neato output and to create the visualization
739        # file.
740        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
741        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
742                "%s</type></node>"
743
744        try:
745            # Node names
746            nodes = [ n['vname'] for n in topo['node'] ]
747            topo_lans = topo['lan']
748        except KeyError, e:
749            raise service_error(service_error.internal, "Bad topology: %s" %e)
750
751        lans = { }
752        links = { }
753
754        # Walk through the virtual topology, organizing the connections into
755        # 2-node connections (links) and more-than-2-node connections (lans).
756        # When a lan is created, it's added to the list of nodes (there's a
757        # node in the visualization for the lan).
758        for l in topo_lans:
759            if links.has_key(l['vname']):
760                if len(links[l['vname']]) < 2:
761                    links[l['vname']].append(l['vnode'])
762                else:
763                    nodes.append(l['vname'])
764                    lans[l['vname']] = links[l['vname']]
765                    del links[l['vname']]
766                    lans[l['vname']].append(l['vnode'])
767            elif lans.has_key(l['vname']):
768                lans[l['vname']].append(l['vnode'])
769            else:
770                links[l['vname']] = [ l['vnode'] ]
771
772
773        # Open up a temporary file for dot to turn into a visualization
774        try:
775            df, dotname = tempfile.mkstemp()
776            dotfile = os.fdopen(df, 'w')
777        except EnvironmentError:
778            raise service_error(service_error.internal,
779                    "Failed to open file in genviz")
780
781        try:
782            dnull = open('/dev/null', 'w')
783        except EnvironmentError:
784            service_error(service_error.internal,
785                    "Failed to open /dev/null in genviz")
786
787        # Generate a dot/neato input file from the links, nodes and lans
788        try:
789            print >>dotfile, "graph G {"
790            for n in nodes:
791                print >>dotfile, '\t"%s"' % n
792            for l in links.keys():
793                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
794            for l in lans.keys():
795                for n in lans[l]:
796                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
797            print >>dotfile, "}"
798            dotfile.close()
799        except TypeError:
800            raise service_error(service_error.internal,
801                    "Single endpoint link in vtopo")
802        except EnvironmentError:
803            raise service_error(service_error.internal, "Cannot write dot file")
804
805        # Use dot to create a visualization
806        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
807                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
808                close_fds=True)
809        dnull.close()
810
811        # Translate dot to vis format
812        vis_nodes = [ ]
813        vis = { 'node': vis_nodes }
814        for line in dot.stdout:
815            m = vis_re.match(line)
816            if m:
817                vn = m.group(1)
818                vis_node = {'name': vn, \
819                        'x': float(m.group(2)),\
820                        'y' : float(m.group(3)),\
821                    }
822                if vn in links.keys() or vn in lans.keys():
823                    vis_node['type'] = 'lan'
824                else:
825                    vis_node['type'] = 'node'
826                vis_nodes.append(vis_node)
827        rv = dot.wait()
828
829        os.remove(dotname)
830        if rv == 0 : return vis
831        else: return None
832
833    def get_access(self, tb, nodes, tbparam, access_user, masters, tbmap):
834        """
835        Get access to testbed through fedd and set the parameters for that tb
836        """
837        def get_export_project(svcs):
838            """
839            Look through for the list of federated_service for this testbed
840            objects for a project_export service, and extract the project
841            parameter.
842            """
843
844            pe = [s for s in svcs if s.name=='project_export']
845            if len(pe) == 1:
846                return pe[0].params.get('project', None)
847            elif len(pe) == 0:
848                return None
849            else:
850                raise service_error(service_error.req,
851                        "More than one project export is not supported")
852
853        uri = tbmap.get(testbed_base(tb), None)
854        if not uri:
855            raise service_error(service_error.server_config, 
856                    "Unknown testbed: %s" % tb)
857
858        export_svcs = masters.get(tb,[])
859        import_svcs = [ s for m in masters.values() \
860                for s in m \
861                    if tb in s.importers ]
862
863        export_project = get_export_project(export_svcs)
864
865        # Tweak search order so that if there are entries in access_user that
866        # have a project matching the export project, we try them first
867        if export_project: 
868            access_sequence = [ (p, u) for p, u in access_user \
869                    if p == export_project] 
870            access_sequence.extend([(p, u) for p, u in access_user \
871                    if p != export_project]) 
872        else: 
873            access_sequence = access_user
874
875        for p, u in access_sequence: 
876            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
877                    "to %s") %  ((p or "None"), u, uri))
878
879            if p:
880                # Request with user and project specified
881                req = {\
882                        'credential': [ "project: %s" % p, "user: %s"  % u],
883                    }
884            else:
885                # Request with only user specified
886                req = {\
887                        'credential': [ 'user: %s' % u ],
888                    }
889
890            # Make the service request from the services we're importing and
891            # exporting.  Keep track of the export request ids so we can
892            # collect the resulting info from the access response.
893            e_keys = { }
894            if import_svcs or export_svcs:
895                req['service'] = [ ]
896
897                for i, s in enumerate(import_svcs):
898                    idx = 'import%d' % i
899                    sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
900                    if s.params:
901                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
902                                for k, v in s.params.items()]
903                    req['service'].append(sr)
904
905                for i, s in enumerate(export_svcs):
906                    idx = 'export%d' % i
907                    e_keys[idx] = s
908                    sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
909                    if s.params:
910                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
911                                for k, v in s.params.items()]
912                    req['service'].append(sr)
913
914            # node resources if any
915            if nodes != None and len(nodes) > 0:
916                rnodes = [ ]
917                for n in nodes:
918                    rn = { }
919                    image, hw, count = n.split(":")
920                    if image: rn['image'] = [ image ]
921                    if hw: rn['hardware'] = [ hw ]
922                    if count and int(count) >0 : rn['count'] = int(count)
923                    rnodes.append(rn)
924                req['resources']= { }
925                req['resources']['node'] = rnodes
926
927            try:
928                if self.local_access.has_key(uri):
929                    # Local access call
930                    req = { 'RequestAccessRequestBody' : req }
931                    r = self.local_access[uri].RequestAccess(req, 
932                            fedid(file=self.cert_file))
933                    r = { 'RequestAccessResponseBody' : r }
934                else:
935                    r = self.call_RequestAccess(uri, req, 
936                            self.cert_file, self.cert_pwd, self.trusted_certs)
937            except service_error, e:
938                if e.code == service_error.access:
939                    self.log.debug("[get_access] Access denied")
940                    r = None
941                    continue
942                else:
943                    raise e
944
945            if r.has_key('RequestAccessResponseBody'):
946                # Through to here we have a valid response, not a fault.
947                # Access denied is a fault, so something better or worse than
948                # access denied has happened.
949                r = r['RequestAccessResponseBody']
950                self.log.debug("[get_access] Access granted")
951                break
952            else:
953                raise service_error(service_error.protocol,
954                        "Bad proxy response")
955       
956        if not r:
957            raise service_error(service_error.access, 
958                    "Access denied by %s (%s)" % (tb, uri))
959
960        tbparam[tb] = { 
961                "allocID" : r['allocID'],
962                "uri": uri,
963                }
964
965        # Collect the responses corresponding to the services this testbed
966        # exports.  These will be the service requests that we will include in
967        # the start segment requests (with appropriate visibility values) to
968        # import and export the segments.
969        for s in r.get('service', []):
970            id = s.get('id', None)
971            if id and id in e_keys:
972                e_keys[id].reqs.append(s)
973
974        # Add attributes to parameter space.  We don't allow attributes to
975        # overlay any parameters already installed.
976        for a in r.get('fedAttr', []):
977            try:
978                if a['attribute'] and \
979                        isinstance(a['attribute'], basestring)\
980                        and not tbparam[tb].has_key(a['attribute'].lower()):
981                    tbparam[tb][a['attribute'].lower()] = a['value']
982            except KeyError:
983                self.log.error("Bad attribute in response: %s" % a)
984
985    def release_access(self, tb, aid, tbmap=None, uri=None):
986        """
987        Release access to testbed through fedd
988        """
989
990        if not uri and tbmap:
991            uri = tbmap.get(tb, None)
992        if not uri:
993            raise service_error(service_error.server_config, 
994                    "Unknown testbed: %s" % tb)
995
996        if self.local_access.has_key(uri):
997            resp = self.local_access[uri].ReleaseAccess(\
998                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
999                    fedid(file=self.cert_file))
1000            resp = { 'ReleaseAccessResponseBody': resp } 
1001        else:
1002            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1003                    self.cert_file, self.cert_pwd, self.trusted_certs)
1004
1005        # better error coding
1006
1007    def remote_ns2topdl(self, uri, desc):
1008
1009        req = {
1010                'description' : { 'ns2description': desc },
1011            }
1012
1013        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
1014                self.trusted_certs)
1015
1016        if r.has_key('Ns2TopdlResponseBody'):
1017            r = r['Ns2TopdlResponseBody']
1018            ed = r.get('experimentdescription', None)
1019            if ed.has_key('topdldescription'):
1020                return topdl.Topology(**ed['topdldescription'])
1021            else:
1022                raise service_error(service_error.protocol, 
1023                        "Bad splitter response (no output)")
1024        else:
1025            raise service_error(service_error.protocol, "Bad splitter response")
1026
1027    class start_segment:
1028        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1029                cert_pwd=None, trusted_certs=None, caller=None,
1030                log_collector=None):
1031            self.log = log
1032            self.debug = debug
1033            self.cert_file = cert_file
1034            self.cert_pwd = cert_pwd
1035            self.trusted_certs = None
1036            self.caller = caller
1037            self.testbed = testbed
1038            self.log_collector = log_collector
1039            self.response = None
1040            self.node = { }
1041
1042        def make_map(self, resp):
1043            for e in resp.get('embedding', []):
1044                if 'toponame' in e and 'physname' in e:
1045                    self.node[e['toponame']] = e['physname'][0]
1046
1047        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
1048            req = {
1049                    'allocID': { 'fedid' : aid }, 
1050                    'segmentdescription': { 
1051                        'topdldescription': topo.to_dict(),
1052                    },
1053                }
1054
1055            if connInfo:
1056                req['connection'] = connInfo
1057
1058            import_svcs = [ s for m in masters.values() \
1059                    for s in m if self.testbed in s.importers]
1060
1061            if import_svcs or self.testbed in masters:
1062                req['service'] = []
1063
1064            for s in import_svcs:
1065                for r in s.reqs:
1066                    sr = copy.deepcopy(r)
1067                    sr['visibility'] = 'import';
1068                    req['service'].append(sr)
1069
1070            for s in masters.get(self.testbed, []):
1071                for r in s.reqs:
1072                    sr = copy.deepcopy(r)
1073                    sr['visibility'] = 'export';
1074                    req['service'].append(sr)
1075
1076            if attrs:
1077                req['fedAttr'] = attrs
1078
1079            try:
1080                self.log.debug("Calling StartSegment at %s " % uri)
1081                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1082                        self.trusted_certs)
1083                if r.has_key('StartSegmentResponseBody'):
1084                    lval = r['StartSegmentResponseBody'].get('allocationLog',
1085                            None)
1086                    if lval and self.log_collector:
1087                        for line in  lval.splitlines(True):
1088                            self.log_collector.write(line)
1089                    self.make_map(r['StartSegmentResponseBody'])
1090                    self.response = r
1091                else:
1092                    raise service_error(service_error.internal, 
1093                            "Bad response!?: %s" %r)
1094                return True
1095            except service_error, e:
1096                self.log.error("Start segment failed on %s: %s" % \
1097                        (self.testbed, e))
1098                return False
1099
1100
1101
1102    class terminate_segment:
1103        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1104                cert_pwd=None, trusted_certs=None, caller=None):
1105            self.log = log
1106            self.debug = debug
1107            self.cert_file = cert_file
1108            self.cert_pwd = cert_pwd
1109            self.trusted_certs = None
1110            self.caller = caller
1111            self.testbed = testbed
1112
1113        def __call__(self, uri, aid ):
1114            req = {
1115                    'allocID': aid , 
1116                }
1117            try:
1118                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1119                        self.trusted_certs)
1120                return True
1121            except service_error, e:
1122                self.log.error("Terminate segment failed on %s: %s" % \
1123                        (self.testbed, e))
1124                return False
1125   
1126
1127    def allocate_resources(self, allocated, masters, eid, expid, 
1128            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1129            attrs=None, connInfo={}, tbmap=None):
1130
1131        started = { }           # Testbeds where a sub-experiment started
1132                                # successfully
1133
1134        # XXX
1135        fail_soft = False
1136
1137        if tbmap is None: tbmap = { }
1138
1139        log = alloc_log or self.log
1140
1141        thread_pool = self.thread_pool(self.nthreads)
1142        threads = [ ]
1143        starters = [ ]
1144
1145        for tb in allocated.keys():
1146            # Create and start a thread to start the segment, and save it
1147            # to get the return value later
1148            tb_attrs = copy.copy(attrs)
1149            thread_pool.wait_for_slot()
1150            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
1151            base, suffix = split_testbed(tb)
1152            if suffix:
1153                tb_attrs.append({'attribute': 'experiment_name', 
1154                    'value': "%s-%s" % (eid, suffix)})
1155            else:
1156                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1157            if not uri:
1158                raise service_error(service_error.internal, 
1159                        "Unknown testbed %s !?" % tb)
1160
1161            if tbparams[tb].has_key('allocID') and \
1162                    tbparams[tb]['allocID'].has_key('fedid'):
1163                aid = tbparams[tb]['allocID']['fedid']
1164            else:
1165                raise service_error(service_error.internal, 
1166                        "No alloc id for testbed %s !?" % tb)
1167
1168            s = self.start_segment(log=log, debug=self.debug,
1169                    testbed=tb, cert_file=self.cert_file,
1170                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
1171                    caller=self.call_StartSegment,
1172                    log_collector=log_collector)
1173            starters.append(s)
1174            t  = self.pooled_thread(\
1175                    target=s, name=tb,
1176                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1177                    pdata=thread_pool, trace_file=self.trace_file)
1178            threads.append(t)
1179            t.start()
1180
1181        # Wait until all finish (keep pinging the log, though)
1182        mins = 0
1183        revoked = False
1184        while not thread_pool.wait_for_all_done(60.0):
1185            mins += 1
1186            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1187                    % mins)
1188            if not revoked and \
1189                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1190                # a testbed has failed.  Revoke this experiment's
1191                # synchronizarion values so that sub experiments will not
1192                # deadlock waiting for synchronization that will never happen
1193                self.log.info("A subexperiment has failed to swap in, " + \
1194                        "revoking synch keys")
1195                var_key = "fedid:%s" % expid
1196                for k in self.synch_store.all_keys():
1197                    if len(k) > 45 and k[0:46] == var_key:
1198                        self.synch_store.revoke_key(k)
1199                revoked = True
1200
1201        failed = [ t.getName() for t in threads if not t.rv ]
1202        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1203
1204        # If one failed clean up, unless fail_soft is set
1205        if failed:
1206            if not fail_soft:
1207                thread_pool.clear()
1208                for tb in succeeded:
1209                    # Create and start a thread to stop the segment
1210                    thread_pool.wait_for_slot()
1211                    uri = tbparams[tb]['uri']
1212                    t  = self.pooled_thread(\
1213                            target=self.terminate_segment(log=log,
1214                                testbed=tb,
1215                                cert_file=self.cert_file, 
1216                                cert_pwd=self.cert_pwd,
1217                                trusted_certs=self.trusted_certs,
1218                                caller=self.call_TerminateSegment),
1219                            args=(uri, tbparams[tb]['federant']['allocID']),
1220                            name=tb,
1221                            pdata=thread_pool, trace_file=self.trace_file)
1222                    t.start()
1223                # Wait until all finish (if any are being stopped)
1224                if succeeded:
1225                    thread_pool.wait_for_all_done()
1226
1227                # release the allocations
1228                for tb in tbparams.keys():
1229                    self.release_access(tb, tbparams[tb]['allocID'], 
1230                            tbmap=tbmap, uri=tbparams[tb].get('uri', None))
1231                # Remove the placeholder
1232                self.state_lock.acquire()
1233                self.state[eid]['experimentStatus'] = 'failed'
1234                if self.state_filename: self.write_state()
1235                self.state_lock.release()
1236                # Remove the repo dir
1237                self.remove_dirs("%s/%s" %(self.repodir, expid))
1238                # Walk up tmpdir, deleting as we go
1239                if self.cleanup:
1240                    self.remove_dirs(tmpdir)
1241                else:
1242                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1243
1244
1245                log.error("Swap in failed on %s" % ",".join(failed))
1246                return
1247        else:
1248            # Walk through the successes and gather the virtual to physical
1249            # mapping.
1250            embedding = [ ]
1251            for s in starters:
1252                for k, v in s.node.items():
1253                    embedding.append({
1254                        'toponame': k, 
1255                        'physname': [ v],
1256                        'testbed': s.testbed
1257                        })
1258            log.info("[start_segment]: Experiment %s active" % eid)
1259
1260
1261        # Walk up tmpdir, deleting as we go
1262        if self.cleanup:
1263            self.remove_dirs(tmpdir)
1264        else:
1265            log.debug("[start_experiment]: not removing %s" % tmpdir)
1266
1267        # Insert the experiment into our state and update the disk copy.
1268        self.state_lock.acquire()
1269        self.state[expid]['experimentStatus'] = 'active'
1270        self.state[eid] = self.state[expid]
1271        self.state[eid]['experimentdescription']['topdldescription'] = \
1272                top.to_dict()
1273        self.state[eid]['embedding'] = embedding
1274        if self.state_filename: self.write_state()
1275        self.state_lock.release()
1276        return
1277
1278
1279    def add_kit(self, e, kit):
1280        """
1281        Add a Software object created from the list of (install, location)
1282        tuples passed as kit  to the software attribute of an object e.  We
1283        do this enough to break out the code, but it's kind of a hack to
1284        avoid changing the old tuple rep.
1285        """
1286
1287        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1288
1289        if isinstance(e.software, list): e.software.extend(s)
1290        else: e.software = s
1291
1292
1293    def create_experiment_state(self, fid, req, expid, expcert,
1294            state='starting'):
1295        """
1296        Create the initial entry in the experiment's state.  The expid and
1297        expcert are the experiment's fedid and certifacte that represents that
1298        ID, which are installed in the experiment state.  If the request
1299        includes a suggested local name that is used if possible.  If the local
1300        name is already taken by an experiment owned by this user that has
1301        failed, it is overwritten.  Otherwise new letters are added until a
1302        valid localname is found.  The generated local name is returned.
1303        """
1304
1305        if req.has_key('experimentID') and \
1306                req['experimentID'].has_key('localname'):
1307            overwrite = False
1308            eid = req['experimentID']['localname']
1309            # If there's an old failed experiment here with the same local name
1310            # and accessible by this user, we'll overwrite it, otherwise we'll
1311            # fall through and do the collision avoidance.
1312            old_expid = self.get_experiment_fedid(eid)
1313            if old_expid and self.check_experiment_access(fid, old_expid):
1314                self.state_lock.acquire()
1315                status = self.state[eid].get('experimentStatus', None)
1316                if status and status == 'failed':
1317                    # remove the old access attribute
1318                    self.auth.unset_attribute(fid, old_expid)
1319                    self.auth.save()
1320                    overwrite = True
1321                    del self.state[eid]
1322                    del self.state[old_expid]
1323                self.state_lock.release()
1324            self.state_lock.acquire()
1325            while (self.state.has_key(eid) and not overwrite):
1326                eid += random.choice(string.ascii_letters)
1327            # Initial state
1328            self.state[eid] = {
1329                    'experimentID' : \
1330                            [ { 'localname' : eid }, {'fedid': expid } ],
1331                    'experimentStatus': state,
1332                    'experimentAccess': { 'X509' : expcert },
1333                    'owner': fid,
1334                    'log' : [],
1335                }
1336            self.state[expid] = self.state[eid]
1337            if self.state_filename: self.write_state()
1338            self.state_lock.release()
1339        else:
1340            eid = self.exp_stem
1341            for i in range(0,5):
1342                eid += random.choice(string.ascii_letters)
1343            self.state_lock.acquire()
1344            while (self.state.has_key(eid)):
1345                eid = self.exp_stem
1346                for i in range(0,5):
1347                    eid += random.choice(string.ascii_letters)
1348            # Initial state
1349            self.state[eid] = {
1350                    'experimentID' : \
1351                            [ { 'localname' : eid }, {'fedid': expid } ],
1352                    'experimentStatus': state,
1353                    'experimentAccess': { 'X509' : expcert },
1354                    'owner': fid,
1355                    'log' : [],
1356                }
1357            self.state[expid] = self.state[eid]
1358            if self.state_filename: self.write_state()
1359            self.state_lock.release()
1360
1361        return eid
1362
1363
1364    def allocate_ips_to_topo(self, top):
1365        """
1366        Add an ip4_address attribute to all the hosts in the topology, based on
1367        the shared substrates on which they sit.  An /etc/hosts file is also
1368        created and returned as a list of hostfiles entries.  We also return
1369        the allocator, because we may need to allocate IPs to portals
1370        (specifically DRAGON portals).
1371        """
1372        subs = sorted(top.substrates, 
1373                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1374                reverse=True)
1375        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1376        ifs = { }
1377        hosts = [ ]
1378
1379        for idx, s in enumerate(subs):
1380            net_size = len(s.interfaces)+2
1381
1382            a = ips.allocate(net_size)
1383            if a :
1384                base, num = a
1385                if num < net_size: 
1386                    raise service_error(service_error.internal,
1387                            "Allocator returned wrong number of IPs??")
1388            else:
1389                raise service_error(service_error.req, 
1390                        "Cannot allocate IP addresses")
1391            mask = ips.min_alloc
1392            while mask < net_size:
1393                mask *= 2
1394
1395            netmask = ((2**32-1) ^ (mask-1))
1396
1397            base += 1
1398            for i in s.interfaces:
1399                i.attribute.append(
1400                        topdl.Attribute('ip4_address', 
1401                            "%s" % ip_addr(base)))
1402                i.attribute.append(
1403                        topdl.Attribute('ip4_netmask', 
1404                            "%s" % ip_addr(int(netmask))))
1405
1406                hname = i.element.name
1407                if ifs.has_key(hname):
1408                    hosts.append("%s\t%s-%s %s-%d" % \
1409                            (ip_addr(base), hname, s.name, hname,
1410                                ifs[hname]))
1411                else:
1412                    ifs[hname] = 0
1413                    hosts.append("%s\t%s-%s %s-%d %s" % \
1414                            (ip_addr(base), hname, s.name, hname,
1415                                ifs[hname], hname))
1416
1417                ifs[hname] += 1
1418                base += 1
1419        return hosts, ips
1420
1421    def get_access_to_testbeds(self, testbeds, access_user, allocated, 
1422            tbparams, masters, tbmap):
1423        """
1424        Request access to the various testbeds required for this instantiation
1425        (passed in as testbeds).  User, access_user, expoert_project and master
1426        are used to construct the correct requests.  Per-testbed parameters are
1427        returned in tbparams.
1428        """
1429        for tb in testbeds:
1430            self.get_access(tb, None, tbparams, access_user, masters, tbmap)
1431            allocated[tb] = 1
1432
1433    def get_abac_access_to_testbeds(self, testbeds, fid, allocated, 
1434            tbparams, masters, tbmap):
1435        for tb in testbeds:
1436            self.get_abac_access(tb, tbparams, fid, masters, tbmap)
1437            allocated[tb] = 1
1438
1439    def get_abac_access(self, tb, tbparams,fid, masters, tbmap):
1440        """
1441        Get access to testbed through fedd and set the parameters for that tb
1442        """
1443        def get_export_project(svcs):
1444            """
1445            Look through for the list of federated_service for this testbed
1446            objects for a project_export service, and extract the project
1447            parameter.
1448            """
1449
1450            pe = [s for s in svcs if s.name=='project_export']
1451            if len(pe) == 1:
1452                return pe[0].params.get('project', None)
1453            elif len(pe) == 0:
1454                return None
1455            else:
1456                raise service_error(service_error.req,
1457                        "More than one project export is not supported")
1458
1459        uri = tbmap.get(testbed_base(tb), None)
1460        if not uri:
1461            raise service_error(service_error.server_config, 
1462                    "Unknown testbed: %s" % tb)
1463
1464        export_svcs = masters.get(tb,[])
1465        import_svcs = [ s for m in masters.values() \
1466                for s in m \
1467                    if tb in s.importers ]
1468
1469        export_project = get_export_project(export_svcs)
1470        # Compose the credential list so that IDs come before attributes
1471        creds = set()
1472        keys = set()
1473        for c in self.auth.get_creds_for_principal(fid):
1474            keys.add(c.issuer_cert())
1475            creds.add(c.attribute_cert())
1476        creds = list(keys) + list(creds)
1477
1478        # Request credentials
1479        req = {
1480                'abac_credential': creds,
1481            }
1482        # Make the service request from the services we're importing and
1483        # exporting.  Keep track of the export request ids so we can
1484        # collect the resulting info from the access response.
1485        e_keys = { }
1486        if import_svcs or export_svcs:
1487            req['service'] = [ ]
1488
1489            for i, s in enumerate(import_svcs):
1490                idx = 'import%d' % i
1491                sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
1492                if s.params:
1493                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1494                            for k, v in s.params.items()]
1495                req['service'].append(sr)
1496
1497            for i, s in enumerate(export_svcs):
1498                idx = 'export%d' % i
1499                e_keys[idx] = s
1500                sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
1501                if s.params:
1502                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
1503                            for k, v in s.params.items()]
1504                req['service'].append(sr)
1505
1506
1507        if self.local_access.has_key(uri):
1508            # Local access call
1509            req = { 'RequestAccessRequestBody' : req }
1510            r = self.local_access[uri].RequestAccess(req, 
1511                    fedid(file=self.cert_file))
1512            r = { 'RequestAccessResponseBody' : r }
1513        else:
1514            r = self.call_RequestAccess(uri, req, 
1515                    self.cert_file, self.cert_pwd, self.trusted_certs)
1516
1517        tbparam[tb] = { 
1518                "allocID" : r['allocID'],
1519                "uri": uri,
1520                }
1521
1522        # Collect the responses corresponding to the services this testbed
1523        # exports.  These will be the service requests that we will include in
1524        # the start segment requests (with appropriate visibility values) to
1525        # import and export the segments.
1526        for s in r.get('service', []):
1527            id = s.get('id', None)
1528            if id and id in e_keys:
1529                e_keys[id].reqs.append(s)
1530
1531        # Add attributes to parameter space.  We don't allow attributes to
1532        # overlay any parameters already installed.
1533        for a in r.get('fedAttr', []):
1534            try:
1535                if a['attribute'] and \
1536                        isinstance(a['attribute'], basestring)\
1537                        and not tbparam[tb].has_key(a['attribute'].lower()):
1538                    tbparam[tb][a['attribute'].lower()] = a['value']
1539            except KeyError:
1540                self.log.error("Bad attribute in response: %s" % a)
1541
1542
1543    def split_topology(self, top, topo, testbeds):
1544        """
1545        Create the sub-topologies that are needed for experiment instantiation.
1546        """
1547        for tb in testbeds:
1548            topo[tb] = top.clone()
1549            # copy in for loop allows deletions from the original
1550            for e in [ e for e in topo[tb].elements]:
1551                etb = e.get_attribute('testbed')
1552                # NB: elements without a testbed attribute won't appear in any
1553                # sub topologies. 
1554                if not etb or etb != tb:
1555                    for i in e.interface:
1556                        for s in i.subs:
1557                            try:
1558                                s.interfaces.remove(i)
1559                            except ValueError:
1560                                raise service_error(service_error.internal,
1561                                        "Can't remove interface??")
1562                    topo[tb].elements.remove(e)
1563            topo[tb].make_indices()
1564
1565    def wrangle_software(self, expid, top, topo, tbparams):
1566        """
1567        Copy software out to the repository directory, allocate permissions and
1568        rewrite the segment topologies to look for the software in local
1569        places.
1570        """
1571
1572        # Copy the rpms and tarfiles to a distribution directory from
1573        # which the federants can retrieve them
1574        linkpath = "%s/software" %  expid
1575        softdir ="%s/%s" % ( self.repodir, linkpath)
1576        softmap = { }
1577        # These are in a list of tuples format (each kit).  This comprehension
1578        # unwraps them into a single list of tuples that initilaizes the set of
1579        # tuples.
1580        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1581                for p, t in l ])
1582        pkgs.update([x.location for e in top.elements \
1583                for x in e.software])
1584        try:
1585            os.makedirs(softdir)
1586        except EnvironmentError, e:
1587            raise service_error(
1588                    "Cannot create software directory: %s" % e)
1589        # The actual copying.  Everything's converted into a url for copying.
1590        for pkg in pkgs:
1591            loc = pkg
1592
1593            scheme, host, path = urlparse(loc)[0:3]
1594            dest = os.path.basename(path)
1595            if not scheme:
1596                if not loc.startswith('/'):
1597                    loc = "/%s" % loc
1598                loc = "file://%s" %loc
1599            try:
1600                u = urlopen(loc)
1601            except Exception, e:
1602                raise service_error(service_error.req, 
1603                        "Cannot open %s: %s" % (loc, e))
1604            try:
1605                f = open("%s/%s" % (softdir, dest) , "w")
1606                self.log.debug("Writing %s/%s" % (softdir,dest) )
1607                data = u.read(4096)
1608                while data:
1609                    f.write(data)
1610                    data = u.read(4096)
1611                f.close()
1612                u.close()
1613            except Exception, e:
1614                raise service_error(service_error.internal,
1615                        "Could not copy %s: %s" % (loc, e))
1616            path = re.sub("/tmp", "", linkpath)
1617            # XXX
1618            softmap[pkg] = \
1619                    "%s/%s/%s" %\
1620                    ( self.repo_url, path, dest)
1621
1622            # Allow the individual segments to access the software.
1623            for tb in tbparams.keys():
1624                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1625                        "/%s/%s" % ( path, dest))
1626            self.auth.save()
1627
1628        # Convert the software locations in the segments into the local
1629        # copies on this host
1630        for soft in [ s for tb in topo.values() \
1631                for e in tb.elements \
1632                    if getattr(e, 'software', False) \
1633                        for s in e.software ]:
1634            if softmap.has_key(soft.location):
1635                soft.location = softmap[soft.location]
1636
1637
1638    def new_experiment(self, req, fid):
1639        """
1640        The external interface to empty initial experiment creation called from
1641        the dispatcher.
1642
1643        Creates a working directory, splits the incoming description using the
1644        splitter script and parses out the avrious subsections using the
1645        lcasses above.  Once each sub-experiment is created, use pooled threads
1646        to instantiate them and start it all up.
1647        """
1648        req = req.get('NewRequestBody', None)
1649        if not req:
1650            raise service_error(service_error.req,
1651                    "Bad request format (no NewRequestBody)")
1652
1653        if self.auth.import_credentials(data_list=req.get('credential', [])):
1654            self.auth.save()
1655       
1656        if not self.auth.check_attribute(fid, 'new'):
1657            raise service_error(service_error.access, "New access denied")
1658
1659        try:
1660            tmpdir = tempfile.mkdtemp(prefix="split-")
1661        except EnvironmentError:
1662            raise service_error(service_error.internal, "Cannot create tmp dir")
1663
1664        try:
1665            access_user = self.accessdb[fid]
1666        except KeyError:
1667            raise service_error(service_error.internal,
1668                    "Access map and authorizer out of sync in " + \
1669                            "new_experiment for fedid %s"  % fid)
1670
1671        pid = "dummy"
1672        gid = "dummy"
1673
1674        # Generate an ID for the experiment (slice) and a certificate that the
1675        # allocator can use to prove they own it.  We'll ship it back through
1676        # the encrypted connection.  If the requester supplied one, use it.
1677        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1678            expcert = req['experimentAccess']['X509']
1679            expid = fedid(certstr=expcert)
1680            self.state_lock.acquire()
1681            if expid in self.state:
1682                self.state_lock.release()
1683                raise service_error(service_error.req, 
1684                        'fedid %s identifies an existing experiment' % expid)
1685            self.state_lock.release()
1686        else:
1687            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1688
1689        #now we're done with the tmpdir, and it should be empty
1690        if self.cleanup:
1691            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1692            os.rmdir(tmpdir)
1693        else:
1694            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1695
1696        eid = self.create_experiment_state(fid, req, expid, expcert, 
1697                state='empty')
1698
1699        # Let users touch the state
1700        self.auth.set_attribute(fid, expid)
1701        self.auth.set_attribute(expid, expid)
1702        # Override fedids can manipulate state as well
1703        for o in self.overrides:
1704            self.auth.set_attribute(o, expid)
1705        self.auth.save()
1706
1707        rv = {
1708                'experimentID': [
1709                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1710                ],
1711                'experimentStatus': 'empty',
1712                'experimentAccess': { 'X509' : expcert }
1713            }
1714
1715        return rv
1716
1717    def create_experiment(self, req, fid):
1718        """
1719        The external interface to experiment creation called from the
1720        dispatcher.
1721
1722        Creates a working directory, splits the incoming description using the
1723        splitter script and parses out the various subsections using the
1724        classes above.  Once each sub-experiment is created, use pooled threads
1725        to instantiate them and start it all up.
1726        """
1727
1728        req = req.get('CreateRequestBody', None)
1729        if not req:
1730            raise service_error(service_error.req,
1731                    "Bad request format (no CreateRequestBody)")
1732
1733        # Get the experiment access
1734        exp = req.get('experimentID', None)
1735        if exp:
1736            if exp.has_key('fedid'):
1737                key = exp['fedid']
1738                expid = key
1739                eid = None
1740            elif exp.has_key('localname'):
1741                key = exp['localname']
1742                eid = key
1743                expid = None
1744            else:
1745                raise service_error(service_error.req, "Unknown lookup type")
1746        else:
1747            raise service_error(service_error.req, "No request?")
1748
1749        # Import information from the requester
1750        if self.auth.import_credentials(data_list=req.get('credential', [])):
1751            self.auth.save()
1752
1753        self.check_experiment_access(fid, key)
1754
1755        # Install the testbed map entries supplied with the request into a copy
1756        # of the testbed map.
1757        tbmap = dict(self.tbmap)
1758        for m in req.get('testbedmap', []):
1759            if 'testbed' in m and 'uri' in m:
1760                tbmap[m['testbed']] = m['uri']
1761
1762        try:
1763            tmpdir = tempfile.mkdtemp(prefix="split-")
1764            os.mkdir(tmpdir+"/keys")
1765        except EnvironmentError:
1766            raise service_error(service_error.internal, "Cannot create tmp dir")
1767
1768        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1769        gw_secretkey_base = "fed.%s" % self.ssh_type
1770        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1771        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1772        tclfile = tmpdir + "/experiment.tcl"
1773        tbparams = { }
1774        try:
1775            access_user = self.accessdb[fid]
1776        except KeyError:
1777            raise service_error(service_error.internal,
1778                    "Access map and authorizer out of sync in " + \
1779                            "create_experiment for fedid %s"  % fid)
1780
1781        pid = "dummy"
1782        gid = "dummy"
1783
1784        # The tcl parser needs to read a file so put the content into that file
1785        descr=req.get('experimentdescription', None)
1786        if descr:
1787            file_content=descr.get('ns2description', None)
1788            if file_content:
1789                try:
1790                    f = open(tclfile, 'w')
1791                    f.write(file_content)
1792                    f.close()
1793                except EnvironmentError:
1794                    raise service_error(service_error.internal,
1795                            "Cannot write temp experiment description")
1796            else:
1797                raise service_error(service_error.req, 
1798                        "Only ns2descriptions supported")
1799        else:
1800            raise service_error(service_error.req, "No experiment description")
1801
1802        self.state_lock.acquire()
1803        if self.state.has_key(key):
1804            self.state[key]['experimentStatus'] = "starting"
1805            for e in self.state[key].get('experimentID',[]):
1806                if not expid and e.has_key('fedid'):
1807                    expid = e['fedid']
1808                elif not eid and e.has_key('localname'):
1809                    eid = e['localname']
1810        self.state_lock.release()
1811
1812        if not (eid and expid):
1813            raise service_error(service_error.internal, 
1814                    "Cannot find local experiment info!?")
1815
1816        try: 
1817            # This catches exceptions to clear the placeholder if necessary
1818            try:
1819                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1820            except ValueError:
1821                raise service_error(service_error.server_config, 
1822                        "Bad key type (%s)" % self.ssh_type)
1823
1824            # Copy the service request
1825            tb_services = [ s for s in req.get('service',[]) ]
1826            # Translate to topdl
1827            if self.splitter_url:
1828                self.log.debug("Calling remote topdl translator at %s" % \
1829                        self.splitter_url)
1830                top = self.remote_ns2topdl(self.splitter_url, file_content)
1831            else:
1832                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1833                    str(self.muxmax), '-m', 'dummy']
1834
1835                tclcmd.extend([pid, gid, eid, tclfile])
1836
1837                self.log.debug("running local splitter %s", " ".join(tclcmd))
1838                # This is just fantastic.  As a side effect the parser copies
1839                # tb_compat.tcl into the current directory, so that directory
1840                # must be writable by the fedd user.  Doing this in the
1841                # temporary subdir ensures this is the case.
1842                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1843                        cwd=tmpdir)
1844                split_data = tclparser.stdout
1845
1846                top = topdl.topology_from_xml(file=split_data, top="experiment")
1847
1848            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1849            # Find the testbeds to look up
1850            testbeds = set([ a.value for e in top.elements \
1851                    for a in e.attribute \
1852                        if a.attribute == 'testbed'])
1853
1854            tb_hosts = { }
1855            for tb in testbeds:
1856                tb_hosts[tb] = [ e.name for e in top.elements \
1857                        if isinstance(e, topdl.Computer) and \
1858                            e.get_attribute('testbed') and \
1859                            e.get_attribute('testbed') == tb]
1860
1861            masters = { }           # testbeds exporting services
1862            pmasters = { }          # Testbeds exporting services that
1863                                    # need portals
1864            for s in tb_services:
1865                # If this is a service request with the importall field
1866                # set, fill it out.
1867
1868                if s.get('importall', False):
1869                    s['import'] = [ tb for tb in testbeds \
1870                            if tb not in s.get('export',[])]
1871                    del s['importall']
1872
1873                # Add the service to masters
1874                for tb in s.get('export', []):
1875                    if s.get('name', None):
1876                        if tb not in masters:
1877                            masters[tb] = [ ]
1878
1879                        params = { }
1880                        if 'fedAttr' in s:
1881                            for a in s['fedAttr']:
1882                                params[a.get('attribute', '')] = \
1883                                        a.get('value','')
1884
1885                        fser = federated_service(name=s['name'],
1886                                exporter=tb, importers=s.get('import',[]),
1887                                params=params)
1888                        if fser.name == 'hide_hosts' \
1889                                and 'hosts' not in fser.params:
1890                            fser.params['hosts'] = \
1891                                    ",".join(tb_hosts.get(fser.exporter, []))
1892                        masters[tb].append(fser)
1893
1894                        if fser.portal:
1895                            if tb not in pmasters: pmasters[tb] = [ fser ]
1896                            else: pmasters[tb].append(fser)
1897                    else:
1898                        self.log.error('Testbed service does not have name " + \
1899                                "and importers')
1900
1901
1902            allocated = { }         # Testbeds we can access
1903            topo ={ }               # Sub topologies
1904            connInfo = { }          # Connection information
1905
1906            if self.auth_type == 'legacy':
1907                self.get_access_to_testbeds(testbeds, access_user, allocated, 
1908                        tbparams, masters, tbmap)
1909            elif self.auth_type == 'abac':
1910                self.get_abac_access_to_testbeds(testbeds, fid, allocated, 
1911                        tbparams, masters, tbmap)
1912            else:
1913                raise service_error(service_error.internal, 
1914                        "Unknown auth_type %s" % self.auth_type)
1915
1916            self.split_topology(top, topo, testbeds)
1917
1918            # Copy configuration files into the remote file store
1919            # The config urlpath
1920            configpath = "/%s/config" % expid
1921            # The config file system location
1922            configdir ="%s%s" % ( self.repodir, configpath)
1923            try:
1924                os.makedirs(configdir)
1925            except EnvironmentError, e:
1926                raise service_error(service_error.internal,
1927                        "Cannot create config directory: %s" % e)
1928            try:
1929                f = open("%s/hosts" % configdir, "w")
1930                f.write('\n'.join(hosts))
1931                f.close()
1932            except EnvironmentError, e:
1933                raise service_error(service_error.internal, 
1934                        "Cannot write hosts file: %s" % e)
1935            try:
1936                copy_file("%s" % gw_pubkey, "%s/%s" % \
1937                        (configdir, gw_pubkey_base))
1938                copy_file("%s" % gw_secretkey, "%s/%s" % \
1939                        (configdir, gw_secretkey_base))
1940            except EnvironmentError, e:
1941                raise service_error(service_error.internal, 
1942                        "Cannot copy keyfiles: %s" % e)
1943
1944            # Allow the individual testbeds to access the configuration files.
1945            for tb in tbparams.keys():
1946                asignee = tbparams[tb]['allocID']['fedid']
1947                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1948                    self.auth.set_attribute(asignee, "%s/%s" % \
1949                            (configpath, f))
1950
1951            part = experiment_partition(self.auth, self.store_url, tbmap,
1952                    self.muxmax, self.direct_transit)
1953            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
1954                    connInfo, expid)
1955            # Now get access to the dynamic testbeds (those added above)
1956            for tb in [ t for t in topo if t not in allocated]:
1957                self.get_access(tb, None, tbparams, access_user, masters, tbmap)
1958                allocated[tb] = 1
1959                store_keys = topo[tb].get_attribute('store_keys')
1960                # Give the testbed access to keys it exports or imports
1961                if store_keys:
1962                    for sk in store_keys.split(" "):
1963                        self.auth.set_attribute(\
1964                                tbparams[tb]['allocID']['fedid'], sk)
1965            self.auth.save()
1966
1967            self.wrangle_software(expid, top, topo, tbparams)
1968
1969            vtopo = topdl.topology_to_vtopo(top)
1970            vis = self.genviz(vtopo)
1971
1972            # save federant information
1973            for k in allocated.keys():
1974                tbparams[k]['federant'] = {
1975                        'name': [ { 'localname' : eid} ],
1976                        'allocID' : tbparams[k]['allocID'],
1977                        'uri': tbparams[k]['uri'],
1978                    }
1979
1980            self.state_lock.acquire()
1981            self.state[eid]['vtopo'] = vtopo
1982            self.state[eid]['vis'] = vis
1983            self.state[eid]['experimentdescription'] = \
1984                    { 'topdldescription': top.to_dict() }
1985            self.state[eid]['federant'] = \
1986                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1987                        if tbparams[tb].has_key('federant') ]
1988            if self.state_filename: 
1989                self.write_state()
1990            self.state_lock.release()
1991        except service_error, e:
1992            # If something goes wrong in the parse (usually an access error)
1993            # clear the placeholder state.  From here on out the code delays
1994            # exceptions.  Failing at this point returns a fault to the remote
1995            # caller.
1996
1997            self.state_lock.acquire()
1998            del self.state[eid]
1999            del self.state[expid]
2000            if self.state_filename: self.write_state()
2001            self.state_lock.release()
2002            raise e
2003
2004
2005        # Start the background swapper and return the starting state.  From
2006        # here on out, the state will stick around a while.
2007
2008        # Let users touch the state
2009        self.auth.set_attribute(fid, expid)
2010        self.auth.set_attribute(expid, expid)
2011        # Override fedids can manipulate state as well
2012        for o in self.overrides:
2013            self.auth.set_attribute(o, expid)
2014        self.auth.save()
2015
2016        # Create a logger that logs to the experiment's state object as well as
2017        # to the main log file.
2018        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2019        alloc_collector = self.list_log(self.state[eid]['log'])
2020        h = logging.StreamHandler(alloc_collector)
2021        # XXX: there should be a global one of these rather than repeating the
2022        # code.
2023        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2024                    '%d %b %y %H:%M:%S'))
2025        alloc_log.addHandler(h)
2026       
2027        attrs = [ 
2028                {
2029                    'attribute': 'ssh_pubkey', 
2030                    'value': '%s/%s/config/%s' % \
2031                            (self.repo_url, expid, gw_pubkey_base)
2032                },
2033                {
2034                    'attribute': 'ssh_secretkey', 
2035                    'value': '%s/%s/config/%s' % \
2036                            (self.repo_url, expid, gw_secretkey_base)
2037                },
2038                {
2039                    'attribute': 'hosts', 
2040                    'value': '%s/%s/config/hosts' % \
2041                            (self.repo_url, expid)
2042                },
2043            ]
2044
2045        # transit and disconnected testbeds may not have a connInfo entry.
2046        # Fill in the blanks.
2047        for t in allocated.keys():
2048            if not connInfo.has_key(t):
2049                connInfo[t] = { }
2050
2051        # Start a thread to do the resource allocation
2052        t  = Thread(target=self.allocate_resources,
2053                args=(allocated, masters, eid, expid, tbparams, 
2054                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2055                    connInfo, tbmap),
2056                name=eid)
2057        t.start()
2058
2059        rv = {
2060                'experimentID': [
2061                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2062                ],
2063                'experimentStatus': 'starting',
2064            }
2065
2066        return rv
2067   
2068    def get_experiment_fedid(self, key):
2069        """
2070        find the fedid associated with the localname key in the state database.
2071        """
2072
2073        rv = None
2074        self.state_lock.acquire()
2075        if self.state.has_key(key):
2076            if isinstance(self.state[key], dict):
2077                try:
2078                    kl = [ f['fedid'] for f in \
2079                            self.state[key]['experimentID']\
2080                                if f.has_key('fedid') ]
2081                except KeyError:
2082                    self.state_lock.release()
2083                    raise service_error(service_error.internal, 
2084                            "No fedid for experiment %s when getting "+\
2085                                    "fedid(!?)" % key)
2086                if len(kl) == 1:
2087                    rv = kl[0]
2088                else:
2089                    self.state_lock.release()
2090                    raise service_error(service_error.internal, 
2091                            "multiple fedids for experiment %s when " +\
2092                                    "getting fedid(!?)" % key)
2093            else:
2094                self.state_lock.release()
2095                raise service_error(service_error.internal, 
2096                        "Unexpected state for %s" % key)
2097        self.state_lock.release()
2098        return rv
2099
2100    def check_experiment_access(self, fid, key):
2101        """
2102        Confirm that the fid has access to the experiment.  Though a request
2103        may be made in terms of a local name, the access attribute is always
2104        the experiment's fedid.
2105        """
2106        if not isinstance(key, fedid):
2107            key = self.get_experiment_fedid(key)
2108
2109        if self.auth.check_attribute(fid, key):
2110            return True
2111        else:
2112            raise service_error(service_error.access, "Access Denied")
2113
2114
2115    def get_handler(self, path, fid):
2116        self.log.info("Get handler %s %s" % (path, fid))
2117        if self.auth.check_attribute(fid, path):
2118            return ("%s/%s" % (self.repodir, path), "application/binary")
2119        else:
2120            return (None, None)
2121
2122    def get_vtopo(self, req, fid):
2123        """
2124        Return the stored virtual topology for this experiment
2125        """
2126        rv = None
2127        state = None
2128
2129        req = req.get('VtopoRequestBody', None)
2130        if not req:
2131            raise service_error(service_error.req,
2132                    "Bad request format (no VtopoRequestBody)")
2133        exp = req.get('experiment', None)
2134        if exp:
2135            if exp.has_key('fedid'):
2136                key = exp['fedid']
2137                keytype = "fedid"
2138            elif exp.has_key('localname'):
2139                key = exp['localname']
2140                keytype = "localname"
2141            else:
2142                raise service_error(service_error.req, "Unknown lookup type")
2143        else:
2144            raise service_error(service_error.req, "No request?")
2145
2146        self.check_experiment_access(fid, key)
2147
2148        self.state_lock.acquire()
2149        if self.state.has_key(key):
2150            if self.state[key].has_key('vtopo'):
2151                rv = { 'experiment' : {keytype: key },\
2152                        'vtopo': self.state[key]['vtopo'],\
2153                    }
2154            else:
2155                state = self.state[key]['experimentStatus']
2156        self.state_lock.release()
2157
2158        if rv: return rv
2159        else: 
2160            if state:
2161                raise service_error(service_error.partial, 
2162                        "Not ready: %s" % state)
2163            else:
2164                raise service_error(service_error.req, "No such experiment")
2165
2166    def get_vis(self, req, fid):
2167        """
2168        Return the stored visualization for this experiment
2169        """
2170        rv = None
2171        state = None
2172
2173        req = req.get('VisRequestBody', None)
2174        if not req:
2175            raise service_error(service_error.req,
2176                    "Bad request format (no VisRequestBody)")
2177        exp = req.get('experiment', None)
2178        if exp:
2179            if exp.has_key('fedid'):
2180                key = exp['fedid']
2181                keytype = "fedid"
2182            elif exp.has_key('localname'):
2183                key = exp['localname']
2184                keytype = "localname"
2185            else:
2186                raise service_error(service_error.req, "Unknown lookup type")
2187        else:
2188            raise service_error(service_error.req, "No request?")
2189
2190        self.check_experiment_access(fid, key)
2191
2192        self.state_lock.acquire()
2193        if self.state.has_key(key):
2194            if self.state[key].has_key('vis'):
2195                rv =  { 'experiment' : {keytype: key },\
2196                        'vis': self.state[key]['vis'],\
2197                        }
2198            else:
2199                state = self.state[key]['experimentStatus']
2200        self.state_lock.release()
2201
2202        if rv: return rv
2203        else:
2204            if state:
2205                raise service_error(service_error.partial, 
2206                        "Not ready: %s" % state)
2207            else:
2208                raise service_error(service_error.req, "No such experiment")
2209
2210    def clean_info_response(self, rv):
2211        """
2212        Remove the information in the experiment's state object that is not in
2213        the info response.
2214        """
2215        # Remove the owner info (should always be there, but...)
2216        if rv.has_key('owner'): del rv['owner']
2217
2218        # Convert the log into the allocationLog parameter and remove the
2219        # log entry (with defensive programming)
2220        if rv.has_key('log'):
2221            rv['allocationLog'] = "".join(rv['log'])
2222            del rv['log']
2223        else:
2224            rv['allocationLog'] = ""
2225
2226        if rv['experimentStatus'] != 'active':
2227            if rv.has_key('federant'): del rv['federant']
2228        else:
2229            # remove the allocationID and uri info from each federant
2230            for f in rv.get('federant', []):
2231                if f.has_key('allocID'): del f['allocID']
2232                if f.has_key('uri'): del f['uri']
2233
2234        return rv
2235
2236    def get_info(self, req, fid):
2237        """
2238        Return all the stored info about this experiment
2239        """
2240        rv = None
2241
2242        req = req.get('InfoRequestBody', None)
2243        if not req:
2244            raise service_error(service_error.req,
2245                    "Bad request format (no InfoRequestBody)")
2246        exp = req.get('experiment', None)
2247        if exp:
2248            if exp.has_key('fedid'):
2249                key = exp['fedid']
2250                keytype = "fedid"
2251            elif exp.has_key('localname'):
2252                key = exp['localname']
2253                keytype = "localname"
2254            else:
2255                raise service_error(service_error.req, "Unknown lookup type")
2256        else:
2257            raise service_error(service_error.req, "No request?")
2258
2259        self.check_experiment_access(fid, key)
2260
2261        # The state may be massaged by the service function that called
2262        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2263        # state.
2264        self.state_lock.acquire()
2265        if self.state.has_key(key):
2266            rv = copy.deepcopy(self.state[key])
2267        self.state_lock.release()
2268
2269        if rv:
2270            return self.clean_info_response(rv)
2271        else:
2272            raise service_error(service_error.req, "No such experiment")
2273
2274    def get_multi_info(self, req, fid):
2275        """
2276        Return all the stored info that this fedid can access
2277        """
2278        rv = { 'info': [ ] }
2279
2280        self.state_lock.acquire()
2281        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2282            try:
2283                self.check_experiment_access(fid, key)
2284            except service_error, e:
2285                if e.code == service_error.access:
2286                    continue
2287                else:
2288                    self.state_lock.release()
2289                    raise e
2290
2291            if self.state.has_key(key):
2292                e = copy.deepcopy(self.state[key])
2293                e = self.clean_info_response(e)
2294                rv['info'].append(e)
2295        self.state_lock.release()
2296        return rv
2297
2298    def remove_dirs(self, dir):
2299        """
2300        Remove the directory tree and all files rooted at dir.  Log any errors,
2301        but continue.
2302        """
2303        self.log.debug("[removedirs]: removing %s" % dir)
2304        try:
2305            for path, dirs, files in os.walk(dir, topdown=False):
2306                for f in files:
2307                    os.remove(os.path.join(path, f))
2308                for d in dirs:
2309                    os.rmdir(os.path.join(path, d))
2310            os.rmdir(dir)
2311        except EnvironmentError, e:
2312            self.log.error("Error deleting directory tree in %s" % e);
2313
2314    def terminate_experiment(self, req, fid):
2315        """
2316        Swap this experiment out on the federants and delete the shared
2317        information
2318        """
2319        tbparams = { }
2320        req = req.get('TerminateRequestBody', None)
2321        if not req:
2322            raise service_error(service_error.req,
2323                    "Bad request format (no TerminateRequestBody)")
2324        force = req.get('force', False)
2325        exp = req.get('experiment', None)
2326        if exp:
2327            if exp.has_key('fedid'):
2328                key = exp['fedid']
2329                keytype = "fedid"
2330            elif exp.has_key('localname'):
2331                key = exp['localname']
2332                keytype = "localname"
2333            else:
2334                raise service_error(service_error.req, "Unknown lookup type")
2335        else:
2336            raise service_error(service_error.req, "No request?")
2337
2338        self.check_experiment_access(fid, key)
2339
2340        dealloc_list = [ ]
2341
2342
2343        # Create a logger that logs to the dealloc_list as well as to the main
2344        # log file.
2345        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2346        h = logging.StreamHandler(self.list_log(dealloc_list))
2347        # XXX: there should be a global one of these rather than repeating the
2348        # code.
2349        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2350                    '%d %b %y %H:%M:%S'))
2351        dealloc_log.addHandler(h)
2352
2353        self.state_lock.acquire()
2354        fed_exp = self.state.get(key, None)
2355        repo = None
2356
2357        if fed_exp:
2358            # This branch of the conditional holds the lock to generate a
2359            # consistent temporary tbparams variable to deallocate experiments.
2360            # It releases the lock to do the deallocations and reacquires it to
2361            # remove the experiment state when the termination is complete.
2362
2363            # First make sure that the experiment creation is complete.
2364            status = fed_exp.get('experimentStatus', None)
2365
2366            if status:
2367                if status in ('starting', 'terminating'):
2368                    if not force:
2369                        self.state_lock.release()
2370                        raise service_error(service_error.partial, 
2371                                'Experiment still being created or destroyed')
2372                    else:
2373                        self.log.warning('Experiment in %s state ' % status + \
2374                                'being terminated by force.')
2375            else:
2376                # No status??? trouble
2377                self.state_lock.release()
2378                raise service_error(service_error.internal,
2379                        "Experiment has no status!?")
2380
2381            ids = []
2382            #  experimentID is a list of dicts that are self-describing
2383            #  identifiers.  This finds all the fedids and localnames - the
2384            #  keys of self.state - and puts them into ids.
2385            for id in fed_exp.get('experimentID', []):
2386                if id.has_key('fedid'): 
2387                    ids.append(id['fedid'])
2388                    repo = "%s" % id['fedid']
2389                if id.has_key('localname'): ids.append(id['localname'])
2390
2391            # Collect the allocation/segment ids into a dict keyed by the fedid
2392            # of the allocation (or a monotonically increasing integer) that
2393            # contains a tuple of uri, aid (which is a dict...)
2394            for i, fed in enumerate(fed_exp.get('federant', [])):
2395                try:
2396                    uri = fed['uri']
2397                    aid = fed['allocID']
2398                    k = fed['allocID'].get('fedid', i)
2399                except KeyError, e:
2400                    continue
2401                tbparams[k] = (uri, aid)
2402            fed_exp['experimentStatus'] = 'terminating'
2403            if self.state_filename: self.write_state()
2404            self.state_lock.release()
2405
2406            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2407            # then completes, so we can't wait if nothing starts.  So, no
2408            # tbparams, no start.
2409            if len(tbparams) > 0:
2410                thread_pool = self.thread_pool(self.nthreads)
2411                for k in tbparams.keys():
2412                    # Create and start a thread to stop the segment
2413                    thread_pool.wait_for_slot()
2414                    uri, aid = tbparams[k]
2415                    t  = self.pooled_thread(\
2416                            target=self.terminate_segment(log=dealloc_log,
2417                                testbed=uri,
2418                                cert_file=self.cert_file, 
2419                                cert_pwd=self.cert_pwd,
2420                                trusted_certs=self.trusted_certs,
2421                                caller=self.call_TerminateSegment),
2422                            args=(uri, aid), name=k,
2423                            pdata=thread_pool, trace_file=self.trace_file)
2424                    t.start()
2425                # Wait for completions
2426                thread_pool.wait_for_all_done()
2427
2428            # release the allocations (failed experiments have done this
2429            # already, and starting experiments may be in odd states, so we
2430            # ignore errors releasing those allocations
2431            try: 
2432                for k in tbparams.keys():
2433                    # This releases access by uri
2434                    uri, aid = tbparams[k]
2435                    self.release_access(None, aid, uri=uri)
2436            except service_error, e:
2437                if status != 'failed' and not force:
2438                    raise e
2439
2440            # Remove the terminated experiment
2441            self.state_lock.acquire()
2442            for id in ids:
2443                if self.state.has_key(id): del self.state[id]
2444
2445            if self.state_filename: self.write_state()
2446            self.state_lock.release()
2447
2448            # Delete any synch points associated with this experiment.  All
2449            # synch points begin with the fedid of the experiment.
2450            fedid_keys = set(["fedid:%s" % f for f in ids \
2451                    if isinstance(f, fedid)])
2452            for k in self.synch_store.all_keys():
2453                try:
2454                    if len(k) > 45 and k[0:46] in fedid_keys:
2455                        self.synch_store.del_value(k)
2456                except synch_store.BadDeletionError:
2457                    pass
2458            self.write_store()
2459
2460            # Remove software and other cached stuff from the filesystem.
2461            if repo:
2462                self.remove_dirs("%s/%s" % (self.repodir, repo))
2463               
2464            return { 
2465                    'experiment': exp , 
2466                    'deallocationLog': "".join(dealloc_list),
2467                    }
2468        else:
2469            # Don't forget to release the lock
2470            self.state_lock.release()
2471            raise service_error(service_error.req, "No saved state")
2472
2473
2474    def GetValue(self, req, fid):
2475        """
2476        Get a value from the synchronized store
2477        """
2478        req = req.get('GetValueRequestBody', None)
2479        if not req:
2480            raise service_error(service_error.req,
2481                    "Bad request format (no GetValueRequestBody)")
2482       
2483        name = req['name']
2484        wait = req['wait']
2485        rv = { 'name': name }
2486
2487        if self.auth.check_attribute(fid, name):
2488            self.log.debug("[GetValue] asking for %s " % name)
2489            try:
2490                v = self.synch_store.get_value(name, wait)
2491            except synch_store.RevokedKeyError:
2492                # No more synch on this key
2493                raise service_error(service_error.federant, 
2494                        "Synch key %s revoked" % name)
2495            if v is not None:
2496                rv['value'] = v
2497            self.log.debug("[GetValue] got %s from %s" % (v, name))
2498            return rv
2499        else:
2500            raise service_error(service_error.access, "Access Denied")
2501       
2502
2503    def SetValue(self, req, fid):
2504        """
2505        Set a value in the synchronized store
2506        """
2507        req = req.get('SetValueRequestBody', None)
2508        if not req:
2509            raise service_error(service_error.req,
2510                    "Bad request format (no SetValueRequestBody)")
2511       
2512        name = req['name']
2513        v = req['value']
2514
2515        if self.auth.check_attribute(fid, name):
2516            try:
2517                self.synch_store.set_value(name, v)
2518                self.write_store()
2519                self.log.debug("[SetValue] set %s to %s" % (name, v))
2520            except synch_store.CollisionError:
2521                # Translate into a service_error
2522                raise service_error(service_error.req,
2523                        "Value already set: %s" %name)
2524            except synch_store.RevokedKeyError:
2525                # No more synch on this key
2526                raise service_error(service_error.federant, 
2527                        "Synch key %s revoked" % name)
2528            return { 'name': name, 'value': v }
2529        else:
2530            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.