Real-Time Stream Processing at InMobi- Part 4

Satish Mittal

Satish Mittal on October 14, 2015

This is the fourth blog in the series of blogs regarding real time stream processing at InMobi. The first blog provided insight into the variety of company-wide use-cases that provided the motivation for stream processing at InMobi. The second blog provided a brief overview of both Storm and Spark Streaming platforms that is required for effective comparison. The third blog introduced various criteria on which any large scale enterprise should look to evaluate any stream processing platform.

In the last and final edition of this series, we provide our findings on the various evaluation criteria along with the final recommendation.

Evaluation Findings:

1. Processing Model:

Storm:

Storm supports true streaming processing model (via Core storm layer) in strictest sense, with an additional micro-batching model (via Trident Storm layer). Hence it can support the entire spectrum of use-cases.

Spark streaming:

Spark Streaming is a wrapper over Spark (Batch processing) framework. It internally converts the incoming stream into small micro-batches, which are then handed over to Spark framework for processing. Thus Spark streaming supports batch processing model in strictest sense, which approximates stream processing by performing micro-batching. Hence if any application needs event driven processing (that is process each message as soon as it arrives), it won’t be possible. The downstream applications are also forced to consume inputs as batches. The size of input batch can be tuned via configuration.

2. Primitives:

Storm:

Core storm provides a rich set of primitives to perform tuple level processing within a stream (filters, functions). Aggregations across messages in a stream can be achieved through group by semantics. Similarly join operations can be performed across streams - it supports left, right, inner join (default). One can also perform multi-join operations, and joins with phase lag. A rich set of groupings allow setting the message partitioning scheme from parent spout/bolt to downstream bolt.

Storm Trident abstraction allows developer to define their own pluggable operators. It also adds primitives for doing stateful, incremental processing on top of any database or external store.

Spark streaming:

Spark streaming provides two rich categories of operators - stream transformation operators that convert one DStream into another, and output operators that write data to external systems. The former includes stateless operators (filter, map, mapPartitions, union, distinct and so on) as well as stateful window operators (countByWindow, reduceByWindow and so on). The windowing operations are particularly powerful, as they allow to apply transformations over a sliding window of data.

Spark streaming also allows application to maintain arbitrary state while continuously updating it with new information. Finally, it allows combining batch processing along with streaming, by performing arbitrary Spark RDD computation with DStream. For example one can join DStream with a static file.

3. State management:

Storm:

Core storm by default doesn’t provide any framework level support to store any intermediate bolt output (result of user operation) as state. Hence, any application needs to explicitly create/update its own state as and when needed.

On the other hand, Storm Trident layer has exposed StateFactory and StateUpdater interfaces that provide complete flexibility to an application developer to create state at the store of his/her choice (such as in-memory, relational DB, K-V store and so on ), and update the state with each batch of messages. In addition, there are multiple out-of-box state implementations available (for example HBaseState, HDFSState, HiveState, MemoryMapState, RedisState, JDBCState, TridentKafkaState and so on ).

Spark Streaming:

The underlying Spark by default treats output of each RDD operation (stateless/stateful) as intermediate state and stores it as RDD. Spark Streaming explicitly allows maintaining and updating state via updateStateByKey API. A pluggable method could not be found to implement state in the external system.

However, note that if any state updates are performed via updateStateByKey API, then application must enable data checkpointing on a fault-tolerant, reliable file system (such as HDFS, S3 and so on ). In this case, Spark streaming will periodically perform async writes to file system. In general, checkpointing needs to be enabled if application uses any stateful transformation operator. However, in case of multiple nodes going down before async write happens, the entire checkpointed RDDs stored in-memory will be lost.

Also, since all RDD’s are stored in-memory with replication, applications need to provision enough RAM to hold the state. In addition, it needs to optimize more in terms of storing serialized data (which increases latency in serialization/deserialization), clearing unwanted data (especially in case of window operators) and tune JVM to minimize GC related pauses.

4. Message delivery guarantees (Handling message level failures):

Storm:

Storm supports all 3 message processing guarantees: at-least once, at-most once and exactly once. Storm's reliability mechanisms are completely distributed, scalable, and fault-tolerant.

Core storm guarantees that every spout tuple will be fully processed by topology. It does this by tracking the tree of tuples triggered by every spout tuple and determining when that tree of tuples has been successfully completed. Hence Core storm provides at-least once processing semantics by default. The same APIs also allows developer to achieve at-most once processing by controlling the acking of tuples.

Exactly-once processing semantics can be achieved via Storm Trident abstraction. The message failure detection mechanism is the same as in case of at-least-once. Tuples are actually processed at-least-once. However, Storm’s state implementation detects duplicates and ignores.

