Apache Storm key takeaways…

Hadoop moves the code to the data; Storm moves the data to the code. This behavior makes more sense in a stream-processing system, because the data set isn’t known beforehand, unlike in a batch job. Also, the data set is continuously flowing through the code.

A Storm cluster consists of two types of nodes: the master node and the worker nodes. A master node runs a daemon called Nimbus, and the worker nodes each run a daemon called a Supervisor.

The master node can be thought of as the control center. In addition to its other responsibilities, this is where you’d run any of the commands available in a Storm cluster, such as activate, deactivate, rebalance, or kill (see the Storm documentation for more on these commands).

The worker nodes are where the logic in the spouts and bolts is executed. Each worker node has a Supervisor daemon that’s tasked with administering the worker processes and keeping them in a running state. If a Supervisor notices that one of the worker processes is down, it will immediately restart it. Each worker process executes a subset of a topology. This means that each worker process belongs to a specific topology and that each topology will be run across one or more worker processes. Normally, these worker processes are run across many machines within the Storm cluster.

Here are the key takeaways:
¦ A worker process is a JVM.
¦ An executor is a thread of execution within a JVM.
¦ A task is an instance of a spout or bolt being run within a thread of execution on the JVM.

Put another way, executors are the threads of execution inside a worker JVM, and tasks are the instances of our spouts and bolts that run within those threads. A single executor can run one or more tasks.

¦ A topology consists of nodes and edges.
¦ Nodes represent either spouts or bolts.
¦ Edges represent streams of tuples between these spouts and bolts.
¦ A tuple is an ordered list of values, where each value is assigned a name.
¦ A stream is an unbounded sequence of tuples between a spout and a bolt or between two bolts.
¦ A spout is the source of a stream in a topology, usually listening to some sort of live feed of data.
¦ A bolt accepts a stream of tuples from a spout or another bolt, typically performing some sort of computation or transformation on these input tuples. The bolt can then optionally emit new tuples that serve as the input stream to another bolt in the topology.
¦ Each spout and bolt will have one or many individual instances that perform all of this processing in parallel.
¦ TopologyBuilder: This class is used to piece together spouts and bolts, defining the streams and stream groupings between them.
¦ Config: This class is used for defining topology-level configuration.
¦ StormTopology: This class is what TopologyBuilder builds and is what’s submitted to the cluster to be run.
¦ LocalCluster: This class simulates a Storm cluster in-process on our local machine, allowing us to easily run our topologies for testing purposes.
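The tuple definition above can be made concrete with a small plain-Java model. NamedTuple is a hypothetical class written for illustration, not Storm’s Tuple; in a real topology, field names are declared in declareOutputFields and the values are emitted as a Values list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of a tuple: an ordered list of values in which every
// position also has a field name. This is NOT Storm's Tuple class.
final class NamedTuple {
    private final List<String> fields;
    private final List<Object> values;

    NamedTuple(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("each value needs exactly one field name");
        }
        this.fields = List.copyOf(fields);
        this.values = Collections.unmodifiableList(new ArrayList<>(values));
    }

    // Access by position, because a tuple is an ordered list.
    Object get(int index) {
        return values.get(index);
    }

    // Access by name, because every value is assigned a field name.
    Object getByField(String field) {
        int index = fields.indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException("unknown field: " + field);
        }
        return values.get(index);
    }

    public static void main(String[] args) {
        NamedTuple tuple = new NamedTuple(List.of("word", "count"), List.of("storm", 3));
        System.out.println(tuple.getByField("word")); // storm
        System.out.println(tuple.get(1));             // 3
    }
}
```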

Let’s see how these classes come together in a topology’s main method.
You can think of the main method as being split into three sections. The first is where we build the topology, telling Storm where the streams are and identifying the stream grouping for each of these streams. The next part is creating the configuration; in our example, we’ve turned on debug logging. Many more configuration options are available, which we’ll cover in later posts for advanced users. The final part is where we submit both the configuration and the built topology to the local cluster to be run.

Sometimes you need to trigger an action periodically, such as aggregating a batch of data or flushing some writes to a database. Storm has a feature called tick tuples to handle this. Tick tuples can be configured to arrive at a user-defined frequency; once configured, the bolt’s execute method receives a tick tuple at that frequency, alongside its regular tuples.
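As a sketch of the flush-on-tick pattern, the model below buffers incoming data and writes it out as one batch whenever a tick tuple arrives. Message and TickFlushingBolt are hypothetical stand-ins in plain Java. In Storm itself, a bolt typically asks for ticks by returning Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS from getComponentConfiguration() and recognizes them by the "__system" source component and "__tick" stream used below.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, plain-Java model of a bolt that buffers writes and flushes them
// when a tick tuple arrives. Message stands in for Storm's Tuple, and the
// "database" is just a list of flushed batches. Not Storm code.
final class TickFlushingBolt {
    // The values Storm uses for tick tuples
    // (Constants.SYSTEM_COMPONENT_ID and Constants.SYSTEM_TICK_STREAM_ID).
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    static final class Message {
        final String sourceComponent;
        final String streamId;
        final String payload;

        Message(String sourceComponent, String streamId, String payload) {
            this.sourceComponent = sourceComponent;
            this.streamId = streamId;
            this.payload = payload;
        }
    }

    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> flushedBatches = new ArrayList<>();

    static boolean isTick(Message m) {
        return SYSTEM_COMPONENT_ID.equals(m.sourceComponent)
                && SYSTEM_TICK_STREAM_ID.equals(m.streamId);
    }

    // Mirrors the shape of a bolt's execute method: ticks flush, data is buffered.
    void execute(Message m) {
        if (isTick(m)) {
            if (!buffer.isEmpty()) {
                flushedBatches.add(new ArrayList<>(buffer)); // one batched "write"
                buffer.clear();
            }
        } else {
            buffer.add(m.payload);
        }
    }

    List<List<String>> flushedBatches() {
        return flushedBatches;
    }

    int buffered() {
        return buffer.size();
    }

    public static void main(String[] args) {
        TickFlushingBolt bolt = new TickFlushingBolt();
        bolt.execute(new Message("checkins", "default", "a"));
        bolt.execute(new Message("checkins", "default", "b"));
        bolt.execute(new Message(SYSTEM_COMPONENT_ID, SYSTEM_TICK_STREAM_ID, null));
        System.out.println(bolt.flushedBatches()); // [[a, b]]
    }
}
```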

The two most common ways of connecting components within a topology to one another are shuffle grouping and fields grouping.
¦ You use shuffle grouping to distribute outgoing tuples from one component to the next in a manner that’s random but evenly spread out.
¦ You use fields grouping when you want to ensure tuples with the same values for a selected set of fields always go to the same instance of the next bolt.
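A toy model helps show what each of these two groupings guarantees. GroupingModel is hypothetical and not Storm’s implementation: shuffle is modeled as walking a randomly ordered list of tasks and reshuffling after each pass (random order, even load), and fields grouping as hashing only the selected field values, so equal values always reach the same task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical model of how a grouping picks the receiving task (0..numTasks-1).
// It illustrates the guarantees described above; it is not Storm's code.
final class GroupingModel {
    private final int numTasks;
    private final Random random;
    private final List<Integer> order = new ArrayList<>();
    private int cursor = 0;

    GroupingModel(int numTasks, long seed) {
        this.numTasks = numTasks;
        this.random = new Random(seed);
        for (int task = 0; task < numTasks; task++) {
            order.add(task);
        }
        Collections.shuffle(order, random);
    }

    // Shuffle grouping: hand out tasks in a random order and reshuffle after each
    // full pass, so the choice is random but every task gets an even share.
    int shuffleTarget() {
        if (cursor == numTasks) {
            Collections.shuffle(order, random);
            cursor = 0;
        }
        return order.get(cursor++);
    }

    // Fields grouping: hash only the selected field values, so tuples with equal
    // values always go to the same task.
    int fieldsTarget(Object... selectedValues) {
        return Math.floorMod(Arrays.asList(selectedValues).hashCode(), numTasks);
    }

    public static void main(String[] args) {
        GroupingModel grouping = new GroupingModel(4, 42L);
        int[] counts = new int[4];
        for (int i = 0; i < 400; i++) {
            counts[grouping.shuffleTarget()]++;
        }
        System.out.println(Arrays.toString(counts)); // [100, 100, 100, 100]
        System.out.println(grouping.fieldsTarget("user-7") == grouping.fieldsTarget("user-7")); // true
    }
}
```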

Storm allows you to provide a parallelism hint when you define any spouts or bolts.
In code, this would involve transforming
builder.setSpout("checkins", new Checkins());
into
builder.setSpout("checkins", new Checkins(), 4);
The parallelism hint of 4 gives this part of the topology four executors (threads) instead of one, so check-ins can be emitted in parallel.

TopologyBuilder: Exposes the API for specifying a topology for Storm to execute.
OutputCollector: The core API for emitting, acking, and failing tuples.

builder.setBolt("geocode-lookup", new GeocodeLookup(), 8).setNumTasks(64);
Now we have 64 tasks (instances) of GeocodeLookup running across eight executors (threads).
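How 64 tasks end up on eight executors can be sketched as splitting the task ids into contiguous, near-equal ranges. The model below assumes exactly that even split and numbers tasks from 1 for readability; treat it as an illustration, not Storm’s scheduler code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of spreading a component's tasks over its executors: task
// ids are split into contiguous ranges whose sizes differ by at most one.
// Not Storm's scheduler; task ids start at 1 purely for illustration.
final class TaskAssignment {
    static List<int[]> assign(int numTasks, int numExecutors) {
        if (numExecutors < 1 || numTasks < numExecutors) {
            // An executor with no task would have nothing to run.
            throw new IllegalArgumentException("need 1 <= executors <= tasks");
        }
        List<int[]> ranges = new ArrayList<>();
        int base = numTasks / numExecutors;
        int extra = numTasks % numExecutors; // the first `extra` executors get one more task
        int nextTask = 1;
        for (int executor = 0; executor < numExecutors; executor++) {
            int size = base + (executor < extra ? 1 : 0);
            ranges.add(new int[] {nextTask, nextTask + size - 1});
            nextTask += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // The geocode-lookup example: 64 tasks on 8 executors, 8 tasks per thread.
        for (int[] range : assign(64, 8)) {
            System.out.println("executor runs tasks " + range[0] + ".." + range[1]);
        }
    }
}
```

One reason to set more tasks than executors: the number of tasks stays fixed for the life of a topology, while the number of executors can later be raised with storm rebalance, up to the number of tasks.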

Stream grouping: A stream grouping defines how the tuples of a stream are distributed among the tasks of the bolt that consumes it.
Storm provides the following types of stream grouping:

  1. Shuffle Grouping: Tuples are distributed randomly across the bolt’s tasks, in a way that gives each task roughly the same number of tuples.
  2. Fields Grouping: Tuples are grouped by one or more fields. For example, when grouping by userid, tuples with the same userid always go to the same task, while tuples with different userids may go to different tasks.
  3. All Grouping: Broadcast; every tuple is sent to all of the bolt’s tasks.
  4. Global Grouping: The entire stream goes to a single task of the bolt, specifically the task with the lowest id.
  5. None Grouping: This grouping means you don’t care which task receives each tuple. Currently, it has the same effect as shuffle grouping.
  6. Direct Grouping: A special grouping in which the producer of a tuple decides which task of the consumer receives it. It can only be used on streams declared as direct streams, and the tuples must be emitted with the emitDirect method. The producer can get the task ids of its consumers from the TopologyContext, or by keeping track of the task ids returned by OutputCollector.emit (which returns the ids of the tasks the tuple was sent to).
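Three of the groupings above reduce to a one-line decision each, given the task ids of the consuming bolt. MoreGroupings is a hypothetical plain-Java sketch of those decisions (broadcast to all, lowest id for global, producer’s choice for direct), not Storm’s code.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical model of three groupings, given the task ids of the consuming
// bolt. Illustrative only; not Storm's implementation.
final class MoreGroupings {
    // All grouping: every task receives its own copy of the tuple.
    static List<Integer> allTargets(List<Integer> taskIds) {
        return List.copyOf(taskIds);
    }

    // Global grouping: the whole stream goes to the task with the lowest id.
    static int globalTarget(List<Integer> taskIds) {
        return Collections.min(taskIds);
    }

    // Direct grouping: the producer names the receiving task itself, so the only
    // thing left to check is that the id really belongs to the consumer.
    static int directTarget(List<Integer> taskIds, int chosenTaskId) {
        if (!taskIds.contains(chosenTaskId)) {
            throw new IllegalArgumentException("task " + chosenTaskId + " does not consume this stream");
        }
        return chosenTaskId;
    }

    public static void main(String[] args) {
        List<Integer> consumerTasks = List.of(7, 3, 5);
        System.out.println(allTargets(consumerTasks));      // [7, 3, 5]
        System.out.println(globalTarget(consumerTasks));    // 3
        System.out.println(directTarget(consumerTasks, 5)); // 5
    }
}
```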

Let’s see how groupings are defined in code. Suppose we need to count the number of words in a text. For simplicity, we create a spout that emits text defined in an array, and two bolts: one that splits the text into words and one that counts them.

We define the shuffle and fields groupings in the topology builder like this:

import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("wcspout", new WordCountSpout()).setNumTasks(2);
builder.setBolt("split-bolt", new SplitBolt(), 1).shuffleGrouping("wcspout").setNumTasks(1);
builder.setBolt("counter-1", new CountBolt(), 1).shuffleGrouping("split-bolt").setNumTasks(1);
// fields grouping: tuples with the same "word" value always reach the same counter-2 task
builder.setBolt("counter-2", new CountBolt(), 3).fieldsGrouping("counter-1", new Fields("word")).setNumTasks(3);
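Why the last hop uses a fields grouping can be seen in a small simulation. WordCountSimulation is a hypothetical plain-Java sketch with made-up sentences, since the spout’s actual array isn’t shown: it splits sentences into words and routes each word to one of three counter tasks by hashing the word, so every word’s count lives in exactly one task. With a shuffle grouping instead, the count for a word such as "the" could be split across several tasks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of the word-count flow: split sentences into words,
// then route each word to one of `counterTasks` counters the way a fields
// grouping on "word" would (same word -> same task). The hash is a stand-in.
final class WordCountSimulation {
    static List<Map<String, Integer>> run(List<String> sentences, int counterTasks) {
        List<Map<String, Integer>> counters = new ArrayList<>();
        for (int i = 0; i < counterTasks; i++) {
            counters.add(new HashMap<>());
        }
        for (String sentence : sentences) {
            // "split-bolt": one word per emitted tuple
            for (String word : sentence.toLowerCase().split("\\s+")) {
                if (word.isEmpty()) {
                    continue;
                }
                // fields grouping on "word": the counter task depends on the word alone
                int task = Math.floorMod(word.hashCode(), counterTasks);
                counters.get(task).merge(word, 1, Integer::sum);
            }
        }
        return counters;
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> counters =
                run(List.of("the cow jumped over the moon", "the man went to the store"), 3);
        for (int task = 0; task < counters.size(); task++) {
            System.out.println("counter task " + task + ": " + counters.get(task));
        }
    }
}
```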


Topology design paradigms
You can always start with the simplest functional components and then advance toward combining different operations together to reduce the number of repartitions. It’s much harder to go the other way around. As you gain more experience working with Storm and develop an intuition for topology design, you’ll be able to start with the lowest number of repartitions from the beginning.

Here is the approach we took to designing the topology:
1 We examined our data stream and determined our input tuples based on what we start with. Then we determined the resulting tuples we need to end up with in order to achieve our goal (the end tuples).
2 We created a series of operations (as bolts) that transform the input tuples into end tuples.
3 We carefully examined each operation to understand its behavior and scaled it by making educated guesses based on our understanding of its behavior (by adjusting its executors/tasks).
4 At points of contention where we could no longer scale, we rethought our design and refactored the topology into scalable components.

Directed acyclic graph and tuple trees
Though we call it a tuple tree, it’s actually a directed acyclic graph (DAG). A directed graph is a set of nodes connected by edges, where the edges have a direction to them. A DAG is a directed graph such that you can’t start at one node and follow a sequence of edges to eventually get back to that same node. Early versions of Storm only worked with trees; even though Storm now supports DAGs, the term “tuple tree” has stuck.
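Whether a graph is a DAG can be checked mechanically. The sketch below uses Kahn’s algorithm (repeatedly remove nodes with no incoming edges; if every node gets removed, there is no cycle). The example graph is hypothetical: a tuple whose two child tuples both feed one later tuple, as happens when a bolt anchors an output tuple to several inputs, so two paths meet again and the result is a DAG rather than a tree.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Checks that a directed graph is acyclic with Kahn's algorithm. Edges map each
// node to the nodes it points to. Illustrative helper, not part of Storm.
final class DagCheck {
    static boolean isAcyclic(Map<String, List<String>> edges) {
        Map<String, Integer> inDegree = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : edges.entrySet()) {
            inDegree.putIfAbsent(entry.getKey(), 0);
            for (String target : entry.getValue()) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }
        int removed = 0;
        while (!ready.isEmpty()) {
            String node = ready.poll();
            removed++;
            for (String target : edges.getOrDefault(node, List.of())) {
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        // Nodes on a cycle never reach in-degree 0, so they are never removed.
        return removed == inDegree.size();
    }

    public static void main(String[] args) {
        Map<String, List<String>> tupleGraph = Map.of(
                "sentence", List.of("word-a", "word-b"),
                "word-a", List.of("total"),
                "word-b", List.of("total"));
        System.out.println(isAcyclic(tupleGraph)); // true
        System.out.println(isAcyclic(Map.of("a", List.of("b"), "b", List.of("a")))); // false
    }
}
```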

Which bolt base classes does Storm offer?
¦ BaseBasicBolt, with automatic anchoring and acking.
¦ BaseRichBolt, with explicit anchoring and acking.

Degrees of reliability in Storm:
Just as carefully examining our data streams revealed different kinds of scaling problems, carefully examining our topology design reveals varying degrees of reliability to choose from. We identify three degrees of reliability:
¦ At-most-once processing
¦ At-least-once processing
¦ Exactly-once processing
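To see how the three degrees differ, here is a toy simulation in plain Java. It assumes two failure kinds: crashing before the write, and writing but losing the ack. Exactly-once is modeled as replay plus de-duplication by message id, which is one common way to get that effect; it is not how Storm or Trident implements it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy model of the three degrees of reliability. Message ids
// 0..n-1 are processed and the returned list is the "database" of writes.
// On their first attempt, ids in failBeforeWrite crash before writing, and ids
// in failAfterWrite write but are never acked, so a replay writes again.
final class DeliverySemantics {
    enum Mode { AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }

    static List<Integer> run(int n, Set<Integer> failBeforeWrite,
                             Set<Integer> failAfterWrite, Mode mode) {
        List<Integer> writes = new ArrayList<>();
        Set<Integer> committed = new HashSet<>(); // de-dup state, used only for EXACTLY_ONCE
        for (int id = 0; id < n; id++) {
            for (int attempt = 1; ; attempt++) {
                boolean firstTry = attempt == 1;
                boolean failed;
                if (firstTry && failBeforeWrite.contains(id)) {
                    failed = true; // crashed before the side effect happened
                } else {
                    if (mode != Mode.EXACTLY_ONCE || committed.add(id)) {
                        writes.add(id); // the side effect
                    }
                    failed = firstTry && failAfterWrite.contains(id); // ack lost
                }
                if (!failed || mode == Mode.AT_MOST_ONCE) {
                    break; // at-most-once never replays
                }
            }
        }
        return writes;
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> " + run(3, Set.of(0), Set.of(1), mode));
        }
        // AT_MOST_ONCE -> [1, 2]          (message 0 is lost)
        // AT_LEAST_ONCE -> [0, 1, 1, 2]   (message 1 is written twice)
        // EXACTLY_ONCE -> [0, 1, 2]
    }
}
```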

How does one go about tuning their topologies? It may seem like a daunting task at first, but Storm provides us with tools that can be used to quickly identify bottlenecks, allowing us to take steps to alleviate those bottlenecks. With the Storm UI and the metrics-collecting API, you have the tools to establish a repeatable process for tuning your topologies.

The Capacity column in the Storm UI:
Capacity tells you what percentage of the time in the time window the bolt has spent executing tuples. If this value is close to 1, then the bolt is “at capacity” and is a bottleneck in your topology. Address such bottlenecks by increasing the parallelism of the “at-capacity” bolts.
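The Storm UI describes capacity as (number of tuples executed multiplied by average execute latency) divided by the measurement window. The sketch below computes that, plus a rough executor estimate for the rebalance step that follows. executorsFor is a hypothetical rule of thumb that assumes load splits evenly across executors, which real bolts may not do.

```java
// Hypothetical helpers: capacity as the fraction of the window a bolt spent
// executing tuples, and a rough (assumed linear) executor estimate.
final class BoltCapacity {
    static double capacity(long executed, double avgExecuteLatencyMs, long windowMs) {
        return executed * avgExecuteLatencyMs / windowMs;
    }

    // Rule of thumb, not a Storm feature: assumes load splits evenly, so
    // capacity scales down in proportion to the number of executors.
    static int executorsFor(int currentExecutors, double currentCapacity, double target) {
        return (int) Math.ceil(currentExecutors * currentCapacity / target);
    }

    public static void main(String[] args) {
        long tenMinutesMs = 10 * 60 * 1000;
        double c = capacity(300_000, 2.0, tenMinutesMs);
        System.out.println(c);                       // 1.0 (the bolt is at capacity)
        System.out.println(executorsFor(2, c, 0.5)); // 4
    }
}
```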

storm rebalance topology-name -e [bolt-name]=[number-of-executors]

This command redistributes executors for the given bolt, allowing us to increase the parallelism for that bolt on the fly. All rebalance commands assume we’re running on the Nimbus node and that we have the storm command in our PATH.

storm rebalance flash-sale -e find-recommended-sales=4

If we still aren’t meeting our SLAs at this point in tuning, it’s time to start looking at how we can control the rate at which data flows into our topology. When controlling spout parallelism, two factors come into play:
¦ The number of spouts.
¦ The maximum number of tuples each spout will allow to be live in our topology.
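The second factor corresponds to Storm’s topology.max.spout.pending setting (Config.setMaxSpoutPending). It applies per spout task and only has an effect for reliable spouts that emit tuples with a message id. SpoutThrottle is a hypothetical plain-Java model of that behavior, including how the two factors multiply into a topology-wide cap.

```java
// Hypothetical model of max spout pending: a spout task stops emitting new
// tuples while `maxPending` of its tuples are still in flight (not yet fully
// acked or failed). Plain Java, not Storm code.
final class SpoutThrottle {
    private final int maxPending;
    private int pending = 0;

    SpoutThrottle(int maxPending) {
        this.maxPending = maxPending;
    }

    // Stands in for a nextTuple() call: emits only while under the cap.
    boolean tryEmit() {
        if (pending >= maxPending) {
            return false;
        }
        pending++;
        return true;
    }

    // Called when a tuple tree completes (acked) or fails.
    void complete() {
        if (pending == 0) {
            throw new IllegalStateException("nothing in flight");
        }
        pending--;
    }

    int pending() {
        return pending;
    }

    // The cap applies per spout task, so both factors multiply.
    static long maxInFlight(int spoutTasks, int maxPendingPerTask) {
        return (long) spoutTasks * maxPendingPerTask;
    }

    public static void main(String[] args) {
        SpoutThrottle spout = new SpoutThrottle(2);
        System.out.println(spout.tryEmit() + " " + spout.tryEmit() + " " + spout.tryEmit()); // true true false
        spout.complete();
        System.out.println(spout.tryEmit());     // true
        System.out.println(maxInFlight(4, 500)); // 2000
    }
}
```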

Storm’s metrics-collecting API
Prior to the Storm 0.9.x series of releases, metrics were the Wild West. You had topology-level metrics available in the UI, but if you wanted business-level or JVM-level metrics, you needed to roll your own. The Metrics API that now ships with Storm is an excellent way to get access to metrics that can help solve our current quandary. Storm ships with built-in metric types such as CountMetric, whereas something like a success-rate metric is not built in and has to be written yourself; the collected metrics can then be written to the logs.
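A success-rate metric of this kind can be sketched as follows. SuccessRateMetric is hypothetical and only mirrors the shape of Storm’s IMetric interface, whose single getValueAndReset() method Storm calls once per time bucket. In a real bolt, you would typically register such a metric in prepare() with TopologyContext.registerMetric and have a metrics consumer such as LoggingMetricsConsumer write the values to a log.

```java
// Hypothetical success-rate metric shaped like Storm's IMetric contract
// (a single getValueAndReset() method called once per time bucket).
// It does not implement Storm's interface; it only mirrors its shape.
final class SuccessRateMetric {
    private long successes;
    private long failures;

    void incrSuccess(long count) {
        successes += count;
    }

    void incrFail(long count) {
        failures += count;
    }

    // Report the rate for the bucket that just ended, then start a fresh bucket.
    Object getValueAndReset() {
        long total = successes + failures;
        double rate = total == 0 ? 0.0 : (double) successes / total;
        successes = 0;
        failures = 0;
        return rate;
    }

    public static void main(String[] args) {
        SuccessRateMetric metric = new SuccessRateMetric();
        metric.incrSuccess(3);
        metric.incrFail(1);
        System.out.println(metric.getValueAndReset()); // 0.75
        System.out.println(metric.getValueAndReset()); // 0.0 (empty bucket)
    }
}
```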

You notice a topology isn’t processing any data or has a sudden drop in throughput, and zero free slots are available according to the Storm UI.
The bottom line is that you have a fixed number of worker processes that can be allocated to the topologies requesting them. You can address this problem with these strategies:
¦ Decreasing the number of worker processes in use by existing topologies: This is the quickest and easiest way to free up slots for other topologies in your cluster.
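import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

TopologyBuilder builder = new TopologyBuilder();
// build the various pieces of your topology here
Config config = new Config();
// request fewer worker processes (slots) for this topology
config.setNumWorkers(2);
// "my-topology" is a placeholder name
StormSubmitter.submitTopology("my-topology", config, builder.createTopology());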


¦ Increasing the total number of worker processes in the cluster
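Whichever strategy you pick, the underlying arithmetic is simple: each Supervisor offers one slot per port listed in its supervisor.slots.ports setting, and each worker process a topology requests with Config.setNumWorkers takes one slot. SlotMath is a hypothetical helper, and the numbers below are made up for illustration.

```java
// Hypothetical back-of-the-envelope slot arithmetic: one slot per Supervisor
// port, one slot per worker process a running topology uses.
final class SlotMath {
    static int totalSlots(int[] portsPerSupervisor) {
        int total = 0;
        for (int ports : portsPerSupervisor) {
            total += ports;
        }
        return total;
    }

    static int freeSlots(int[] portsPerSupervisor, int[] workersPerTopology) {
        int used = 0;
        for (int workers : workersPerTopology) {
            used += workers;
        }
        return totalSlots(portsPerSupervisor) - used;
    }

    public static void main(String[] args) {
        int[] supervisors = {4, 4, 4}; // three worker nodes, four ports each
        int[] topologies = {6, 4};     // workers used by two running topologies
        int free = freeSlots(supervisors, topologies);
        System.out.println("free slots: " + free);                          // free slots: 2
        System.out.println("room for a 4-worker topology? " + (free >= 4)); // false
    }
}
```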

It’s important to always be aware of the resources available in your cluster when deploying new topologies. If you ignore what’s available within your cluster, you can easily affect every topology in your cluster by deploying something that consumes too many resources.

Our friend sar: the system statistics command
A worker node has a fixed amount of memory that is being used by its worker processes, the Supervisor process, the operating system, and any other processes that may be running on the worker node at any given time. If not enough memory is available for all processes requesting it, then memory contention at the worker node level may occur.

One way to keep an eye on this in Linux is with the sar (System Activity Reporter) command. This is a Linux-based system statistics command that collects and displays all system activities and statistics. We run this command in the format of
sar [option] [interval [count]]
¦ option: Command-line option for displaying different types of system activity.
¦ interval: Tells the command to write information at a specified interval in seconds. A value of 3 would write information every 3 seconds.
¦ count: Tells the command how many times to write information at the specified interval. A value of 5 would write information every n seconds a total of 5 times.

Various options can be passed in to display specific types of statistics. For diagnosing worker node memory contention, we use the -S option for reporting swap space utilization statistics. For example, the following reports swap space utilization every second, three times:
$ sar -S 1 3

One time, our issue turned out to be a bug that caused a topology to burn CPU needlessly, over and over, in a tight loop. That’s always what you should check for first.

Network/socket I/O contention
If your topologies interact over a network with external services, network/socket I/O contention is bound to be a problem for your cluster. In our experience, the main cause for this type of contention is that all of the ports allocated for opening sockets are being used.
Most Linux installs will default to 1024 maximum open files/sockets per process. In an I/O-intensive topology, it’s easy to hit that limit quickly. We’ve written topologies that open several thousand sockets per worker node. To determine the limits of your OS, you can examine the /proc filesystem to check your process’s limits. To do this, you’ll first need to know your process ID (aka PID), which you can find with the ps and grep commands. Once you have it, you can get a listing of all limits for that process from the /proc filesystem.

If you’re hitting this limit, the Storm UI for your topology should display an exception in the “Last Error” column that the max open files limit has been reached. This will most likely be a stack trace starting with java.net.SocketException: Too many open files.

-bash-3.2$ ps aux | grep MY-TOPOLOGY-NAME
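Once ps has given you the PID, the limits can be read from /proc/<PID>/limits. ProcLimits is a hypothetical Linux-only helper that pulls out the soft "Max open files" limit; the parsing assumes the usual column layout of that file (label, soft limit, hard limit, units).

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: reads the "Max open files" soft limit from
// /proc/<pid>/limits (Linux only). Pass the PID found with ps as the first
// argument; without one, this JVM's own PID is used.
final class ProcLimits {
    static final String LABEL = "Max open files";

    // Columns after the label are: soft limit, hard limit, units.
    static long maxOpenFilesSoftLimit(String limitsText) {
        for (String line : limitsText.split("\n")) {
            if (line.startsWith(LABEL)) {
                String soft = line.substring(LABEL.length()).trim().split("\\s+")[0];
                return soft.equals("unlimited") ? Long.MAX_VALUE : Long.parseLong(soft);
            }
        }
        throw new IllegalArgumentException("no '" + LABEL + "' line found");
    }

    public static void main(String[] args) throws IOException {
        long pid = args.length > 0 ? Long.parseLong(args[0]) : ProcessHandle.current().pid();
        String limits = Files.readString(Path.of("/proc/" + pid + "/limits"));
        System.out.println(LABEL + " (soft) for PID " + pid + ": " + maxOpenFilesSoftLimit(limits));
    }
}
```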

If you’re writing to NFS or some other network filesystem, stop immediately. Writing to NFS is slow and you’re setting yourself up for disk I/O contention if you do.
