Azure EventHub is a real-time event streaming and ingestion service offered by Microsoft. Azure provides SDKs to send and receive events from EventHub. This article focuses on how to send events to EventHub using its Java package azure-messaging-eventhubs.
The azure-messaging-eventhubs package has EventHubProducerClient, which provides the functionality to send events to EventHub.
Making a separate send call to EventHub for every single event is inefficient in terms of performance and throughput, because each call is a network I/O operation with its own latency. For a system running at large scale and requiring high throughput, it’s crucial to send events to EventHub efficiently so that sending does not become a bottleneck for the rest of the processing.
Hence, it’s recommended to send a batch of events in a single call to EventHub instead of one event at a time.
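To make the saving concrete, here is a minimal sketch that only counts send calls. CountingSender and BatchingCost are hypothetical names used for illustration and are not part of the Azure SDK; each call to send() stands in for one network round trip.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a network client: it only counts send calls.
class CountingSender {
    private int calls = 0;

    // One invocation models one network round trip, whatever the batch size.
    void send(List<String> events) {
        calls++;
    }

    int getCalls() {
        return calls;
    }
}

class BatchingCost {
    // Sends every event in its own call.
    static int sendOneByOne(List<String> events) {
        CountingSender sender = new CountingSender();
        for (String event : events) {
            sender.send(List.of(event));
        }
        return sender.getCalls();
    }

    // Groups events into batches of at most batchSize before sending.
    static int sendInBatches(List<String> events, int batchSize) {
        CountingSender sender = new CountingSender();
        List<String> batch = new ArrayList<>();
        for (String event : events) {
            batch.add(event);
            if (batch.size() == batchSize) {
                sender.send(batch);
                batch = new ArrayList<>();
            }
        }
        if (!batch.isEmpty()) {
            sender.send(batch); // flush the final, partially filled batch
        }
        return sender.getCalls();
    }
}
```

For 1,000 events, sendOneByOne makes 1,000 calls while sendInBatches with a batch size of 100 makes 10, so the per-call latency is paid far less often.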
Batching means collecting events as they arrive and holding them until the batch is full.
Three main challenges with dealing with batches of events are:
You don’t want the events in the incomplete batches to remain idle for an indefinite period of time. Even with an incomplete batch you would still want to push it to EventHub after waiting for a while.
You don’t want to have inconsistent batches in the case of multithreaded applications.
You don’t want to mix up events in the batches if your application has multiple EventHubs where it's sending the events.
To handle all these challenges efficiently, let’s see how Object Pooling can come to the rescue.
Before jumping into the solution, let’s understand what Object Pooling is.
It's a way to enhance the performance of a process in terms of speed and resource usage.
Let’s understand this in a simpler way: borrowing and reusing something is cheaper than buying a new one. The same applies to a program or process that requires an object. It borrows the object from the object pool instead of instantiating a new one, uses it, and then returns it to the pool.
In technical terms, pooling can boost performance, and it is especially effective when:
The cost of initializing a class instance is high.
The rate of instantiation of a class is high.
The number of instantiations in use at any particular time is low.
An important thing to note here (in addition to the idea that borrowing is cheaper) is that we don’t want a process to wait for an object to be returned to the pool when all objects are in use. So the object pool also instantiates new objects as and when required, and it must implement a way to clean up unused objects periodically.
Object pooling is widely used for DB connection pooling, HTTP connection pooling, etc., since it doesn’t make sense to create a new connection every time the client fires a request.
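The borrow-and-return idea can be sketched in a few lines of plain Java. SimplePool is a hypothetical class written for this article, not a library API, and it deliberately leaves out size caps, validation and periodic clean-up, which a production pool needs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// A deliberately minimal pool: no size caps, no eviction, no validation.
class SimplePool<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;
    private int created = 0;

    SimplePool(Supplier<T> factory) {
        this.factory = factory;
    }

    // Hands out an idle instance if one exists; otherwise creates a new one
    // so that callers never block waiting for another caller to return.
    synchronized T borrow() {
        T instance = idle.pollFirst();
        if (instance == null) {
            instance = factory.get();
            created++;
        }
        return instance;
    }

    // Puts the instance back so that a later borrow() can reuse it.
    synchronized void giveBack(T instance) {
        idle.addFirst(instance);
    }

    synchronized int getCreatedCount() {
        return created;
    }

    synchronized int getIdleCount() {
        return idle.size();
    }
}
```

Borrowing, returning and borrowing again yields the same instance, so the cost of construction is paid only when every existing instance is already in use.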
Now, let's discuss the implementation of object pooling and how we can use this pattern for our use case of batching events.
Apache commons-pool is an open-source library that provides an object-pooling API: PooledObjectFactory, a generic interface for managing the lifecycle of a pooled object, and a number of object pool implementations. For our use case, we will use the most generic implementation of the object pool, GenericObjectPool.
The following lifecycle methods are provided:
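public interface PooledObjectFactory<T> {
    // Reinitialize an instance to be returned by the pool
    void activateObject(PooledObject<T> p) throws Exception;
    // Destroy an instance no longer needed by the pool
    void destroyObject(PooledObject<T> p) throws Exception;
    // Default overload in newer 2.x releases that also receives the destroy mode
    void destroyObject(PooledObject<T> p, DestroyMode destroyMode) throws Exception;
    // Create an instance that can be returned by the pool
    PooledObject<T> makeObject() throws Exception;
    // Uninitialize an instance to be returned to the pool
    void passivateObject(PooledObject<T> p) throws Exception;
    // Validate the object before using it
    boolean validateObject(PooledObject<T> p);
}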
GenericObjectPool implements the ObjectPool interface, which defines the following methods for managing and monitoring the pool:
// Obtain an instance from the pool
T borrowObject() throws Exception;
// Return an instance to the pool
void returnObject(T obj) throws Exception;
// Invalidate an object from the pool
void invalidateObject(T obj) throws Exception;
// Used for pre-loading a pool with idle objects
void addObject() throws Exception;
// Return the number of idle instances
int getNumIdle();
// Return the number of active instances
int getNumActive();
// Clear the idle objects
void clear() throws Exception;
// Close the pool
void close();
Since GenericObjectPool implements all the required pool methods, you don’t need to worry much about the implementation details.
It provides a wide variety of configuration options, including the ability to cap the number of idle or active instances, evict instances as they sit idle in the pool, etc.
When coupled with the appropriate PooledObjectFactory, GenericObjectPool provides robust pooling functionality for arbitrary objects.
However, it’s important to note that the success of a pooling strategy depends on how well you configure the pool.
Let’s discuss briefly some of the important Object Pool configurations:
maxIdle: The upper limit on the number of “idle” instances in the pool. Use a negative value to indicate an unlimited number of idle instances.
maxTotal: The upper limit on the total number of object instances managed by the pool. Negative values mean that there is no limit to the number of objects allocated by the pool.
minEvictableIdleTime: Minimum amount of time an object may sit idle in the pool before it is eligible for eviction by the idle object evictor thread.
timeBetweenEvictionRuns: Sets the number of milliseconds to sleep between runs of the idle object evictor thread. When non-positive, no idle object evictor thread runs.
NOTE: Caution should be used when configuring this optional feature. Eviction runs contend with client threads for access to objects in the pool, so if they run too frequently performance issues may occur.
For other configurations, you can refer to the official documentation, or you can use the default values, which are a good starting point.
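How minEvictableIdleTime and the evictor runs interact can be illustrated with a toy model. IdleEvictionModel is a hypothetical class, not the way commons-pool is implemented; timestamps are passed in explicitly to keep the behaviour deterministic, and the real evictor also honours settings such as minIdle and numTestsPerEvictionRun that are left out here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Toy model of idle eviction, for illustration only.
class IdleEvictionModel<T> {
    // An idle object together with the time it was returned to the pool.
    private static final class IdleEntry<E> {
        final E object;
        final long idleSinceMillis;

        IdleEntry(E object, long idleSinceMillis) {
            this.object = object;
            this.idleSinceMillis = idleSinceMillis;
        }
    }

    private final List<IdleEntry<T>> idle = new ArrayList<>();
    private final long minEvictableIdleMillis;
    private final Consumer<T> destroyer;

    IdleEvictionModel(long minEvictableIdleMillis, Consumer<T> destroyer) {
        this.minEvictableIdleMillis = minEvictableIdleMillis;
        this.destroyer = destroyer;
    }

    // Records that an object was returned to the pool at the given time.
    void returnObject(T object, long nowMillis) {
        idle.add(new IdleEntry<>(object, nowMillis));
    }

    // One evictor run: destroys every object that has been idle for longer
    // than minEvictableIdleMillis and returns how many were evicted.
    int runEvictor(long nowMillis) {
        int evicted = 0;
        Iterator<IdleEntry<T>> it = idle.iterator();
        while (it.hasNext()) {
            IdleEntry<T> entry = it.next();
            if (nowMillis - entry.idleSinceMillis > minEvictableIdleMillis) {
                it.remove();
                destroyer.accept(entry.object);
                evicted++;
            }
        }
        return evicted;
    }

    int getNumIdle() {
        return idle.size();
    }
}
```

In this model, timeBetweenEvictionRuns corresponds to how often runEvictor is called: an object becomes eligible once its idle time exceeds the threshold, but it is only destroyed on the next run after that.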
The idea is to create a pool of producer objects, each responsible for building its own batch of events and pushing it to EventHub either as soon as the batch gets full or once the producer has stayed idle for a threshold period of time.
Therefore, the structure will contain the following components:
ProducerObjectFactory: This will implement PooledObjectFactory and provide the implementation for the lifecycle methods.
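//ProducerObjectFactory.java
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;

public class ProducerObjectFactory implements PooledObjectFactory<EventProducer> {
    private final EventHubProducerClient eventHubProducerClient;
    private final CreateBatchOptions createBatchOptions;

    public ProducerObjectFactory(EventHubProducerClient eventHubProducerClient,
                                 CreateBatchOptions createBatchOptions) {
        this.eventHubProducerClient = eventHubProducerClient;
        this.createBatchOptions = createBatchOptions;
    }

    // Every pooled EventProducer shares the same client but owns its own batch
    @Override
    public PooledObject<EventProducer> makeObject() {
        return new DefaultPooledObject<>(new EventProducer(eventHubProducerClient, createBatchOptions));
    }

    // Called when the pool discards an object; close() publishes any remaining events
    @Override
    public void destroyObject(PooledObject<EventProducer> pooledObject) throws Exception {
        EventProducer eventProducer = pooledObject.getObject();
        if (eventProducer != null) {
            eventProducer.close();
        }
    }

    @Override
    public boolean validateObject(PooledObject<EventProducer> pooledObject) {
        return pooledObject.getObject() != null;
    }

    // Nothing to reset when an EventProducer is borrowed or returned
    @Override
    public void activateObject(PooledObject<EventProducer> pooledObject) throws Exception {
    }

    @Override
    public void passivateObject(PooledObject<EventProducer> pooledObject) throws Exception {
    }
}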
ProducerPool: This will extend GenericObjectPool and will be used to manage and monitor the pool.
//ProducerPool.java
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class ProducerPool extends GenericObjectPool<EventProducer> {
    public ProducerPool(ProducerObjectFactory objFactory,
                        GenericObjectPoolConfig<EventProducer> genericObjectPoolConfig) {
        super(objFactory, genericObjectPoolConfig);
    }

    @Override
    public EventProducer borrowObject() throws Exception {
        return super.borrowObject();
    }

    @Override
    public void returnObject(EventProducer eventProducer) {
        super.returnObject(eventProducer);
    }
}
EventProducer: The object that will be pooled. This will contain a batch of events as a class variable and provide the functionality to add events to its batch and send the batch to EventHub.
EventProducerManager: This will take care of borrowing and returning EventProducer objects from the pool and provide an abstraction to the users to send events to EventHub. This will be a singleton class.
NOTE: We will be using a singleton EventHubProducerClient that all the pooled objects (EventProducer) will be using so that we use a single client connection to EventHub when sending events instead of unnecessarily creating multiple connections to EventHub.
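Because a borrowed object belongs to exactly one thread until it is returned, each producer's batch needs no locking of its own, which is how the pool addresses the multithreading challenge listed earlier. The sketch below demonstrates this with plain Java: BufferingWorker and ExclusiveBorrowDemo are hypothetical names, and an unsynchronized ArrayList stands in for the event batch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical pooled object with a deliberately non-thread-safe buffer.
class BufferingWorker {
    final List<Integer> buffer = new ArrayList<>();
}

class ExclusiveBorrowDemo {
    // Runs `threads` threads that each add `eventsPerThread` events through a
    // shared pool and returns the total number of events buffered afterwards.
    static int countBufferedEvents(int threads, int eventsPerThread) {
        ConcurrentLinkedDeque<BufferingWorker> idle = new ConcurrentLinkedDeque<>();
        List<BufferingWorker> all = Collections.synchronizedList(new ArrayList<>());

        Runnable task = () -> {
            for (int i = 0; i < eventsPerThread; i++) {
                BufferingWorker worker = idle.pollFirst(); // borrow
                if (worker == null) {                      // pool empty: create a new one
                    worker = new BufferingWorker();
                    all.add(worker);
                }
                try {
                    worker.buffer.add(i);                  // exclusive access, no lock needed
                } finally {
                    idle.addFirst(worker);                 // always return to the pool
                }
            }
        };

        List<Thread> started = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread thread = new Thread(task);
            started.add(thread);
            thread.start();
        }
        for (Thread thread : started) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for workers", e);
            }
        }

        int total = 0;
        for (BufferingWorker worker : all) {
            total += worker.buffer.size();
        }
        return total;
    }
}
```

No event is lost even though the buffers themselves are not thread-safe, because the pool never hands the same worker to two threads at once. The pool also never creates more workers than there are concurrent callers.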
To send an event to EventHub, EventProducerManager.sendEvent(EventData) will be invoked.
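//EventProducerManager.java
public void sendEvent(EventData eventData) {
    EventProducer eventProducer = null;
    try {
        // Borrow a producer for exclusive use by this thread
        eventProducer = producerPool.borrowObject();
        // Add the event to the producer's batch; the batch is sent when it fills up
        eventProducer.sendToEh(eventData);
    } catch (Exception e) {
        log.error("Unknown error while pooling from producer pool:", e);
    } finally {
        // Always hand the producer back so that other threads can reuse it
        if (eventProducer != null) {
            producerPool.returnObject(eventProducer);
        }
    }
}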
EventProducerManager.sendEvent(EventData) calls producerPool.borrowObject() to get an EventProducer object from the ProducerPool.
Then EventProducer.sendToEh(EventData) will be called, which will try to add the event to its eventDataBatch and send the batch to EventHub via EventHubProducerClient.send(EventDataBatch) if it’s full. After that, the eventDataBatch will be flushed to make it ready to accept events for a new batch. Finally, the EventProducer object will be returned to the pool.
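//EventProducer.java
// Sends the current batch (if it holds any events) and starts a new one
private void send() {
    if (eventDataBatch.getCount() > 0) {
        try {
            eventHubProducerClient.send(eventDataBatch);
        } catch (Exception e) {
            // Note: events in a failed batch are only logged here, not retried
            log.error("Failed to send batch to EventHub: ", e);
        }
        // Recreate the batch with the options passed in by the factory
        // (assumes createBatchOptions is kept as a field of EventProducer)
        eventDataBatch = eventHubProducerClient.createBatch(createBatchOptions);
    }
}

// Adds the event to the batch; returns true if a batch was pushed to EventHub
public boolean sendToEh(EventData eventData) {
    boolean isPushed = false;
    // Flush once the configured event count is reached
    if (eventDataBatch.getCount() >= batchMaxSizeLimit) {
        send();
        isPushed = true;
    }
    // tryAdd returns false when the event does not fit into the remaining bytes
    if (!eventDataBatch.tryAdd(eventData)) {
        send();
        isPushed = true;
        if (!eventDataBatch.tryAdd(eventData)) {
            log.error("Event is too large for an empty batch. Max size: " + eventDataBatch.getMaxSizeInBytes());
        }
    }
    return isPushed;
}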
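The flush-and-retry logic can be tried out without an EventHub connection. The following sketch uses two hypothetical classes: SizedBatch mimics only the tryAdd contract of EventDataBatch with a byte budget, and BatchingProducer records the batches it would have sent. It also throws for an oversized event instead of logging, which is a design choice of the sketch rather than of the code above.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for EventDataBatch: accepts events until a byte budget is used up.
class SizedBatch {
    private final int maxSizeInBytes;
    private final List<String> events = new ArrayList<>();
    private int sizeInBytes = 0;

    SizedBatch(int maxSizeInBytes) {
        this.maxSizeInBytes = maxSizeInBytes;
    }

    // Returns false, leaving the batch unchanged, when the event does not fit.
    boolean tryAdd(String event) {
        int eventSize = event.getBytes(StandardCharsets.UTF_8).length;
        if (sizeInBytes + eventSize > maxSizeInBytes) {
            return false;
        }
        events.add(event);
        sizeInBytes += eventSize;
        return true;
    }

    int getCount() {
        return events.size();
    }

    List<String> getEvents() {
        return events;
    }
}

class BatchingProducer {
    private final int maxSizeInBytes;
    private final List<List<String>> sentBatches = new ArrayList<>(); // what "went over the wire"
    private SizedBatch batch;

    BatchingProducer(int maxSizeInBytes) {
        this.maxSizeInBytes = maxSizeInBytes;
        this.batch = new SizedBatch(maxSizeInBytes);
    }

    // Sends the current batch if it holds anything, then starts a fresh one.
    private void flush() {
        if (batch.getCount() > 0) {
            sentBatches.add(batch.getEvents());
            batch = new SizedBatch(maxSizeInBytes);
        }
    }

    // Adds the event, flushing first if it does not fit.
    // Returns true if a batch was sent as part of this call.
    boolean add(String event) {
        if (batch.tryAdd(event)) {
            return false;
        }
        int sentBefore = sentBatches.size();
        flush();
        if (!batch.tryAdd(event)) {
            throw new IllegalArgumentException("Event is too large for an empty batch");
        }
        return sentBatches.size() > sentBefore;
    }

    // Publishes whatever is left, mirroring what happens on eviction.
    void close() {
        flush();
    }

    List<List<String>> getSentBatches() {
        return sentBatches;
    }
}
```

With a 10-byte budget, adding "aaaa" and "bbbb" fills the batch to 8 bytes; adding "cccc" no longer fits, so the first batch is sent and "cccc" starts a new one, which is only published when close() is called.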
If the batch is not full, the event is simply added to the batch and the EventProducer object is returned to the pool.
Asynchronously, the “idle-evictor” thread will run at intervals and check for any idle EventProducer object eligible for eviction. During eviction, the pool first sends the object's remaining batch to EventHub via publishRemainingEvents(), which flushes the batch, and then destroys the object.
//EventProducer.java
public void publishRemainingEvents(){
if(eventDataBatch.getCount() > 0) {
send();
}
}
@Override
public void close() {
publishRemainingEvents();
}
Here, we discussed the Apache Commons Pool framework and how to use it to design a solution for batching events so that they are sent to EventHub efficiently, achieving high performance and throughput. I have also touched upon the Object Pooling pattern in general and how it can be used for resource pooling. Though this blog covers a Java-specific implementation, the idea discussed above can be used in other languages and frameworks too.