We found that Trident has approached exactly once mechanism in a systematic, well-defined manner that is simple, easy to understand and has clutter-free design. There are clearly laid out requirements/expectations from framework, spouts and states for collectively achieving the same.

Spark Streaming:

Spark Streaming tries to define its fault tolerance semantics in terms of the guarantees provided by the receivers (reliable/unreliable) and output operators. As per the architecture of Spark Streaming, the incoming data from receivers is read and replicated among multiple Spark executors in worker nodes in the cluster. This leads to multiple failure scenarios in terms of data received but may not be replicated. Further the impact of failure scenarios become quite different in case of worker vs driver node failures.

Prior to Spark 1.2, if the worker node where a network receiver was running fails, then data may be lost. If the driver node fails, then all executors with their received and replicated in-memory data is lost. To avoid this loss of past received data, Spark 1.2 introduced write ahead logs which save the received data to fault-tolerant storage (HDFS, S3 and so on ). With the write ahead logs enabled and reliable receivers, there is zero data loss.

With WAL approach, the entire streaming data from any reliable receiver would first get written on to HDFS.Hence, it is quite inefficient due to unnecessary replication overheads involved. Also, it brings in additional points of failure. It reduces overall receiver throughput; the documentation recommends to add more receivers.

Considering the high traction of Kafka integration in community, Spark 1.3 release provided a special implementation for Kafka based receivers called Kafka Direct API, which tries to ensure that all the Kafka data is received by Spark Streaming exactly once. The documentation calls this approach as experimental as of 1.4.1 release. Also, it is not generic for all network receivers.

Overall, the current state of message delivery semantics in Spark Streaming doesn’t appear to be stable, simple, and generic enough. Architecturally it seems to take complex design approach on case-by-case basis, given that the streaming layer has to retrofit into underlying batch oriented Spark design.

5. Fault Tolerance (Handling process/node level failures):

Storm:

Storm is designed with fault-tolerance at its core. Storm daemons (Nimbus and Supervisor) are designed to be fail-fast (that means process self-destructs whenever any unexpected situation is encountered) and stateless (all state is kept in Zookeeper or on disk). If these daemons are running under supervision, they restart as if nothing happened! Most notably, no running worker processes are affected by the death of Nimbus or the Supervisors. Thus one can even issue ‘kill -9’ to them without impacting any running topology.

When a worker dies, the supervisor will restart it. If it continuously fails on startup and is unable to heartbeat to Nimbus, Nimbus will reassign the worker to another machine. When a node dies, all tasks assigned to that machine will time-out and Nimbus will reassign those tasks to other machines.

Spark Streaming:

The Driver Node (equivalent of JT) is a SPOF.If the driver node fails, then all executors with their received and replicated in-memory data is lost. In order to reliably recover from driver failure, Metadata checkpointing is needed. This involves storing following metadata to fault-tolerant storage: configuration of streaming application, set of DStream operations in application, Batches whose jobs are queued but have not completed yet.

Each streaming application has to be rewritten in a way that checkpoint information can be used for driver failure recovery (that is re-create streaming context from checkpointed data).

6. Debuggability and monitoring:

Storm:

Storm UI supports visualization of each topology; with complete break-up of internal spouts and bolts. UI also provides information about any errors happening in tasks and fine-grained stats on the throughput and latency of each component of each running topology.It helps in debugging issues at high level, for example as soon as an error happens in a topolog, stack trace is displayed.

Metric based monitoring: Storm’s built-in metrics feature provides framework level support for applications to emit any metrics, which can then be easily integrated with external metrics/monitoring systems (Graphite, CloudWatch, Librato Metrics, and so on).

Worker logs are another source of valuable information for troubleshooting topology errors.

Spark Streaming:

Spark web UI shows an additional Streaming tab which shows statistics about running receivers (whether receivers are active, number of records received, receiver error, and so on .) and completed batches (batch processing times, queueing delays, and so on). This can be used to monitor the progress of the streaming application.

The following two metrics in web UI are particularly important for tuning the batch size:

  • Processing Time - The time to process each batch of data.
  • Scheduling Delay - The time a batch waits in queue for the processing of previous batches to finish.

Also, Spark Streaming provides StreamingListener interface as part of its developer monitoring API, which allows to get receiver status and processing times.

7. Auto scaling:

Storm:

Storm supports configuring initial parallelism at various levels per topology - number of worker processes, number of executors, number of tasks. In addition, it supports dynamic rebalancing, which allows to increase or decrease the number of worker processes and/or executors without being required to restart the cluster or the topology. However, the number of initial tasks configured remain constant throughout the lifetime of topology.

