Monday, March 11, 2013

Storm-Kafka Trident implementation notes

When updating the Storm-Kafka project to work with Kafka 0.8.0 I learned a lot about how the IPartitionedTridentSpout-derived classes should work. Here are a few of my observations:

The ZooKeeper data, including the transaction id, is stored under the NAME you give the STREAM object in your topology. So changing the name could be really bad when you are working out how to store State data, since the transaction ids will be reset to 1.

When the topology starts, there is a VERY good chance that your emitPartitionBatch method will be called even if the last emitPartitionBatchNew call succeeded. Why? Each time Trident calls emitPartitionBatchNew again, it stores the metadata returned from the last call in ZooKeeper as the 'last meta'. Upon successful completion of emitPartitionBatch it removes the data. If the topology starts and there are objects in ZooKeeper for the partition, Trident assumes the last emitPartitionBatchNew failed and calls the emitPartitionBatch method.
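The startup behavior described above can be modeled with a small toy. This is only a sketch of the behavior as observed in this post, not Trident's actual coordinator code: LastMetaStore, its methods, and the "<stream name>/<partition>" key layout are hypothetical, and a HashMap stands in for ZooKeeper.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (hypothetical, not Trident code) of the 'last meta' kept in ZooKeeper. */
final class LastMetaStore {
    // Stand-in for ZooKeeper. The key layout "<stream name>/<partition>" is an assumption.
    private final Map<String, String> nodes = new HashMap<>();

    private static String key(String streamName, int partition) {
        return streamName + "/" + partition;
    }

    /** Remember the metadata returned by the last emitPartitionBatchNew call. */
    void storeLastMeta(String streamName, int partition, String meta) {
        nodes.put(key(streamName, partition), meta);
    }

    /** Remove the metadata after emitPartitionBatch completes successfully. */
    void clearAfterReplay(String streamName, int partition) {
        nodes.remove(key(streamName, partition));
    }

    /** Which spout method the post says is called first when the topology starts. */
    String methodCalledOnStartup(String streamName, int partition) {
        return nodes.containsKey(key(streamName, partition))
                ? "emitPartitionBatch"      // leftover metadata: treated as a failed batch
                : "emitPartitionBatchNew";  // nothing stored: start a fresh batch
    }
}
```

In this model a stream restarted under the same name sees the leftover metadata and replays, while a renamed stream finds nothing under its new key and starts from scratch.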

The Storm-Kafka package stores the NEXT offset that should be requested from Kafka in the metadata it returns. For example, if the fetch from Kafka returns offsets 1000 to 1005, 'next' is set to 1006. This way, the next time the emitPartitionBatchNew method is called it can ask Kafka to start at offset 1006. However, if there is nothing in Kafka to return, you can't increment 'next' to 1007 or you will get an invalid offset error from Kafka. If emitPartitionBatch doesn't get anything from Kafka it sets the 'start' offset to the 'next' offset.
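Here is a minimal sketch of that bookkeeping, assuming the metadata carries just a 'start' and a 'next' offset. BatchMeta and forNewBatch are hypothetical names for illustration, not the Storm-Kafka classes, and the list of fetched offsets stands in for a real Kafka fetch.

```java
import java.util.List;

/** Hypothetical stand-in for the batch metadata returned by the spout. */
final class BatchMeta {
    final long start; // first offset requested for this batch
    final long next;  // offset to request for the following batch

    BatchMeta(long start, long next) {
        this.start = start;
        this.next = next;
    }

    /**
     * Builds the metadata for a new batch.
     * lastMeta is null for the very first batch; fetchedOffsets are the offsets
     * Kafka returned for a fetch beginning at the computed start offset.
     */
    static BatchMeta forNewBatch(BatchMeta lastMeta, long initialOffset, List<Long> fetchedOffsets) {
        long start = (lastMeta == null) ? initialOffset : lastMeta.next;
        if (fetchedOffsets.isEmpty()) {
            // Nothing came back: do not advance, so 'start' and 'next' are the same offset.
            return new BatchMeta(start, start);
        }
        long lastFetched = fetchedOffsets.get(fetchedOffsets.size() - 1);
        return new BatchMeta(start, lastFetched + 1);
    }
}
```

With offsets 1000 to 1005 fetched, the sketch yields start 1000 and next 1006; an empty fetch after that yields start 1006 and next 1006 rather than moving on to 1007.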

Knowing that the "NEXT" offset is what gets stored is also important when coding the emitPartitionBatch method. Because new data may have arrived in Kafka since the topology was stopped, the emitPartitionBatch called on startup MUST NOT emit the message at that offset; otherwise that tuple will be played twice (once from emitPartitionBatch on startup and again from emitPartitionBatchNew when it starts the next batch).
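One way to picture the replay rule is to treat the stored 'next' as an exclusive upper bound. The sketch below assumes that reading; ReplayRange and offsetsToReplay are hypothetical names, and the list of available offsets stands in for what a Kafka fetch would return on restart.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper showing which offsets a replayed batch may emit. */
final class ReplayRange {
    /**
     * Returns the offsets a replay of the batch [start, next) should emit.
     * availableOffsets may contain data written after the topology stopped;
     * anything at or beyond 'next' is left for the following new batch.
     */
    static List<Long> offsetsToReplay(long start, long next, List<Long> availableOffsets) {
        List<Long> toEmit = new ArrayList<>();
        for (long offset : availableOffsets) {
            if (offset >= start && offset < next) { // 'next' itself is excluded
                toEmit.add(offset);
            }
        }
        return toEmit;
    }
}
```

If the stored metadata is start 1000 and next 1006, and Kafka now holds offsets up to 1008, the replay emits 1000 through 1005 only; offset 1006 is emitted once, by the next new batch.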

4 comments:

infomessages said...

Is the Storm-Kafka project already stable for working with Kafka 0.8?

git url?

Chris Curtin said...

Not stable. Found a number of things still to do once the basics were working. Here is what I was able to get working:

https://github.com/ChrisCurtin/storm-contrib/tree/master/storm-kafka

Note that the README lists the things still to do.

We've decided not to use Storm at this time so I've moved on to other projects.

The Gich said...

Why did you decide to move away from Storm? Was it that it didn't fulfill your business needs, or something more specifically to do with Storm itself?

Chris Curtin said...

Business reasons, and I'm getting the feeling that the project is being abandoned. Go look at the Google group and the type of questions and who is answering them.

A lot of hard technical questions aren't being answered, and the total lack of response on what the 0.9.0 release timeframe is is also unsettling.