Distributed Complex Event Processing (C.E.P.)
Reverse Engineering Apache Storm to Architect Real-Time Continuity-of-Service Using Redis
(Performed while engaged at Bank of America)

Note: Click on the image at the end of this article for a detailed PDF on this reverse engineering deep dive

Real-Time Apache Storm Continuity of Service

The need to recover Storm task-state after failure:
Distributed compute loss in a Complex Event Processor (C.E.P.)

The state of an Apache Storm topology computation is, at any given time, a composite of the states of its distributed tasks. This divide-and-conquer architecture (common to big data platforms) has consequences with respect to task-level failures: when a task fails (not if), its in-memory data structures are lost. Such data structures could, for example, hold a long-running aggregate calculation of some kind. While Nimbus (Storm's master) will re-launch the failed task and then replay the timed-out tuples the task missed while it was temporarily unavailable, Storm cannot replay every tuple the task ever received during its lifetime (i.e. in order to reconstruct the in-memory state that was lost). This means that the calculation(s) computed by that specific task are irretrievably lost, as are the downstream composite/aggregated results that depend on them.

Note: While it's possible to design topologies such that restarted tasks reach back into some persistence layer and have all archived tuples replayed end-to-end, such a replay would take far too long to be viable for real-time use cases.

Storm does not provide explicit protection against this kind of failure; however, it does provide two very important task-level properties that enable users to implement their own protection.

The following properties hold true when a task fails and is restarted:

    1. Task indices of Bolts persist (they do not change).
    2. If a task was receiving fields-grouped tuples, that grouping remains the same.

To illustrate, let's say that the Bolt01 component has 5 tasks, which means each is assigned a task index of 0, 1, 2, 3, or 4, respectively. Let's also say that task number '3' receives fields-grouped 1-tuples containing strings that begin with the letters [a – i]. The above properties tell us that when task number '3' fails and is restarted, it will still have '3' as its task index, and it will continue to receive 1-tuples containing strings that begin with the letters [a – i]. (The wiring that produces this behavior is sketched below.)
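
As a concrete wiring for this example, the following minimal sketch declares Bolt01 with a parallelism hint of 5 and a fields grouping. The spout class and the "word" field name are hypothetical stand-ins, not taken from the original topology.

    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;

    public class Bolt01Wiring {
        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();

            // Hypothetical spout emitting 1-tuples with a single "word" field.
            builder.setSpout("spout01", new WordSpout());

            // Bolt01 with a parallelism hint of 5 -> task indices 0 through 4.
            // fieldsGrouping() guarantees that tuples carrying equal "word"
            // values always route to the same task index, even across
            // task fail/restarts.
            builder.setBolt("Bolt01", new Bolt01(), 5)
                   .fieldsGrouping("spout01", new Fields("word"));
        }
    }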

Consider a real-life analogue. Two bank tellers receive checks to process. The first teller processes only checks from clients whose names begin with the letters [a – m], while the second processes only checks from clients whose names begin with the letters [n – z]. Each maintains running balances for the subset of clients they exclusively handle. We take for granted that when the tellers go to sleep at night (temporarily die) and wake up remembering their identities (their name, their job, their specific role), that recollection is what enables them to continue where they left off. Teller number two, for instance, knows to process the [n – z] checks that came in overnight and not to process checks [a – m].

We want this same continuity-of-service across task fail/restarts in a Storm topology, and the two ‘recollection’ properties above will help us achieve it, as we’ll see next.

Using task identity persistence to recover task state after failure

Each task in a Storm topology can be fully and uniquely qualified using two task-level attributes:

    1. The ID of the component (Spout or Bolt) the task belongs to
    2. The task's index within that component

Both attributes are available via the TopologyContext object passed to a Spout’s open() method and to a Bolt’s prepare() method, and we use them to create a fully-qualifying identifier, fqTaskId, as shown here:
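
What follows is a minimal reconstruction of that snippet. Both getters are standard TopologyContext methods; the exact concatenation format (a hyphen separator) is an assumption.

    // Inside a Spout's open() or a Bolt's prepare(), where 'context'
    // is the TopologyContext instance Storm passes in:
    String fqTaskId = context.getThisComponentId()  // e.g. "Bolt01"
                    + "-"
                    + context.getThisTaskIndex();   // e.g. 3  ->  "Bolt01-3"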

As defined, fqTaskId (a String) is unique across all component tasks, topology-wide, and can be used to identify a specific task whenever some operation unique to that task must be performed. As a general best practice we avoid task-instance-specific operations in a topology, but save/restore operations that defend against task failures are a legitimate exception.

Task instances can make clever use of fqTaskId by using it as a unique index into an out-of-band persistence layer for saving (and, when necessary, restoring) task-critical data. For example, if Bolt01 maintains a hashmap of customerID-to-balance data, then whenever a task updates that hashmap it can also persist the update to an external Redis Hash keyed by its unique Redis key prefix, fqTaskId. (Note: if the persistence layer were highly available, you could opt to use it exclusively to store the hashmap, thereby eliminating the double write.) Now, should a Bolt task fail and be restarted, the first thing its prepare() method does is connect to the persistence layer and, again using fqTaskId as its index, recover the task-critical data structures that were lost. We can do this because, as discussed earlier, the value of fqTaskId persists across task fail/restarts. Once the task progresses to its execute() method, all outstanding tuples that timed out and were marked as failed will be replayed by Storm's Guaranteed Message Processing facility, completing the recovery.
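
The sketch below illustrates that pattern, assuming the Jedis client, a localhost Redis endpoint, and a hypothetical customerID/amount tuple schema (the production implementation described in this article is not reproduced here). prepare() reconnects and recovers state from the Redis Hash keyed by fqTaskId; execute() doubly writes each update before acking.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;
    import redis.clients.jedis.Jedis;

    public class BalanceBolt extends BaseRichBolt {
        private String fqTaskId;
        private Jedis redis;
        private Map<String, Long> balances;   // customerID -> running balance
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context,
                            OutputCollector collector) {
            this.collector = collector;
            this.fqTaskId = context.getThisComponentId() + "-"
                          + context.getThisTaskIndex();
            this.redis = new Jedis("localhost", 6379);   // assumed endpoint

            // Recovery: reload whatever this task persisted before failing.
            // On a cold start the hash is empty, so this loop is a no-op.
            this.balances = new HashMap<>();
            redis.hgetAll(fqTaskId).forEach(
                (customer, bal) -> balances.put(customer, Long.parseLong(bal)));
        }

        @Override
        public void execute(Tuple tuple) {
            String customer = tuple.getStringByField("customerID");
            long amount = tuple.getLongByField("amount");
            long newBalance = balances.merge(customer, amount, Long::sum);

            // The double write: mirror the in-memory update into the Redis
            // Hash keyed by fqTaskId before acking, so a restarted task can
            // recover it in prepare() above.
            redis.hset(fqTaskId, customer, Long.toString(newBalance));
            collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) { }
    }

A Redis Hash fits naturally here because each task's entire hashmap maps onto a single key (fqTaskId), with one hash field per customer.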

Reverse Engineering Apache Storm was needed for this

Prior to the arrival of Apache Storm v2.x (circa mid-2019), the vulnerability described above had persisted since Storm's first releases (circa 2012). To architect a safeguard against it, I undertook the reverse engineering effort of Apache Storm described below (no books on its internals existed at the time), to see whether I could, in fact, use a secondary out-of-band persistence store to doubly write task I/O (as described above). I performed this work in my personal R&D lab environment, then transferred my results (e.g. configuration files) onto Bank of America / ML servers. My investigation and tests were successful, and I employed Redis as that secondary store. Ironically, I also used Redis during the reverse engineering investigation itself.

Click on the image below for a detailed PDF on this reverse engineering deep dive.

Real-Time Apache Storm Continuity of Service