Once all supervisor nodes are fully saturated with worker processes, and there is a need to scale out, one simply needs to start a new supervisor node and pointing it to cluster wide Zookeeper. All resources on the newly added supervisor will then be available for scheduling.

It is possible to automate the logic of monitoring the current resource consumption on each node in a Storm cluster, and dynamically add more resources. STORM-594 describes such auto-scaling mechanism using a feedback system.

In our experiments, we observed that Storm tries to spread out topologies nicely across cluster in a balanced manner. for example, when a topology task is to be scheduled, it first tries to spawn any new worker across different supervisors (nodes) if needed; else spawns new worker within same supervisor (node) if needed; else would run tasks within an existing worker. Similarly, when topology is shutdown, worker processes seamlessly scale down.

Spark Streaming:

At the moment, elastic scaling of Spark streaming applications is not supported. Refer to the detailed discussion in Spark Streaming community on this topic.

Essentially, dynamic allocation is not intended to be used in Spark streaming at the moment (1.4 or earlier). The reason is that currently the receiving topology is static. The number of receivers is necessarily fixed. One receiver is allocated with each DStream instantiated and it will use 1 core in the cluster. Once the StreamingContext is started, this topology cannot be changed. Killing receivers results in stopping the topology.

The community is currently brainstorming on what needs to be done to provide some semblance of dynamic scaling to streaming applications.

Note that as a special case for Kafka sources, the above limitations don’t apply since it works with Kafka Direct API.

8. YARN integration:

Storm:

The Storm integration with YARN is recommended through Apache Slider. Slider is a YARN application that aims to seamlessly deploy non-YARN distributed applications on a YARN cluster. It interacts with YARN RM to spawn containers for various ‘components’ of the distributed application and then manages the lifecycle of those containers. Slider provides out-of-box application packages for Storm.

We tried out the Storm-Slider integration in our setups and observed that there are currently some limitations in this regard. Firstly, it led to very large container requirements since we wanted to completely utilize all available resources on a machine. Also, currently all worker logs get scattered across various container directories. There is no centralized cluster level logging available at one place, making debugging difficult. In addition, there is No easy discoverability of Daemons (for example the UI) since they can fail-over to any node in YARN cluster. There is also some amount of overhead that Slider AM will bring with itself.

Spark Streaming:

Spark framework provides native integration with Yarn. Spark streaming being a layer over Spark, simply leverages the integration. Each Spark streaming application gets spawned as a separate Yarn application. The AM container runs the Spark Streaming driver and initializes the Spark streaming context. All executors (and receivers) run as Yarn containers managed by AM. The AM then periodically submits one job per micro-batch on these containers.

9. Isolation:

Storm:

Each worker process runs executors for a specific topology only, that is mixing of different topology tasks is not allowed at worker process level. This provides topology level runtime isolation. Further, each executor thread runs one or more tasks of the same component (spout or bolt), that is no mixing of tasks across components.

However, in a shared cluster setup, a topology that needs large number of tasks, and hence acquires more resources (CPU cores, memory) can get submitted along with a smaller topology. To handle this, Storm 0.8.2 introduced Isolation Scheduler, which provides multi-tenancy across topologies within a cluster and avoids resource contention among them. It allows to specify which topologies should be “isolated”, that is run on a dedicated set of machines within cluster (no other topology). All isolated topologies are given higher priority than non-isolated ones. If needed, take resources from non-isolated.

Storm allows users to plug in custom scheduler in order to apply custom scheduling policy. The custom scheduler is required to implement the IScheduler interface. The default schedulers that come with Storm schedule tasks in a round-robin fashion with disregard to resource demands and availability. STORM-893 captures the efforts around adding resource aware scheduling in Storm.

Spark Streaming:

Each Spark Streaming job is a separate application submitted on YARN cluster, where each executor runs in a separate container. Hence JVM level isolation is provided by Yarn since two different topology can’t execute in same JVM. Additionally, YARN provides resource level isolation since container level resource constraints (CPU, memory limits) can be configured.

The entire resource aware scheduling of executor containers is delegated onto YARN. This allows different streaming applications to ask for different container sizes, without worrying about cluster-wide resource availability.

10. Active open-source community/adoption level:

Storm:

Storm Powered-By page boasts of an extremely impressive and healthy list of companies that are running Storm in production for diverse use-cases: real-time analytics, NLP, data normalizations and ETL; scalable low-latency and high performance processing, in various domains (ad-tech and so on ).

Many of them are large-scale internet deployments that are really pushing the boundaries in terms of performance and scale. For example Yahoo deployment consists of 2,300 nodes running Storm for near-real-time event processing, with largest topology spanning across 400 nodes.

The Storm community is quite active and rapidly rolling out new feature sets and making frequent new releases.

Spark Streaming:

