Implementing Transports and Protocols

Some preliminary information would be handy before delving into details:

How Exactly is User Code Wrapped?

Here’s what happens when a request arrives at an Rpclib server:

The server transport decides whether this is a simple interface document request or a remote procedure call request. Every transport has its own way of dealing with this.

If the incoming request is for the interface document, the job is easy: the document is generated and returned to the client as a string. The server transport first calls rpclib.interface._base.InterfaceBase.build_interface_document(), which builds and caches the document, and then calls rpclib.interface._base.InterfaceBase.get_interface_document(), which returns the cached document.

If it was an RPC request, here’s what happens:

  1. The server must set the ctx.in_string attribute to an iterable of strings. This will contain the incoming byte stream.
  2. The server calls the rpclib.server._base.ServerBase.get_in_object function from its parent class.
  3. The server then calls the create_in_document, decompose_incoming_envelope and deserialize functions from the protocol class in the in_protocol attribute. The first call parses the incoming stream into the protocol serializer’s internal representation. The second call splits that representation into header and body parts, and the third deserializes them into native Python objects.
  4. Once the protocol performs its voodoo, the server calls get_out_object which in turn calls the rpclib.application.Application.process_request() function.
  5. The process_request function fires the relevant events and calls the rpclib.application.Application.call_wrapper() function. This function can be overridden by the user, but the overriding function must call the one in rpclib.application.Application.
  6. The call_wrapper function in turn calls the rpclib.service.ServiceBase.call_wrapper() function, which has the same requirements.
  7. rpclib.service.ServiceBase.call_wrapper() finally calls the user function. The return value is passed back to process_request, which stores it in ctx.out_object.
  8. The server object now calls the get_out_string function to put the response as an iterable of strings in ctx.out_string. The get_out_string function calls the serialize and create_out_string functions of the protocol class.
  9. The server pushes the stream from ctx.out_string back to the client.

The same logic applies to client transports, in reverse.

So if you want to implement a new transport or protocol, all you need to do is subclass the relevant base class and implement the missing methods.
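
As a rough illustration of the steps listed above, here is a minimal sketch that uses only the standard library. Context, ToyJsonProtocol, ToyServer, handle and the JSON wire format are hypothetical stand-ins, not Rpclib classes; only the method names mirror the ones mentioned above, and events and error handling are left out.

```python
import json


class Context(object):
    """Hypothetical stand-in for a method context; not rpclib's MethodContext."""
    def __init__(self):
        self.in_string = None    # iterable of byte strings from the wire
        self.in_object = None    # deserialized call arguments
        self.out_object = None   # return value of the user function
        self.out_string = None   # iterable of byte strings to send back
        self.method_name = None


class ToyJsonProtocol(object):
    """Hypothetical protocol; the wire format is {"method": ..., "args": [...]}."""
    def create_in_document(self, ctx):
        # Parse the raw stream into the protocol's internal representation.
        ctx.in_document = json.loads(b"".join(ctx.in_string).decode("utf-8"))

    def decompose_incoming_envelope(self, ctx):
        # Split the document into the method name and the body.
        ctx.method_name = ctx.in_document["method"]
        ctx.in_body = ctx.in_document["args"]

    def deserialize(self, ctx):
        # Convert the body to native Python objects.
        ctx.in_object = tuple(ctx.in_body)

    def serialize(self, ctx):
        ctx.out_document = {"result": ctx.out_object}

    def create_out_string(self, ctx):
        ctx.out_string = [json.dumps(ctx.out_document).encode("utf-8")]


class ToyServer(object):
    def __init__(self, protocol, methods):
        self.protocol = protocol
        self.methods = methods  # maps method names to user functions

    def get_in_object(self, ctx):
        self.protocol.create_in_document(ctx)
        self.protocol.decompose_incoming_envelope(ctx)
        self.protocol.deserialize(ctx)

    def get_out_object(self, ctx):
        ctx.out_object = self.methods[ctx.method_name](*ctx.in_object)

    def get_out_string(self, ctx):
        self.protocol.serialize(ctx)
        self.protocol.create_out_string(ctx)


def handle(server, raw_bytes):
    """Run one request through the in-object, out-object, out-string stages."""
    ctx = Context()
    ctx.in_string = [raw_bytes]
    server.get_in_object(ctx)
    server.get_out_object(ctx)
    server.get_out_string(ctx)
    return b"".join(ctx.out_string)


server = ToyServer(ToyJsonProtocol(), {"add": lambda a, b: a + b})
reply = handle(server, b'{"method": "add", "args": [2, 3]}')
```

The transport only touches ctx.in_string and ctx.out_string, so a different wire format can be swapped in by replacing the protocol object alone.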

A Transport Example: A DB-Backed Fan-Out Queue

Here’s the source code in one file: https://github.com/arskom/rpclib/blob/master/examples/queue.py

The following block of code is SQLAlchemy boilerplate for creating the database and other related machinery. Under normal conditions, you should pass the SQLAlchemy URL to the Producer and Consumer instances instead of the connection object itself. Because this example uses an in-memory database, a global variable, ugly as it is, is the simplest way to share the database handle.

db = create_engine('sqlite:///:memory:')
metadata = MetaData(bind=db)
DeclarativeBase = declarative_base(metadata=metadata)

This is the table where queued messages are stored. Note that it’s a vanilla SQLAlchemy object:

class TaskQueue(DeclarativeBase):
    __tablename__ = 'task_queue'

    id = Column(sqlalchemy.Integer, primary_key=True)
    data = Column(sqlalchemy.LargeBinary, nullable=False)

This is the table where the task id of the last processed task for each worker is stored. Workers are identified by an integer.

class WorkerStatus(DeclarativeBase):
    __tablename__ = 'worker_status'

    worker_id = Column(sqlalchemy.Integer, nullable=False, primary_key=True,
                                                        autoincrement=False)
    task_id = Column(sqlalchemy.Integer, ForeignKey(TaskQueue.id),
                                                             nullable=False)
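
The way these two tables produce fan-out behaviour can be sketched with the standard library's sqlite3 module instead of SQLAlchemy. The pending and mark_done helpers are hypothetical names introduced for this illustration, and the foreign key is omitted for brevity. Each worker keeps its own "last processed" pointer, so advancing one worker does not hide tasks from another.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_queue (id INTEGER PRIMARY KEY, data BLOB NOT NULL)")
conn.execute(
    "CREATE TABLE worker_status ("
    " worker_id INTEGER NOT NULL PRIMARY KEY,"
    " task_id INTEGER NOT NULL)")

# Queue three tasks and register two workers that have processed nothing yet.
conn.executemany("INSERT INTO task_queue (data) VALUES (?)",
                 [(b"a",), (b"b",), (b"c",)])
conn.executemany("INSERT INTO worker_status VALUES (?, 0)", [(1,), (2,)])


def pending(worker_id):
    """Return the ids of the tasks this worker has not processed yet."""
    rows = conn.execute(
        "SELECT t.id FROM task_queue t, worker_status w"
        " WHERE w.worker_id = ? AND t.id > w.task_id ORDER BY t.id",
        (worker_id,)).fetchall()
    return [row[0] for row in rows]


def mark_done(worker_id, task_id):
    """Record task_id as the last task processed by this worker."""
    conn.execute("UPDATE worker_status SET task_id = ? WHERE worker_id = ?",
                 (task_id, worker_id))


# Worker 1 finishes the first two tasks; worker 2's view is unaffected.
mark_done(1, 2)
worker_1_pending = pending(1)
worker_2_pending = pending(2)
```

Here worker_1_pending is [3] while worker_2_pending is still [1, 2, 3]: tasks are never deleted, so every registered worker eventually sees every task.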

The consumer is an rpclib.server._base.ServerBase subclass that receives requests by polling the database.

The transport attribute is the value displayed in the WSDL. It is irrelevant here, but it is nice to set anyway:

class Consumer(ServerBase):
    transport = 'http://sqlalchemy.persistent.queue/'

We set the incoming values, create a database connection and set it to self.session:

def __init__(self, db, app, consumer_id):
    ServerBase.__init__(self, app)

    self.session = sessionmaker(bind=db)()
    self.id = consumer_id

We also query the worker status table for this worker’s record. If there is no record for the worker’s own id, one is created with task id 0 so that the server starts from the beginning:

try:
    self.session.query(WorkerStatus) \
                .filter_by(worker_id=self.id).one()
except NoResultFound:
    self.session.add(WorkerStatus(worker_id=self.id, task_id=0))
    self.session.commit()

This is the main loop for our server:

def serve_forever(self):
    while True:

We first get the id of the last processed task:

last = self.session.query(WorkerStatus).with_lockmode("update") \
            .filter_by(worker_id=self.id).one()

Which is used to get the next tasks to process:

task_queue = self.session.query(TaskQueue) \
        .filter(TaskQueue.id > last.task_id) \
        .order_by(TaskQueue.id)

Each task is an RPC request, so we create an rpclib.MethodContext instance for each task and set transport-specific data on the ctx.transport object:

for task in task_queue:
    ctx = MethodContext(self.app)
    ctx.in_string = [task.data]
    ctx.transport.consumer_id = self.id
    ctx.transport.task_id = task.id

This call parses the incoming request:

self.get_in_object(ctx)

In case of an error when parsing the request, the server logs the error and continues to process the next task in the queue. The get_out_string call is smart enough to notice and serialize the error. If this were a normal server, we’d worry about returning the error to the client as well as logging it.

if ctx.in_error:
    self.get_out_string(ctx)
    logging.error(''.join(ctx.out_string))
    continue

As the request was parsed correctly, the user method can be called to process the task:

self.get_out_object(ctx)

The server should not care whether the error was an expected or unexpected one. So the error is logged and the server continues to process the next task in queue.

if ctx.out_error:
    self.get_out_string(ctx)
    logging.error(''.join(ctx.out_string))
    continue

If task processing went fine, the server serializes the out object and logs that instead.

self.get_out_string(ctx)
logging.debug(''.join(ctx.out_string))

Finally, the task is marked as processed.

last.task_id = task.id
self.session.commit()

Once all tasks in queue are consumed, the server waits a pre-defined amount of time before polling the database for new tasks:

time.sleep(10)
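
The control flow of one pass through the loop above can be sketched without Rpclib or a database. This is only an illustration: consume_once is a hypothetical helper, tasks are a plain dict of id to payload, and a Python exception stands in for ctx.in_error and ctx.out_error.

```python
import logging


def consume_once(tasks, last_task_id, handler):
    """Process tasks newer than last_task_id.

    Returns the new "last processed" id and the ids of the failed tasks.
    """
    failed = []
    for task_id, data in sorted(tasks.items()):
        if task_id <= last_task_id:
            continue  # already processed by this worker
        try:
            handler(data)
        except Exception as exc:
            # Log and move on to the next task instead of stopping the loop.
            logging.error("task %d failed: %s", task_id, exc)
            failed.append(task_id)
            continue
        last_task_id = task_id  # only successful tasks advance the pointer
    return last_task_id, failed


# Task 2 divides by zero; tasks 1 and 3 succeed.
tasks = {1: 2, 2: 0, 3: 5}
new_last, failed_ids = consume_once(tasks, 0, lambda n: 10 // n)
```

One consequence worth noting, assuming the snippets above nest the way they appear to: a failed task that is followed by a successful one is passed over on later polls, because only the id of the last successful task is stored.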

This concludes the worker implementation. But how do we put tasks in the task queue? That’s the job of the Producer class that is implemented as an Rpclib client.

Implementing clients is a two-stage operation. The main transport logic is in the rpclib.client.RemoteProcedureBase child, a native Python callable that serializes the arguments, sends them to the server, receives the reply, deserializes it and passes the return value to the Python caller. In our case, however, the client does not return anything because calls are processed asynchronously.

We start with the constructor, where we initialize the SQLAlchemy database connection factory:

class RemoteProcedure(RemoteProcedureBase):
    def __init__(self, db, app, name, out_header):
        RemoteProcedureBase.__init__(self, db, app, name, out_header)

        self.Session = sessionmaker(bind=db)

The implementation of the client is much simpler because we trust that the Rpclib code will do The Right Thing. Here, we serialize the arguments:

def __call__(self, *args, **kwargs):
    session = self.Session()

    self.get_out_object(args, kwargs)
    self.get_out_string()

    out_string = ''.join(self.ctx.out_string)

And put the resulting bytestream to the database:

session.add(TaskQueue(data=out_string))
session.commit()
session.close()

Again, the function does not return anything here because this is an asynchronous client.

Here’s the Producer class whose sole purpose is to initialize the right callable factory.

class Producer(ClientBase):
    def __init__(self, db, app):
        ClientBase.__init__(self, db, app)

        self.service = Service(RemoteProcedure, db, app)
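
The split between the callable and the factory that hands it out can be illustrated with a standard-library sketch. ToyRemoteProcedure and ToyService are hypothetical names, a plain list stands in for the task table, and JSON stands in for the real protocol; none of this is Rpclib's actual client machinery.

```python
import json


class ToyRemoteProcedure(object):
    """Hypothetical fire-and-forget callable; not rpclib's RemoteProcedureBase."""
    def __init__(self, queue, name):
        self.queue = queue  # any object with append(); stands in for the table
        self.name = name

    def __call__(self, *args):
        payload = json.dumps({"method": self.name, "args": list(args)})
        self.queue.append(payload.encode("utf-8"))
        # No return statement: the caller gets None because nobody waits
        # for the worker's reply.


class ToyService(object):
    """Hypothetical factory that turns attribute access into callables."""
    def __init__(self, queue):
        self._queue = queue

    def __getattr__(self, name):
        # Only reached for attributes that are not found normally,
        # i.e. remote method names.
        return ToyRemoteProcedure(self._queue, name)


queue = []
service = ToyService(queue)
result = service.sleep(3)
```

After the call, result is None and the queue holds one serialized request, which is the shape of interaction that producer.service.sleep(i) has later in this example.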

This is the worker service that will process the tasks.

class AsyncService(ServiceBase):
    @rpc(Integer)
    def sleep(ctx, integer):
        print("Sleeping for %d seconds..." % (integer))
        time.sleep(integer)

And this event is here to do some logging.

def _on_method_call(ctx):
    print("This is worker id %d, processing task id %d." % (
                           ctx.transport.consumer_id, ctx.transport.task_id))

AsyncService.event_manager.add_listener('method_call', _on_method_call)
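
For readers unfamiliar with the listener pattern used here, the following is a minimal sketch of how such a registry could work. ToyEventManager is a hypothetical class written for this illustration and makes no claim about how Rpclib's event manager is implemented internally.

```python
class ToyEventManager(object):
    """Hypothetical registry mapping event names to listener callables."""
    def __init__(self):
        self._listeners = {}

    def add_listener(self, event_name, handler):
        self._listeners.setdefault(event_name, []).append(handler)

    def fire_event(self, event_name, ctx):
        # Listeners run in registration order and receive the same context.
        for handler in self._listeners.get(event_name, []):
            handler(ctx)


calls = []
manager = ToyEventManager()
manager.add_listener('method_call', lambda ctx: calls.append(('log', ctx)))
manager.add_listener('method_call', lambda ctx: calls.append(('audit', ctx)))
manager.fire_event('method_call', {'task_id': 7})
manager.fire_event('unknown_event', {'task_id': 8})  # no listeners, no effect
```

Because listeners receive the context, transport-specific values such as the consumer id and task id set by the server are available to them.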

It’s now time to deploy our service. We start by configuring the logger and creating the necessary sql tables:

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('sqlalchemy.engine.base.Engine').setLevel(logging.DEBUG)

    metadata.create_all()

We then initialize our application:

application = Application([AsyncService], 'rpclib.async',
        interface=Wsdl11(), in_protocol=Soap11(), out_protocol=Soap11())

And queue some tasks:

producer = Producer(db, application)
for i in range(10):
    producer.service.sleep(i)

And finally start the one worker to consume the queued tasks:

consumer = Consumer(db, application, 1)
consumer.serve_forever()

That’s about it! You can switch to another database engine that accepts multiple connections and insert tasks from another connection to see the consumer in action. You could also start other workers in other processes to see the pub-sub functionality.

What’s Next?

Start hacking! Good luck, and be sure to pop out to the mailing list if you have questions.
