Azure EventHub Producer: Efficient Batching with Object Pooling

Aman Gupta
Posted on June 13, 2023

Azure EventHub is a real-time event streaming and ingestion service offered by Microsoft. Azure provides SDKs to send events to and receive events from EventHub. This article focuses on how to send events to EventHub using the Java package azure-messaging-eventhubs.

The azure-messaging-eventhubs package provides EventHubProducerClient, which sends events to EventHub.

Calling EventHub once for every single event is inefficient in terms of performance and throughput, because each call is an I/O operation with its own latency. For a large-scale system that needs high throughput, events must be sent to EventHub efficiently, in a way that doesn't become a bottleneck for the rest of the processing.

Hence, it's recommended to send a batch of events in a single call to EventHub instead of sending them one at a time. Batching is therefore essential.
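A rough estimate shows why. Assume, purely for illustration, N events, a batch size of B events, and a fixed latency ℓ per send call, with the calls made one after another:

```latex
T_{\text{single}} = N\,\ell
\qquad\text{vs.}\qquad
T_{\text{batched}} = \left\lceil \frac{N}{B} \right\rceil \ell
```

With the assumed values N = 10,000, B = 100, and ℓ = 5 ms, that is 50 s versus 100 × 5 ms = 0.5 s. The estimate ignores the extra transfer time of larger payloads, so real gains are smaller, but the number of round trips drops roughly by a factor of B.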

Batching means collecting events as they arrive and holding them until the batch is full.

The three main challenges in dealing with batches of events are:

  1. You don't want events in incomplete batches to sit idle indefinitely. Even if a batch is incomplete, you still want to push it to EventHub after waiting for a while.

  2. In a multithreaded application, you don't want concurrent threads to leave a batch in an inconsistent state, for example by adding to it while it is being sent.

  3. If your application sends events to multiple EventHubs, you don't want events meant for different EventHubs to be mixed up in the same batch.
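Before bringing in object pooling, it may help to see what the first two challenges ask for in plain Java. The following is a minimal sketch, not the article's implementation: TimedBatcher is a hypothetical class, the Consumer sink stands in for a send call to EventHub, and timestamps are passed in explicitly so the behavior is easy to test. It flushes when the batch reaches maxEvents or, when flushIfStale() is called (for example from a timer), when the oldest event has waited at least maxWaitMillis; synchronized methods keep two threads from modifying the batch at the same time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, library-free batcher (not the article's code).
// Flushes when the batch reaches maxEvents, or when flushIfStale() finds that
// the oldest event has waited at least maxWaitMillis.
final class TimedBatcher<T> {
    private final int maxEvents;
    private final long maxWaitMillis;
    private final Consumer<List<T>> sink;   // stands in for "send the batch to EventHub"
    private List<T> batch = new ArrayList<>();
    private long oldestEventAt;             // arrival time of the first event in the batch

    TimedBatcher(int maxEvents, long maxWaitMillis, Consumer<List<T>> sink) {
        this.maxEvents = maxEvents;
        this.maxWaitMillis = maxWaitMillis;
        this.sink = sink;
    }

    // Challenge 2: synchronized, so two threads never modify the batch at the same time
    synchronized void add(T event, long nowMillis) {
        if (batch.isEmpty()) {
            oldestEventAt = nowMillis;
        }
        batch.add(event);
        if (batch.size() >= maxEvents) {
            flush();                        // size-based flush
        }
    }

    // Challenge 1: call periodically (e.g. from a timer) so a partial batch cannot wait forever
    synchronized void flushIfStale(long nowMillis) {
        if (!batch.isEmpty() && nowMillis - oldestEventAt >= maxWaitMillis) {
            flush();                        // time-based flush
        }
    }

    private void flush() {
        List<T> ready = batch;
        batch = new ArrayList<>();          // start a fresh batch
        sink.accept(ready);
    }
}
```

For the third challenge, one option is a separate batcher per EventHub, for example held in a Map keyed by EventHub name. This sketch calls the sink while holding the lock, which keeps it simple but serializes sends.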

To handle all these challenges efficiently, let’s see how Object Pooling can come to the rescue. 

Before jumping into the solution, let’s understand what Object Pooling is. 

Object Pooling

Object pooling is a way to improve the performance of a process in terms of both speed and resource usage.

Put simply, borrowing and reusing something is cheaper than buying a new one. The same holds for a program or process that needs an object: it borrows the object from the object pool instead of instantiating a new one, uses it, and then returns it to the pool.

In technical terms, this can boost performance, especially when:

  • The cost of initializing a class instance is high.

  • The rate of instantiation of a class is high.

  • The number of instantiations in use at any particular time is low.

An important point to add to the idea that borrowing is cheaper: we don't want a process to wait for an object to be returned to the pool if all objects are currently in use. So the object pool also creates new objects when required, and it must then implement a way to clean up unused objects periodically.
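The borrow, use, and return cycle, including creating a new object instead of waiting when all existing ones are in use, can be sketched in a few lines of plain Java. SimplePool is a hypothetical illustration with no size limit and no cleanup of unused objects; Apache Commons Pool, discussed below, adds those.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical minimal pool: no size limit and no cleanup of unused objects.
final class SimplePool<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;
    private int created;

    SimplePool(Supplier<T> factory) {
        this.factory = factory;
    }

    // Borrow: reuse an idle object if there is one, otherwise create a new one
    // instead of waiting for another caller to return theirs.
    synchronized T borrowObject() {
        T obj = idle.pollFirst();
        if (obj == null) {
            obj = factory.get();
            created++;
        }
        return obj;
    }

    // Return: the object becomes available to the next borrower.
    synchronized void returnObject(T obj) {
        idle.addFirst(obj);
    }

    synchronized int createdCount() {
        return created;
    }
}
```

Borrowing twice without returning creates two objects, while a borrow after one is returned gets that same instance back.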

Object pooling is widely used for database connection pooling, HTTP connection pooling, and similar cases, since it doesn't make sense to create a new connection every time a client fires a request.

Now, let's discuss the implementation of object pooling and how we can use this pattern for our use case of batching events.

Apache Commons Pool

Apache Commons Pool (the commons-pool2 library) is an open-source library that provides an object-pooling API: PooledObjectFactory, a generic interface for managing the lifecycle of a pooled object, along with a number of object pool implementations. For our use case, we will use the most general-purpose implementation, GenericObjectPool.

PooledObjectFactory

It defines the lifecycle methods that the pool calls to create, activate, validate, passivate, and destroy pooled objects.

The following lifecycle methods are provided: 

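public interface PooledObjectFactory<T> {

    // Reinitialize an instance to be returned by the pool
    void activateObject(PooledObject<T> p) throws Exception;

    // Destroy an instance no longer needed by the pool
    void destroyObject(PooledObject<T> p) throws Exception;

    // Variant that also receives the reason for destruction; by default it delegates to the method above
    default void destroyObject(PooledObject<T> p, DestroyMode destroyMode) throws Exception {
        destroyObject(p);
    }

    // Create an instance that can be served by the pool
    PooledObject<T> makeObject() throws Exception;

    // Uninitialize an instance being returned to the pool
    void passivateObject(PooledObject<T> p) throws Exception;

    // Validate an instance (e.g. before it is borrowed, depending on the pool configuration)
    boolean validateObject(PooledObject<T> p);
}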
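To see when each hook runs, here is a library-free sketch. The Lifecycle interface and HookedPool class are hypothetical stand-ins that reuse the hook names above; they roughly follow the order in which GenericObjectPool calls its factory when validation on borrow is enabled (in GenericObjectPool that is the testOnBorrow setting, which is off by default), and they skip details such as retries, size limits, and eviction.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for PooledObjectFactory (no commons-pool dependency).
interface Lifecycle<T> {
    T makeObject();
    void activateObject(T obj);
    boolean validateObject(T obj);
    void passivateObject(T obj);
    void destroyObject(T obj);
}

// Hypothetical pool showing the order in which the hooks are called.
final class HookedPool<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Lifecycle<T> factory;

    HookedPool(Lifecycle<T> factory) {
        this.factory = factory;
    }

    T borrowObject() {
        T obj = idle.pollFirst();
        if (obj == null) {
            obj = factory.makeObject();          // only when no idle instance exists
        }
        factory.activateObject(obj);             // prepare the instance for the borrower
        if (!factory.validateObject(obj)) {      // reject a broken instance
            factory.destroyObject(obj);
            throw new IllegalStateException("invalid pooled object");
        }
        return obj;
    }

    void returnObject(T obj) {
        factory.passivateObject(obj);            // reset before the instance goes idle
        idle.addFirst(obj);
    }

    void close() {
        T obj;
        while ((obj = idle.pollFirst()) != null) {
            factory.destroyObject(obj);          // last chance to release or flush resources
        }
    }
}
```

In the article's design, passivateObject() deliberately does nothing, so an EventProducer keeps its partially filled batch between borrows, and destroyObject() is where the remaining events get flushed.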

GenericObjectPool

It's a configurable implementation of the core ObjectPool interface, which defines the following methods for using, managing, and monitoring the pool (T is the pooled object type):

// Obtain an instance from the pool
T borrowObject() throws Exception;

// Return an instance to the pool
void returnObject(T obj) throws Exception;

// Invalidate an instance (the pool destroys it)
void invalidateObject(T obj) throws Exception;

// Create an object and add it to the idle set, e.g. to pre-load the pool
void addObject() throws Exception;

// Return the number of idle instances
int getNumIdle();

// Return the number of active (borrowed) instances
int getNumActive();

// Clear the pool, destroying the idle objects
void clear() throws Exception, UnsupportedOperationException;

// Close the pool
void close();

// There is no setFactory() in commons-pool2: the PooledObjectFactory is passed
// to the pool's constructor, e.g. new GenericObjectPool<>(factory, config)

Since GenericObjectPool implements all of these methods, you don't need to worry much about the implementation details.

It provides a wide variety of configuration options, including the ability to cap the number of idle or active instances, evict instances as they sit idle in the pool, etc.

When coupled with the appropriate PooledObjectFactory, GenericObjectPool provides robust pooling functionality for arbitrary objects.

However, it’s important to note that the success of a pooling strategy depends on how well you configure the pool. 

Let's briefly discuss some of the important object pool configurations:

  • maxIdle: The upper limit on the number of “idle” instances in the pool. Use a negative value to indicate an unlimited number of idle instances.

  • maxTotal: The upper limit on the total number of object instances managed by the pool. Negative values mean that there is no limit to the number of objects allocated by the pool.

  • minEvictableIdleTime: Minimum amount of time an object may sit idle in the pool before it is eligible for eviction by the idle object evictor thread.

  • timeBetweenEvictionRuns: Sets the number of milliseconds to sleep between runs of the idle object evictor thread. When non-positive, no idle object evictor thread runs. 

NOTE: Caution should be used when configuring this optional feature. Eviction runs contend with client threads for access to objects in the pool, so if they run too frequently performance issues may occur.

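For the other configurations, you can refer to the official documentation or keep the defaults, which are a good starting point. Two defaults matter for this use case, though: timeBetweenEvictionRuns is non-positive by default, so no evictor thread runs unless you set it, and minEvictableIdleTime defaults to 30 minutes. Since the design below relies on eviction to flush partially filled batches, you will most likely want to set both explicitly.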
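To make the two eviction settings concrete, here is a small, library-free sketch of what they control. It is not commons-pool code: the IdleEvictor class, its methods, and the injectable clock are hypothetical, and borrowing is left out. Each evictor run destroys the objects that have been idle for longer than minEvictableIdleMillis, and start() schedules those runs every timeBetweenEvictionRunsMillis, scheduling nothing when that value is not positive.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical sketch of idle-object eviction (not commons-pool code).
final class IdleEvictor<T> {
    private static final class Idle<E> {
        final E obj;
        final long idleSince;
        Idle(E obj, long idleSince) { this.obj = obj; this.idleSince = idleSince; }
    }

    private final List<Idle<T>> idle = new ArrayList<>();
    private final long minEvictableIdleMillis;
    private final Consumer<T> destroy;      // e.g. flush the batch, then discard the object
    private final LongSupplier clock;       // injectable so the behavior is testable

    IdleEvictor(long minEvictableIdleMillis, Consumer<T> destroy, LongSupplier clock) {
        this.minEvictableIdleMillis = minEvictableIdleMillis;
        this.destroy = destroy;
        this.clock = clock;
    }

    // Called when an object goes back to the pool: its idle time starts now
    synchronized void returned(T obj) {
        idle.add(new Idle<>(obj, clock.getAsLong()));
    }

    // One evictor run: destroy every object idle for longer than the threshold
    synchronized int evictOnce() {
        long now = clock.getAsLong();
        int evicted = 0;
        for (Iterator<Idle<T>> it = idle.iterator(); it.hasNext(); ) {
            Idle<T> entry = it.next();
            if (now - entry.idleSince > minEvictableIdleMillis) {
                it.remove();
                destroy.accept(entry.obj);
                evicted++;
            }
        }
        return evicted;
    }

    // Mirrors timeBetweenEvictionRuns: a non-positive value means no evictor runs
    void start(ScheduledExecutorService scheduler, long timeBetweenEvictionRunsMillis) {
        if (timeBetweenEvictionRunsMillis > 0) {
            scheduler.scheduleWithFixedDelay(this::evictOnce, timeBetweenEvictionRunsMillis,
                    timeBetweenEvictionRunsMillis, TimeUnit.MILLISECONDS);
        }
    }
}
```

Under these assumptions, an object that stays idle is destroyed (and, in this article's design, its batch flushed) roughly minEvictableIdleTime plus at most one timeBetweenEvictionRuns after it was last returned. GenericObjectPool also examines only numTestsPerEvictionRun idle objects per run (3 by default), so with many idle producers the delay can be longer.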

Implementing Producer

The idea is to create a pool of producer objects, each responsible for building its own batch of events and pushing it to EventHub as soon as the batch is full or the producer has stayed idle in the pool for a threshold period of time.

Therefore, the design consists of the following components:

ProducerObjectFactory: This will implement PooledObjectFactory and provide the implementation for the lifecycle methods.

//ProducerObjectFactory.java

import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;

public class ProducerObjectFactory implements PooledObjectFactory<EventProducer> {

    // Shared by every pooled EventProducer (a single connection to EventHub)
    private final EventHubProducerClient eventHubProducerClient;
    private final CreateBatchOptions createBatchOptions;

    public ProducerObjectFactory(EventHubProducerClient eventHubProducerClient,
                                 CreateBatchOptions createBatchOptions) {
        this.eventHubProducerClient = eventHubProducerClient;
        this.createBatchOptions = createBatchOptions;
    }

    // Called when the pool needs a new instance (no idle one is available)
    @Override
    public PooledObject<EventProducer> makeObject() {
        return new DefaultPooledObject<>(new EventProducer(eventHubProducerClient, createBatchOptions));
    }

    // Called on eviction, invalidation, or pool shutdown: flush the pending events
    @Override
    public void destroyObject(PooledObject<EventProducer> pooledObject) {
        pooledObject.getObject().close();
    }

    @Override
    public boolean validateObject(PooledObject<EventProducer> pooledObject) {
        return pooledObject.getObject() != null;
    }

    // Nothing to reset on borrow or return: the batch must survive between borrows
    @Override
    public void activateObject(PooledObject<EventProducer> pooledObject) {
    }

    @Override
    public void passivateObject(PooledObject<EventProducer> pooledObject) {
    }
}

ProducerPool: This will extend GenericObjectPool and will be used to manage and monitor the pool.

//ProducerPool.java

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

// With the type parameter set, borrowObject() returns an EventProducer and
// returnObject() accepts one, so no casts or overrides are needed.
public class ProducerPool extends GenericObjectPool<EventProducer> {

    public ProducerPool(ProducerObjectFactory objFactory,
                        GenericObjectPoolConfig<EventProducer> genericObjectPoolConfig) {
        super(objFactory, genericObjectPoolConfig);
    }
}

EventProducer: The object that will be pooled. It holds a batch of events as an instance field and provides the functionality to add events to its batch and send the batch to EventHub.

EventProducerManager: This will take care of borrowing and returning EventProducer objects from the pool and provide an abstraction to the users to send events to EventHub. This will be a singleton class.

NOTE: We will be using a singleton EventHubProducerClient that all the pooled objects (EventProducer) will be using so that we use a single client connection to EventHub when sending events instead of unnecessarily creating multiple connections to EventHub.
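One common way to make a class like EventProducerManager a lazily created, thread-safe singleton is the initialization-on-demand holder idiom. The sketch below is an assumption about how that could look, not the author's code: ManagerSingletonSketch and its constructions counter are hypothetical, and the constructor comment marks where the shared EventHubProducerClient and the ProducerPool would be built.

```java
// Hypothetical sketch of the initialization-on-demand holder idiom.
final class ManagerSingletonSketch {
    static int constructions = 0;           // demonstration only: counts instances created

    private ManagerSingletonSketch() {
        // In the article's design: build the shared client and the ProducerPool here
        constructions++;
    }

    // The JVM initializes Holder (and runs the constructor) only on first access,
    // exactly once, with no explicit locking.
    private static final class Holder {
        static final ManagerSingletonSketch INSTANCE = new ManagerSingletonSketch();
    }

    static ManagerSingletonSketch getInstance() {
        return Holder.INSTANCE;
    }
}
```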

Things in Action

To send an event to EventHub, EventProducerManager.sendEvent(EventData) is invoked.

//EventProducerManager.java
public void sendEvent(EventData eventData) {
    EventProducer eventProducer = null;
    try {
        // While borrowed, no other thread can use this EventProducer
        eventProducer = producerPool.borrowObject();
        eventProducer.sendToEh(eventData);
    } catch (Exception e) {
        log.error("Error while borrowing from the producer pool or sending the event:", e);
    } finally {
        // Always give the producer back, even if sending failed
        if (eventProducer != null) {
            producerPool.returnObject(eventProducer);
        }
    }
}

EventProducerManager.sendEvent(EventData) calls producerPool.borrowObject() to get an EventProducer object from the ProducerPool. The pool hands each object to only one borrower at a time, so no other thread can modify that EventProducer's batch until it is returned; this is what keeps batches consistent in a multithreaded application.

Then EventProducer.sendToEh(EventData) is called. It adds the event to the producer's eventDataBatch; if the batch is already full (by event count or by size), it first sends the batch to EventHub via EventHubProducerClient.send(EventDataBatch), replaces it with a new empty batch, and adds the event to the new one. Finally, the EventProducer object is returned to the pool.

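//EventProducer.java (excerpt)
// Fields used below: eventHubProducerClient (shared), createBatchOptions,
// eventDataBatch (this producer's current batch), and batchMaxSizeLimit
// (an event-count cap defined elsewhere in the class).

// Sends the current batch (if non-empty) and starts a fresh one
private void send() {
    if (eventDataBatch.getCount() > 0) {
        try {
            eventHubProducerClient.send(eventDataBatch);
        } catch (Exception e) {
            log.error("Failed to send batch to EventHub:", e);
        }
        // Reuse the same options for the next batch
        eventDataBatch = eventHubProducerClient.createBatch(createBatchOptions);
    }
}

// Adds the event to this producer's batch, sending the batch first if it is full.
// Returns true if a batch was sent to EventHub during this call.
public boolean sendToEh(EventData eventData) {
    boolean isPushed = false;
    // Count-based cap: flush once the batch already holds batchMaxSizeLimit events
    if (eventDataBatch.getCount() >= batchMaxSizeLimit) {
        send();
        isPushed = true;
    }

    // Size-based cap: tryAdd returns false if the event does not fit in the batch
    if (!eventDataBatch.tryAdd(eventData)) {
        send();
        isPushed = true;
        if (!eventDataBatch.tryAdd(eventData)) {
            log.error("Event is too large for an empty batch. Max size: " + eventDataBatch.getMaxSizeInBytes());
        }
    }
    return isPushed;
}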

If the batch is not full, the event is simply added to it, nothing is sent, and the EventProducer object goes back to the pool with its partially filled batch.

Asynchronously, the idle object evictor thread runs at intervals (timeBetweenEvictionRuns) and checks for idle EventProducer objects that are eligible for eviction. To evict one, the pool calls ProducerObjectFactory.destroyObject(), which calls EventProducer.close(). close() sends the remaining events to EventHub via publishRemainingEvents(), after which the object is removed from the pool.

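//EventProducer.java

// Flushes whatever is left in the batch, even if it is not full
public void publishRemainingEvents(){
    if(eventDataBatch.getCount() > 0) {
        send();
    }
}

// Called via ProducerObjectFactory.destroyObject() on eviction or pool shutdown.
// Only this producer's batch is flushed; the shared EventHubProducerClient stays open.
@Override
public void close() {
    publishRemainingEvents();
}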

Conclusion

Here, we discussed the Apache Commons Pool framework and how to use it to design a solution that batches events and sends them to EventHub efficiently, achieving high performance and throughput. I have also touched upon the Object Pooling pattern in general and how it can be used for resource pooling. Though this post uses a Java-specific implementation, the ideas discussed above can be applied in other languages and frameworks too.
