source: fedd/federation/protogeni_access.py @ 3551ae1

Branches: axis_example, compt_changes, info-ops, version-3.01, version-3.02
Last change on this file since 3551ae1 was 3551ae1, checked in by Ted Faber <faber@…>, 14 years ago

access is looking better, but segment is a screaming mess. I'm not sure that division makes any sense at all.

  • Property mode set to 100644
File size: 21.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import time
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from util import *
18from access_project import access_project
19from fedid import fedid, generate_fedid
20from authorizer import authorizer
21from service_error import service_error
22from remote_service import xmlrpc_handler, soap_handler, service_caller
23
24import httplib
25import tempfile
26from urlparse import urlparse
27
28from access import access_base
29
30import topdl
31import list_log
32import proxy_protogeni_segment
33
34
# Make log messages disappear if no one configures a fedd logger
class nullHandler(logging.Handler):
    """
    A logging handler that throws away every record handed to it.

    Attached to the "fedd.access" logger below so that records sent there
    simply vanish unless fedd logging is configured elsewhere.
    """
    def emit(self, record):
        # Intentionally drop the record.
        return


fl = logging.getLogger("fedd.access")
fl.addHandler(nullHandler())
41
42class access(access_base):
43    """
44    The implementation of access control based on mapping users to projects.
45
46    Users can be mapped to existing projects or have projects created
47    dynamically.  This implements both direct requests and proxies.
48    """
49
50    def __init__(self, config=None, auth=None):
51        """
52        Initializer.  Pulls parameters out of the ConfigParser's access section.
53        """
54
55        access_base.__init__(self, config, auth)
56
57        self.domain = config.get("access", "domain")
58        self.userconfdir = config.get("access","userconfdir")
59        self.userconfcmd = config.get("access","userconfcmd")
60        self.userconfurl = config.get("access","userconfurl")
61        self.federation_software = config.get("access", "federation_software")
62        self.portal_software = config.get("access", "portal_software")
63        self.ssh_port = config.get("access","ssh_port") or "22"
64        self.sshd = config.get("access","sshd")
65        self.sshd_config = config.get("access", "sshd_config")
66        self.access_type = config.get("access", "type")
67        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
68        self.staging_host = config.get("access", "staging_host") \
69                or "ops.emulab.net"
70        self.local_seer_software = config.get("access", "local_seer_software")
71        self.local_seer_image = config.get("access", "local_seer_image")
72        self.local_seer_start = config.get("access", "local_seer_start")
73   
74        self.dragon_endpoint = config.get("access", "dragon")
75        self.dragon_vlans = config.get("access", "dragon_vlans")
76        self.deter_internal = config.get("access", "deter_internal")
77
78        self.tunnel_config = config.getboolean("access", "tunnel_config")
79        self.portal_command = config.get("access", "portal_command")
80        self.portal_image = config.get("access", "portal_image")
81        self.portal_type = config.get("access", "portal_type") or "pc"
82        self.portal_startcommand = config.get("access", "portal_startcommand")
83        self.node_startcommand = config.get("access", "node_startcommand")
84
85        self.federation_software = self.software_list(self.federation_software)
86        self.portal_software = self.software_list(self.portal_software)
87        self.local_seer_software = self.software_list(self.local_seer_software)
88
89        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
90        self.renewal_interval = int(self.renewal_interval) * 60
91
92        self.ch_url = config.get("access", "ch_url")
93        self.sa_url = config.get("access", "sa_url")
94        self.cm_url = config.get("access", "cm_url")
95
96        self.restricted = [ ]
97
98        # read_state in the base_class
99        self.state_lock.acquire()
100        for a  in ('allocation', 'projects', 'keys', 'types'):
101            if a not in self.state:
102                self.state[a] = { }
103        self.allocation = self.state['allocation']
104        self.projects = self.state['projects']
105        self.keys = self.state['keys']
106        self.types = self.state['types']
107        # Add the ownership attributes to the authorizer.  Note that the
108        # indices of the allocation dict are strings, but the attributes are
109        # fedids, so there is a conversion.
110        for k in self.state.get('allocation', {}).keys():
111            for o in self.state['allocation'][k].get('owners', []):
112                self.auth.set_attribute(o, fedid(hexstr=k))
113            self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
114
115        self.state_lock.release()
116
117
118        self.log = logging.getLogger("fedd.access")
119        set_log_level(config, "access", self.log)
120
121        self.access = { }
122        if config.has_option("access", "accessdb"):
123            self.read_access(config.get("access", "accessdb"), 
124                    access_obj=self.make_access_info)
125
126        self.start_segment = proxy_protogeni_segment.start_segment
127        self.stop_segment = proxy_protogeni_segment.stop_segment
128        self.renew_segment = proxy_protogeni_segment.renew_segment
129
130        self.lookup_access = self.lookup_access_base
131
132        self.call_SetValue = service_caller('SetValue')
133        self.call_GetValue = service_caller('GetValue')
134        self.exports = {
135                'local_seer_control': self.export_local_seer,
136                'seer_master': self.export_seer_master,
137                'hide_hosts': self.export_hide_hosts,
138                }
139
140        if not self.local_seer_image or not self.local_seer_software or \
141                not self.local_seer_start:
142            if 'local_seer_control' in self.exports:
143                del self.exports['local_seer_control']
144
145        if not self.local_seer_image or not self.local_seer_software or \
146                not self.seer_master_start:
147            if 'seer_master' in self.exports:
148                del self.exports['seer_master']
149
150        self.RenewSlices()
151
152        self.soap_services = {\
153            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
154            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
155            'StartSegment': soap_handler("StartSegment", self.StartSegment),
156            'TerminateSegment': soap_handler("TerminateSegment", 
157                self.TerminateSegment),
158            }
159        self.xmlrpc_services =  {\
160            'RequestAccess': xmlrpc_handler('RequestAccess',
161                self.RequestAccess),
162            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
163                self.ReleaseAccess),
164            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
165            'TerminateSegment': xmlrpc_handler('TerminateSegment',
166                self.TerminateSegment),
167            }
168
169    @staticmethod
170    def make_access_info(s):
171        """
172        Split a string of the form (id, id, id, id) ito its constituent tuples
173        and return them as a tuple.  Use to import access info from the
174        access_db.
175        """
176
177        ss = s.strip()
178        if ss.startswith('(') and ss.endswith(')'):
179            l = [ s.strip() for s  in ss[1:-1].split(",")]
180            if len(l) == 4:
181                return tuple(l)
182            else:
183                raise self.parse_error(
184                        "Exactly 4 elements in access info required")
185        else:
186            raise self.parse_error("Expecting parenthezied values")
187
188
189    def get_handler(self, path, fid):
190        self.log.info("Get handler %s %s" % (path, fid))
191        if self.auth.check_attribute(fid, path) and self.userconfdir:
192            return ("%s/%s" % (self.userconfdir, path), "application/binary")
193        else:
194            return (None, None)
195
196    def build_access_response(self, alloc_id, services):
197        """
198        Create the SOAP response.
199
200        Build the dictionary description of the response and use
201        fedd_utils.pack_soap to create the soap message.  ap is the allocate
202        project message returned from a remote project allocation (even if that
203        allocation was done locally).
204        """
205        # Because alloc_id is already a fedd_services_types.IDType_Holder,
206        # there's no need to repack it
207        msg = { 
208                'allocID': alloc_id,
209                'fedAttr': [
210                    { 'attribute': 'domain', 'value': self.domain } , 
211                ]
212            }
213        if self.dragon_endpoint:
214            msg['fedAttr'].append({'attribute': 'dragon',
215                'value': self.dragon_endpoint})
216        if self.deter_internal:
217            msg['fedAttr'].append({'attribute': 'deter_internal',
218                'value': self.deter_internal})
219        #XXX: ??
220        if self.dragon_vlans:
221            msg['fedAttr'].append({'attribute': 'vlans',
222                'value': self.dragon_vlans})
223
224        if services:
225            msg['service'] = services
226        return msg
227
228    def RequestAccess(self, req, fid):
229        """
230        Handle the access request.
231        """
232
233        # The dance to get into the request body
234        if req.has_key('RequestAccessRequestBody'):
235            req = req['RequestAccessRequestBody']
236        else:
237            raise service_error(service_error.req, "No request!?")
238
239        if req.has_key('destinationTestbed'):
240            dt = unpack_id(req['destinationTestbed'])
241
242        # Request for this fedd
243        found, match = self.lookup_access(req, fid)
244        services, svc_state = self.export_services(req.get('service',[]),
245                None, None)
246        # keep track of what's been added
247        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
248        aid = unicode(allocID)
249
250        self.state_lock.acquire()
251        self.allocation[aid] = { }
252        # The protoGENI certificate
253        self.allocation[aid]['credentials'] = found
254        # The list of owner FIDs
255        self.allocation[aid]['owners'] = [ fid ]
256        self.write_state()
257        self.state_lock.release()
258        self.auth.set_attribute(fid, allocID)
259        self.auth.set_attribute(allocID, allocID)
260
261        try:
262            f = open("%s/%s.pem" % (self.certdir, aid), "w")
263            print >>f, alloc_cert
264            f.close()
265        except EnvironmentError, e:
266            raise service_error(service_error.internal, 
267                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
268        return self.build_access_response({ 'fedid': allocID }, None)
269
270
271    def ReleaseAccess(self, req, fid):
272        # The dance to get into the request body
273        if req.has_key('ReleaseAccessRequestBody'):
274            req = req['ReleaseAccessRequestBody']
275        else:
276            raise service_error(service_error.req, "No request!?")
277
278        # Local request
279        try:
280            if req['allocID'].has_key('localname'):
281                auth_attr = aid = req['allocID']['localname']
282            elif req['allocID'].has_key('fedid'):
283                aid = unicode(req['allocID']['fedid'])
284                auth_attr = req['allocID']['fedid']
285            else:
286                raise service_error(service_error.req,
287                        "Only localnames and fedids are understood")
288        except KeyError:
289            raise service_error(service_error.req, "Badly formed request")
290
291        self.log.debug("[access] deallocation requested for %s", aid)
292        if not self.auth.check_attribute(fid, auth_attr):
293            self.log.debug("[access] deallocation denied for %s", aid)
294            raise service_error(service_error.access, "Access Denied")
295
296        self.state_lock.acquire()
297        if self.allocation.has_key(aid):
298            self.log.debug("Found allocation for %s" %aid)
299            del self.allocation[aid]
300            self.write_state()
301            self.state_lock.release()
302            # And remove the access cert
303            cf = "%s/%s.pem" % (self.certdir, aid)
304            self.log.debug("Removing %s" % cf)
305            os.remove(cf)
306            return { 'allocID': req['allocID'] } 
307        else:
308            self.state_lock.release()
309            raise service_error(service_error.req, "No such allocation")
310
311    def generate_rspec(self, topo, softdir, connInfo):
312        t = topo.clone()
313
314        starts = { }
315        # Weed out the things we aren't going to instantiate: Segments, portal
316        # substrates, and portal interfaces.  (The copy in the for loop allows
317        # us to delete from e.elements in side the for loop).  While we're
318        # touching all the elements, we also adjust paths from the original
319        # testbed to local testbed paths and put the federation commands and
320        # startcommands into a dict so we can start them manually later.
321        # ProtoGENI requires setup before the federation commands run, so we
322        # run them by hand after we've seeded configurations.
323        for e in [e for e in t.elements]:
324            if isinstance(e, topdl.Segment):
325                t.elements.remove(e)
326            # Fix software paths
327            for s in getattr(e, 'software', []):
328                s.location = re.sub("^.*/", softdir, s.location)
329            if isinstance(e, topdl.Computer):
330                if e.get_attribute('portal') and self.portal_startcommand:
331                    # Portals never have a user-specified start command
332                    starts[e.name] = self.portal_startcommand
333                elif self.node_startcommand:
334                    if e.get_attribute('startup'):
335                        starts[e.name] = "%s \\$USER '%s'" % \
336                                (self.node_startcommand, 
337                                        e.get_attribute('startup'))
338                        e.remove_attribute('startup')
339                    else:
340                        starts[e.name] = self.node_startcommand
341
342                # Remove portal interfaces
343                e.interface = [i for i in e.interface \
344                        if not i.get_attribute('portal')]
345
346        t.substrates = [ s.clone() for s in t.substrates ]
347        t.incorporate_elements()
348
349        # Customize the ns2 output for local portal commands and images
350        filters = []
351
352        # NB: these are extra commands issued for the node, not the startcmds
353        if self.portal_command:
354            filters.append(topdl.generate_portal_command_filter(
355                self.portal_command))
356
357        # Convert to rspec and return it
358        exp_rspec = topdl.topology_to_rspec(t, filters)
359
360        return exp_rspec
361
362    def retrieve_software(self, topo, certfile, softdir):
363        """
364        Collect the software that nodes in the topology need loaded and stage
365        it locally.  This implies retrieving it from the experiment_controller
366        and placing it into softdir.  Certfile is used to prove that this node
367        has access to that data (it's the allocation/segment fedid).  Finally
368        local portal and federation software is also copied to the same staging
369        directory for simplicity - all software needed for experiment creation
370        is in softdir.
371        """
372        sw = set()
373        for e in topo.elements:
374            for s in getattr(e, 'software', []):
375                sw.add(s.location)
376        os.mkdir(softdir)
377        for s in sw:
378            self.log.debug("Retrieving %s" % s)
379            try:
380                get_url(s, certfile, softdir)
381            except:
382                t, v, st = sys.exc_info()
383                raise service_error(service_error.internal,
384                        "Error retrieving %s: %s" % (s, v))
385
386        # Copy local portal node software to the tempdir
387        for s in (self.portal_software, self.federation_software):
388            for l, f in s:
389                base = os.path.basename(f)
390                copy_file(f, "%s/%s" % (softdir, base))
391
392        # Ick.  Put this python rpm in a place that it will get moved into
393        # the staging area.  It's a hack to install a modern (in a Roman
394        # sense of modern) python on ProtoGENI
395        python_rpm ="python2.4-2.4-1pydotorg.i586.rpm"
396        if os.access("./%s" % python_rpm, os.R_OK):
397            copy_file("./%s" % python_rpm, "%s/%s" % (softdir, python_rpm))
398
399
400    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
401        """
402        Gather common configuration files, retrieve or create an experiment
403        name and project name, and return the ssh_key filenames.  Create an
404        allocation log bound to the state log variable as well.
405        """
406        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
407        ename = None
408        pubkey_base = None
409        secretkey_base = None
410        alloc_log = None
411
412        for a in attrs:
413            if a['attribute'] in configs:
414                try:
415                    self.log.debug("Retrieving %s" % a['value'])
416                    get_url(a['value'], certfile, tmpdir)
417                except:
418                    t, v, st = sys.exc_info()
419                    raise service_error(service_error.internal,
420                            "Error retrieving %s: %s" % (a.get('value', ""), v))
421            if a['attribute'] == 'ssh_pubkey':
422                pubkey_base = a['value'].rpartition('/')[2]
423            if a['attribute'] == 'ssh_secretkey':
424                secretkey_base = a['value'].rpartition('/')[2]
425            if a['attribute'] == 'experiment_name':
426                ename = a['value']
427
428        if not ename:
429            ename = ""
430            for i in range(0,5):
431                ename += random.choice(string.ascii_letters)
432            self.log.warn("No experiment name: picked one randomly: %s" \
433                    % ename)
434
435        self.state_lock.acquire()
436        if self.allocation.has_key(aid):
437            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
438            self.allocation[aid]['experiment'] = ename
439            self.allocation[aid]['log'] = [ ]
440            # Create a logger that logs to the experiment's state object as
441            # well as to the main log file.
442            alloc_log = logging.getLogger('fedd.access.%s' % ename)
443            h = logging.StreamHandler(
444                    list_log.list_log(self.allocation[aid]['log']))
445            # XXX: there should be a global one of these rather than
446            # repeating the code.
447            h.setFormatter(logging.Formatter(
448                "%(asctime)s %(name)s %(message)s",
449                        '%d %b %y %H:%M:%S'))
450            alloc_log.addHandler(h)
451            self.write_state()
452        else:
453            self.log.error("No allocation for %s!?" % aid)
454        self.state_lock.release()
455
456        return (ename, pubkey_base, secretkey_base, cf, user, ssh_key, 
457                cpw, alloc_log)
458
459    def finalize_experiment(self, topo, starter, aid, alloc_id):
460        # Copy the assigned names into the return topology
461        rvtopo = topo.clone()
462        embedding = [ ]
463        for n in starter.node:
464            embedding.append({ 
465                'toponame': n,
466                'physname': ["%s%s" %  (starter.node[n], self.domain)],
467                })
468        # Grab the log (this is some anal locking, but better safe than
469        # sorry)
470        self.state_lock.acquire()
471        logv = "".join(self.allocation[aid]['log'])
472        # It's possible that the StartSegment call gets retried (!).
473        # if the 'started' key is in the allocation, we'll return it rather
474        # than redo the setup.
475        self.allocation[aid]['started'] = { 
476                'allocID': alloc_id,
477                'allocationLog': logv,
478                'segmentdescription': { 
479                    'topdldescription': rvtopo.to_dict() },
480                'embedding': embedding,
481                }
482        retval = copy.deepcopy(self.allocation[aid]['started'])
483        self.write_state()
484        self.state_lock.release()
485
486        return retval
487
488    def StartSegment(self, req, fid):
489        err = None  # Any service_error generated after tmpdir is created
490        rv = None   # Return value from segment creation
491
492        try:
493            req = req['StartSegmentRequestBody']
494            topref = req['segmentdescription']['topdldescription']
495        except KeyError:
496            raise service_error(service_error.req, "Badly formed request")
497
498        connInfo = req.get('connection', [])
499        services = req.get('service', [])
500        auth_attr = req['allocID']['fedid']
501        aid = "%s" % auth_attr
502        attrs = req.get('fedAttr', [])
503        if not self.auth.check_attribute(fid, auth_attr):
504            raise service_error(service_error.access, "Access denied")
505        else:
506            # See if this is a replay of an earlier succeeded StartSegment -
507            # sometimes SSL kills 'em.  If so, replay the response rather than
508            # redoing the allocation.
509            self.state_lock.acquire()
510            retval = self.allocation[aid].get('started', None)
511            self.state_lock.release()
512            if retval:
513                self.log.warning("Duplicate StartSegment for %s: " % aid + \
514                        "replaying response")
515                return retval
516
517        if topref:
518            topo = topdl.Topology(**topref)
519        else:
520            raise service_error(service_error.req, 
521                    "Request missing segmentdescription'")
522
523        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
524        try:
525            tmpdir = tempfile.mkdtemp(prefix="access-")
526            softdir = "%s/software" % tmpdir
527        except EnvironmentError:
528            raise service_error(service_error.internal, "Cannot create tmp dir")
529
530        # Try block alllows us to clean up temporary files.
531        try:
532            self.retrieve_software(topo, certfile, softdir)
533            self.configure_userconf(services, tmpdir)
534            ename, pubkey_base, secretkey_base, cf, user, ssh_key, \
535                cpw, alloc_log = self.initialize_experiment_info(attrs,
536                        aid, certfile, tmpdir)
537            # XXX: we really need to put the import and connection info
538            # generation off longer.
539            self.import_store_info(certfile, connInfo)
540            rspec = self.generate_rspec(topo, "%s/%s/" \
541                    % (self.staging_dir, ename), connInfo)
542
543            starter = self.start_segment(keyfile=ssh_key,
544                    debug=self.create_debug, log=alloc_log,
545                    ch_url = self.ch_url, sa_url=self.sa_url,
546                    cm_url=self.cm_url)
547            rv = starter(self, aid, user, rspec, pubkey_base, secretkey_base,
548                    ename,
549                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
550                    certfile, topo, connInfo, services)
551        except EnvironmentError:
552            err = service_error(service_error.internal, "%s" % e)
553        except service_error, e:
554            err = e
555        except:
556            t, v, st = sys.exc_info()
557            err = service_error(service_error.internal, "%s: %s" % \
558                    (v, traceback.extract_tb(st)))
559
560        # Walk up tmpdir, deleting as we go
561        if self.cleanup: self.remove_dirs(tmpdir)
562        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
563
564        if rv:
565            return self.finalize_experiment(topo, starter, aid, req['allocID'])
566        elif err:
567            raise service_error(service_error.federant,
568                    "Swapin failed: %s" % err)
569        else:
570            raise service_error(service_error.federant, "Swapin failed")
571
572    def TerminateSegment(self, req, fid):
573        try:
574            req = req['TerminateSegmentRequestBody']
575        except KeyError:
576            raise service_error(service_error.req, "Badly formed request")
577
578        auth_attr = req['allocID']['fedid']
579        aid = "%s" % auth_attr
580        attrs = req.get('fedAttr', [])
581        if not self.auth.check_attribute(fid, auth_attr):
582            raise service_error(service_error.access, "Access denied")
583
584        self.state_lock.acquire()
585        if self.allocation.has_key(aid):
586            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
587            slice_cred = self.allocation[aid].get('slice_credential', None)
588            ename = self.allocation[aid].get('experiment', None)
589        else:
590            cf, user, ssh_key, cpw = (None, None, None, None)
591            slice_cred = None
592            ename = None
593        self.state_lock.release()
594
595        if ename:
596            staging = "%s/%s" % ( self.staging_dir, ename)
597        else:
598            self.log.warn("Can't find experiment name for %s" % aid)
599            staging = None
600
601        stopper = self.stop_segment(keyfile=ssh_key, debug=self.create_debug,
602                ch_url = self.ch_url, sa_url=self.sa_url, cm_url=self.cm_url)
603        stopper(self, user, staging, slice_cred, cf, cpw)
604        return { 'allocID': req['allocID'] }
605
606    def RenewSlices(self):
607        self.log.info("Scanning for slices to renew")
608        self.state_lock.acquire()
609        aids = self.allocation.keys()
610        self.state_lock.release()
611
612        for aid in aids:
613            self.state_lock.acquire()
614            if self.allocation.has_key(aid):
615                name = self.allocation[aid].get('slice_name', None)
616                scred = self.allocation[aid].get('slice_credential', None)
617                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
618            else:
619                name = None
620                scred = None
621            self.state_lock.release()
622
623            if not os.access(cf, os.R_OK):
624                self.log.error(
625                        "[RenewSlices] cred.file %s unreadable, ignoring" % cf)
626                continue
627
628            # There's a ProtoGENI slice associated with the segment; renew it.
629            if name and scred:
630                renewer = self.renew_segment(log=self.log, 
631                        debug=self.create_debug, keyfile=ssh_key,
632                        cm_url = self.cm_url, sa_url = self.sa_url,
633                        ch_url = self.ch_url)
634                new_scred = renewer(name, scred, self.renewal_interval, cf, cpw)
635                if new_scred:
636                    self.log.info("Slice %s renewed until %s GMT" % \
637                            (name, time.asctime(time.gmtime(\
638                                time.time()+self.renewal_interval))))
639                    self.state_lock.acquire()
640                    if self.allocation.has_key(aid):
641                        self.allocation[aid]['slice_credential'] = new_scred
642                    self.state_lock.release()
643                else:
644                    self.log.info("Failed to renew slice %s " % name)
645
646        # Let's do this all again soon.  (4 tries before the slices time out)   
647        t = Timer(self.renewal_interval/4, self.RenewSlices)
648        t.start()
Note: See TracBrowser for help on using the repository browser.