External Plug-In Example: pluginTest.py

This section describes an implementation of an external plug-in in the Python programming language (http://www.python.org). The ORB used is OmniORB, which implements a CORBA binding for Python.

The example describes a stateful plug-in. The source code can be found in the file pluginTest.py. The file starts by importing some CORBA-specific modules. It then imports the modules sspPlugin, which contains the stubs generated from the IDL file, and sspPlugin__POA (note the double underscore), which contains the skeletons used to create the servants.

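It then defines the class PluginTestImpl, which extends a skeleton class from sspPlugin__POA. Extending sspPlugin__POA.SyncPluginInterface is the mark of a stateful plug-in; for a stateless plug-in, the base class should be sspPlugin__POA.PluginInterface. In the listing below, the class extends sspPlugin__POA.StateSyncPlugin and also implements readyToSync(), trackBulk(), syncComplete(), and ping().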

The PluginTestImpl class defines a default constructor that does nothing. It defines the methods authorize() and track() from the PluginInterface and the methods restart(), restartComplete() and stop() from the SyncPluginInterface.

The methods restart(), restartComplete() and stop() simply log the occurrence of a synchronization.

The method authorize() logs all the attributes of the authorization event (the attributes are sent as a NameValueList sequence, which is mapped to a Python list of NameValue objects) and randomly allows or denies access. If the method decides to deny access, it raises an AuthorizationDenied exception that is caught by the SAE, which denies access to this user.

The method track() also logs all the attributes of the tracking event and can set a parameter of the subscription, namely the sessionTimeout (in the listing below, the lines that set it are commented out). Setting sessionTimeout to 20, for example, means that the subscription will be stopped 20 seconds later, which also generates a tracking event.

pluginTest.py

The following is a sample of the pluginTest.py file.


import sys
import CORBA, PortableServer
import CosNaming
from SAE_idl import sspPlugin, sspPlugin__POA
import random
import getopt
import binascii

class PluginTestImpl(sspPlugin__POA.StateSyncPlugin):
    """Sample external plug-in servant that logs every call to stdout.

    Implements the authorization and tracking operations, the
    synchronization callbacks (restart/restartComplete/stop) and the
    StateSyncPlugin operations (readyToSync/trackBulk/syncComplete/ping).

    Every message is emitted as ``print(<one string>)`` so that the class
    parses under both Python 2 and Python 3 and prints the same text.
    """

    def __init__(self):
        """Nothing to set up: the servant keeps no attributes of its own."""
        pass

    def authorize(self, pet, ctx, attr):
        """Log an authorization event and randomly allow or deny it.

        pet  -- event type; only used in the log line.
        ctx  -- plug-in context; unused (see the commented-out example).
        attr -- sequence of NameValue attributes, logged via print_nvl().

        Returns an empty list when access is allowed.  Raises
        sspPlugin.PluginInterface.AuthorizationDenied on roughly one call
        in six.
        """
        print("authorize(%s)" % (pet,))
        self.print_nvl(attr)
        # Example (disabled): reaching the directories through the context.
        #serviceDir = ctx.getServiceDirectory()
        #userDir = ctx.getUserDirectory()
        #print("%s %s" % (serviceDir.ldapHost, serviceDir.baseDN))
        #print("%s %s" % (userDir.ldapHost, userDir.baseDN))
        # randrange(0, 6) yields 0..5, so the denial branch runs 1 time in 6.
        if random.randrange(0, 6) == 0:
            print("********** I'm going to deny this one")
            raise sspPlugin.PluginInterface.AuthorizationDenied("Go away, better luck next time")
        nvl = []
        # Example (disabled): returning a session timeout to the caller.
        #if random.randrange(0,2) == 0:
        #    sessionTimeout = random.randrange(30, 60)
        #    print("set session timeout = %s" % (sessionTimeout,))
        #    nvl.append(sspPlugin.NameValue(sspPlugin.PA_SESSION_TIMEOUT,
        #                                   sspPlugin.AttributeUnion(longVal=sessionTimeout)))
        return nvl

    def track(self, pet, ctx, nvl, subscription):
        """Log a tracking event and return *subscription* unchanged.

        pet          -- event type; only used in the log line.
        ctx          -- plug-in context; unused.
        nvl          -- sequence of NameValue attributes, logged.
        subscription -- object whose sessionTimeout and interimTime
                        attributes are logged.
        """
        print("track(%s)" % (pet,))
        self.print_nvl(nvl)
        print("SessionTimeout = %s" % (subscription.sessionTimeout,))
        print("InterimTime = %s" % (subscription.interimTime,))
        # Example (disabled): changing the session timeout of the subscription.
        #if random.randrange(0,2) == 0:
        #    subscription.sessionTimeout = random.randrange(30, 60)
        #    print("set session timeout = %s" % (subscription.sessionTimeout,))
        return subscription

    def print_nvl(self, nvl):
        """Print each NameValue of *nvl* on its own tab-indented line.

        The value shown is selected by the union discriminator (``_d``);
        opaque values are rendered as colon-separated hex byte pairs.  An
        attribute with any other discriminator is printed by name only.
        """
        for nv in nvl:
            kind = nv.value._d
            if kind == sspPlugin.PAT_STRING:
                shown = nv.value.stringVal
            elif kind == sspPlugin.PAT_LONG:
                shown = nv.value.longVal
            elif kind == sspPlugin.PAT_LONG_LONG:
                shown = nv.value.longLongVal
            elif kind == sspPlugin.PAT_OPAQUE:
                h = binascii.hexlify(nv.value.opaqueVal)
                if not isinstance(h, str):
                    # Python 3: hexlify() returns bytes; join needs text.
                    h = h.decode("ascii")
                shown = ':'.join([h[i:i + 2] for i in range(0, len(h), 2)])
            else:
                # Unknown discriminator: name only, as the old bare print did.
                print("\t%s" % (nv.name,))
                continue
            print("\t%s %s" % (nv.name, shown))

    def restart(self, pepId):
        """Log that a restart was initiated for *pepId* (a string)."""
        print("Restart initiated for pep=" + pepId)

    def restartComplete(self, pepId, isPepReset):
        """Log that the restart for *pepId* finished, with the reset flag."""
        # repr() replaces the backtick syntax, which Python 3 removed.
        print("Restart completed for pep=" + pepId + ", isReset=" + repr(isPepReset))

    def stop(self, pepId):
        """Log that the server identified by *pepId* stopped."""
        print("Server stopped: pepId=" + pepId)

    #
    # Implementation of StateSyncPlugin interface
    #
    def readyToSync(self, pepids, valid):
        """Log the call and return the constant 50.

        NOTE(review): the meaning of the returned number is defined by the
        IDL contract, which is not visible here -- confirm before changing.
        """
        print("readyToSync(%s, %s)" % (pepids, valid))
        return 50

    def trackBulk(self, events):
        """Log every event of *events* (its ``pet`` and ``nvl``) between markers."""
        print("+++ trackBulk")
        for evt in events:
            print("%s" % (evt.pet,))
            self.print_nvl(evt.nvl)
        print("--- trackBulk")

    def syncComplete(self):
        """Log that synchronization completed."""
        print("syncComplete")

    def ping(self):
        """Log the ping call."""
        print("ping")

def publishCOSName(orb, path, plugin):
    """Bind *plugin* in the CORBA naming service under *path*.

    path is a "/"-separated list of name components.  Each component is
    split at its first "." into (id, kind); the kind is empty when the
    component contains no ".".  Every component but the last is created as
    a naming context, or reused when it is already bound.  The last
    component is bound to *plugin*, replacing any existing binding.

    Exits the process with status 1 if the root naming context cannot be
    narrowed, or if an already-bound intermediate name is not a naming
    context.
    """
    nc = []
    for n in path.split("/"):
        # Pad with '' so a component without "." still yields (id, kind).
        id_and_kind = (tuple(n.split(".", 1)) + ('',))[:2]
        # Star-args call replaces the deprecated apply() builtin.
        nc.append(CosNaming.NameComponent(*id_and_kind))

    obj = orb.resolve_initial_references("NameService")
    rootContext = obj._narrow(CosNaming.NamingContext)
    if rootContext is None:
        print("Could not find root Naming Context")
        sys.exit(1)

    # Walk (and create where needed) every context above the final name.
    ctx = rootContext
    for component in nc[:-1]:
        name = [component]
        try:
            ctx = ctx.bind_new_context(name)
        except CosNaming.NamingContext.AlreadyBound:
            obj = ctx.resolve(name)
            ctx = obj._narrow(CosNaming.NamingContext)
            if ctx is None:
                print("path exists, but is not a NamingContext")
                sys.exit(1)

    name = [nc[-1]]
    try:
        ctx.bind(name, plugin)
    except CosNaming.NamingContext.AlreadyBound:
        # A previous run left a binding behind; overwrite it.
        ctx.rebind(name, plugin)

    print("published Obj to corbaname:rir:#%s" % (path,))


def main(argv):
    """Start the test plug-in and serve requests until the ORB stops.

    argv is the full command line, program name first.  Options:
      -i FILE, --iorfile=FILE  write the stringified object reference to FILE
      -c NAME, --cosname=NAME  publish the object in the naming service
                               under NAME instead of writing a file
    When both are given, the one appearing last wins.  With no option the
    reference is written to /var/tmp/test.plugin.

    Exits with status 1 after printing a usage line on an invalid option.
    """
    # NOTE(review): ORB_init receives argv before getopt does; presumably it
    # strips ORB-specific options from the list -- confirm for the ORB in use.
    orb = CORBA.ORB_init(argv, CORBA.ORB_ID)
    poa = orb.resolve_initial_references("RootPOA")

    thePluginTest = PluginTestImpl()
    thePluginObj = thePluginTest._this()

    iorfile = "/var/tmp/test.plugin"
    cosname = "plugins/test.plugin"

    try:
        opts, args = getopt.getopt(argv[1:], 'i:c:',
                                   ['iorfile=', "cosname="])
        for o, a in opts:
            if o in ('-i', '--iorfile'):
                iorfile = a
            elif o in ('-c', '--cosname'):
                # getopt reports short options with their dash ('-c'); the
                # former test against 'c' never matched.
                cosname = a
                iorfile = None
    except getopt.error:
        print("usage: %s [-i file] [-c name] [--iorfile=file] [--cosname=name]" % (argv[0],))
        sys.exit(1)

    if iorfile:
        print("objectref= file:%s" % (iorfile,))
        out = open(iorfile, "w")
        try:
            out.write(orb.object_to_string(thePluginObj))
        finally:
            # Close the file even if stringifying or writing fails.
            out.close()
    elif cosname:
        publishCOSName(orb, cosname, thePluginObj)

    poa._get_the_POAManager().activate()

    try:
        orb.run()
    except KeyboardInterrupt:
        # Interrupting the process ends the server; emit a blank line as
        # the original catch-all handler did.
        print("")
    except Exception:
        # Report an unexpected ORB failure instead of silently swallowing it.
        sys.stderr.write("orb.run() failed: %s\n" % (sys.exc_info()[1],))
    print("Plugin stopped")

# Script entry point: start the plug-in server with the process arguments.
# NOTE: there is no __name__ guard, so importing this module also runs it.
main(sys.argv)