Spark Streaming is still emerging and has limited experience in production clusters, going by the Spark powered by list.

However the overall umbrella Spark community is easily one of the largest and the most active open source communities out there today. In the latest 1.4.1 release, more than 210 contributors from 70 different organizations contributed more than 1000 patches. The overall charter is rapidly evolving given the large developer base. This should lead to maturity of Spark Streaming in the near future.

11. Ease of development:

Storm:

Storm provides really simple, rich and intuitive APIs that easily describe the DAG nature of processing flow (topology). The Storm tuples, which provide the abstraction of data flowing between nodes in the DAG, are dynamically typed. The motivation there is to simplify the APIs for ease of use. Any new custom tuple type can be plugged in after registering its Kryo serializer.

Developers can start with writing topologies and run them in local cluster mode. In local mode, threads are used to simulate worker nodes, allowing developer to set breakpoints, halt execution, inspect variables, and profile their topology before deploying it to a distributed cluster where all of this is much more difficult.

Spark Streaming:

Spark Streaming offers Scala and Java APIs that have more of a functional programming touch (transformation of data). As a result, the topology code is much more concise. There is a rich set of API documentation and illustrative samples available for ease of developer.

12. Ease of Operability:

Storm:

It is fairly easy to deploy Storm artifacts through various tools (puppets, and so on ) and bring up cluster. Storm has a dependency on Zookeeper cluster to achieve coordination across clusters, and store state and statistics. It provides CLI support to perform actions like submit, activate, deactivate, list, kill topologies etc. A strong fault tolerance means any daemon downtime doesn’t impact running topologies.

In standalone mode, Storm daemons (Nimbus and supervisors) need to be run in supervised mode. In YARN cluster mode, Storm daemons will be spawned as containers and managed by Application Master (Slider).

Spark Streaming:

Spark streaming uses Spark as the underlying execution framework. It should be fairly simple to bring up Spark cluster on YARN. There are multiple deployment requirements. Typically checkpointing would need to be enabled, at least for fault tolerance of application driver. This would bring a dependency on fault-tolerant storage (typically HDFS).

Since the received data must be stored in memory, the executors must be configured with sufficient memory to hold the received data (for example, if we are doing 10 minute window operations, the system has to keep at least last 10 minutes of data in memory). The application driver needs to be automatically restarted by YARN. For receivers other than Kafka, one needs to configure write-ahead logs (again writing data to HDFS).

13. Application level Replay of Events:

At times, the stream processing application may want to go back in time and reprocess a certain set of input messages (For example, there are some deployment failures, or any bug fix needs to be applied, or a certain release simply needs to be rolled back). This can be done only if the underlying streaming source system periodically checkpoints the offsets in a durable shared storage (for example HDFS/zookeeper), and allows rewinding those offsets through some external tool. Assuming this is available, in case of both Storm and Spark Streaming, we need to first stop the currently running topology, rewind the input source offset, and then restart the topology.

Evaluation Summary:

Based on the details provided above, we provide a tabular summary of our findings on various criteria. The findings are rated on a 5-point scale.

Criteria Storm Spark Streaming
Processing Model Very Good Poor
Primitives Good Very Good
State management Good Good
Message delivery guarantees Very Good Average
Fault Tolerance Very Good Average
Debuggability and monitoring Good Good
Auto scaling Very Good Very Poor
YARN Integration Average Very Good
Isolation Average Very Good
Active open-source community Average Very Good
Industry adoption level Very Good Poor
Ease of development Good Good
Ease of operability Very Good Good
Application level replay of events Average Average


Recommendation:

Both Storm and Spark Streaming are at different levels of maturity, adoption and evolution. Storm has already proven its worth in numerous large scale production deployments. The Storm Powered-by page boasts of an impressive list of adopters running Storm for diverse use-cases. With its native stream processing paradigm, Storm seems to be a perfect choice for those applications which have very low latency processing requirements. At the same time, it also provides high-level Trident APIs that are meant for applications looking for stream processing with higher throughput. The simple and robust architecture provides exactly once semantics that is intuitive enough to understand. The Fault-tolerance of Storm provides the required stability, ease of operability much needed in production.

On the other hand, Spark Streaming builds over Spark as its underlying processing platform, and thus essentially performs micro-batch processing. However, Spark is rapidly gaining adoption as the de-facto batch processing platform over Hadoop, and has a highly active community. Coupled with the fact that Spark ecosystem provides ML and graphLib APIs, Spark Streaming might be more suitable for those applications that don't necessarily need very low latency and exactly-once delivery semantics, but might want to leverage the offerings of Spark ecosystem.

After carefully analyzing the pros and cons of both, the team at InMobi has decided to standardize on Storm as the stream processing platform.

Previous Blogs: