Thursday, March 21, 2013

Scala for the Intrigued video from AJUG January

I really enjoyed this presentation by Venkat Subramaniam at the January Atlanta Java Users Group meeting.

The Scala language he is talking about is very interesting, and his presentation style is really fun. As I tweeted right afterward, all I could think of during certain parts was how much he reminded me of Bill Cosby's stand-up.

http://vimeo.com/57988159

Wednesday, March 20, 2013

Apache Kafka Presentation to AJUG

The presentation went well; I got a lot of questions and saw quite a few people nodding as I talked.

The slides are here: http://www.slideshare.net/chriscurtin/ajug-march-2013-kafka

The video should be ready in a week or so. I'll update the blog when it is ready.

Monday, March 11, 2013

Storm-Kafka Trident implementation notes

While updating the Storm-Kafka project to work with Kafka 0.8.0, I learned a lot about how the IPartitionedTridentSpout-derived classes should work. Here are a few of my observations:

The ZooKeeper data, including the transaction id, is stored under the NAME you give the STREAM object in your topology. So changing the name can be really bad when you are relying on stored State data, since the transaction ids will be reset to 1.
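
For reference, the name in question is the first argument to newStream when the topology is wired up. A minimal sketch (spout stands in for whatever opaque partitioned spout you are using):

TridentTopology topology = new TridentTopology();
// Trident keys its transactional state in ZooKeeper by this stream name,
// so renaming "kafka-events" resets the stored transaction ids
Stream stream = topology.newStream("kafka-events", spout);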

When the topology starts, there is a VERY good chance that your emitPartitionBatch method will be called even if the last emitPartitionBatchNew call succeeded. Why? When Trident calls emitPartitionBatchNew again, it stores the metadata returned from the previous call in ZooKeeper as the 'last meta'. Upon successful completion of emitPartitionBatch it removes that data. So if the topology starts and finds objects in ZooKeeper for the partition, it assumes the last emitPartitionBatchNew failed and calls emitPartitionBatch.

The Storm-Kafka package stores, in the metadata it returns, the NEXT offset that should be requested from Kafka. For example, if a fetch from Kafka returns offsets 1000 to 1005, 'next' is set to 1006. That way, the next time emitPartitionBatchNew is called, it can ask Kafka to start at offset 1006. However, if there is nothing in Kafka to return, you can't increment 'next' to 1007 or you will get an invalid offset error from Kafka. If emitPartitionBatch doesn't get anything from Kafka, it sets the 'start' offset to the 'next' offset.

Storing the "NEXT" offset is also important to know when coding the emitPartitionBatch method. Since there may now be data in Kafka since the topology was stopped, the emitPartitionBatch called on startup MUST NOT emit that offset, otherwise you will get that tuple played twice (once from emitPartitionBatch on startup and again from emitPartitionBatchNew when it starts the next batch.)

Web Scale Architecture Study Group: How Complex Systems Fail

“How Complex Systems Fail” is worth a whole session by itself.
Comments from a DevOps perspective:
A pretty good list of things to think about around complex systems:

And just something to smile about:

Web Scale Architecture Study Group: Large Scale Deployments

To get started, this Stack Overflow Q&A gives a good overview of the parts of a deployment system:
Next up is a fairly long article about how Facebook does full life-cycle deployments. There is a lot of 'fluff' at the beginning and end, but the bulk of the article is interesting:
Dan Nemec recommended this article to get an introduction to Automation tools:

I was able to find some examples of patterns around risk aversion in deployments:

Some more details on using BitTorrent for deployment (don't bother with the video; the blog post describes the idea well):
Finally, a reminder that cloud deployments are very different:
Thanks to Ken Giles and Dan Nemec for suggesting articles about this topic.

Tuesday, March 5, 2013

Storm ZooKeeper error in LocalCluster mode

This one is more of a frustration than anything else.

When running a test Trident topology in IntelliJ, I kept getting the following errors:

2328 [main-SendThread(localhost:2000)] WARN  org.apache.zookeeper.ClientCnxn - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.SocketException: Address family not supported by protocol family: connect
 at sun.nio.ch.Net.connect(Native Method) ~[na:1.6.0_33]
 at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:532) ~[na:1.6.0_33]
 at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1050) ~[zookeeper-3.3.3.jar:3.3.3-1073969]
 at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1077) ~[zookeeper-3.3.3.jar:3.3.3-1073969]


After digging around for a while, I found that this can safely be ignored. The cause is that the local ZooKeeper instance (note the localhost:2000 address in the log) hasn't started up yet. Once it starts, the errors go away.
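
For context, this is the usual LocalCluster setup where the warning shows up; a minimal sketch with the topology definition elided:

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import storm.trident.TridentTopology;

public class LocalTest {
    public static void main(String[] args) {
        TridentTopology topology = new TridentTopology();
        // ... define streams and state here ...

        // LocalCluster spins up an embedded ZooKeeper (localhost:2000 in the
        // log above); the ClientCnxn warnings appear until it finishes starting
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("local-test", new Config(), topology.build());
    }
}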

Custom Partitioners for Kafka 0.8.0

One of the nice features of Kafka is the ability to define the partitioning algorithm. This allows the user to define the key that should be used for partitioning, instead of letting Kafka assign one randomly. However, it wasn't obvious how to do this at first; it turns out you need a couple of things defined to make it work.

First, you need to tell Kafka the name of your Partitioner class:

props.put("partitioner.class", "com.curtinhome.kafka.playproducer.OrganizationPartitioner");

Then tell Kafka how to serialize the key so the Partitioner class can be called with the value:

 props.put("serializer.class", "kafka.serializer.StringEncoder");
 props.put("key.serializer.class", "kafka.serializer.StringEncoder");


Here we're telling Kafka that both the message being written and the key are Strings. If you wanted to pass an Integer key, you'd need to provide a class that converts the Integer to a byte array.
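
For example, an Integer key encoder could look like this sketch (the class name is mine; Kafka 0.8's Encoder interface expects a toBytes method and a constructor taking VerifiableProperties):

import java.nio.ByteBuffer;

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

public class IntegerEncoder implements Encoder<Integer> {
    // Kafka constructs encoders via reflection, passing VerifiableProperties
    public IntegerEncoder(VerifiableProperties props) {
    }

    public byte[] toBytes(Integer value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }
}

You would then set "key.serializer.class" to this class's fully qualified name.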

Now when you write the message, you pass the key along with the message:

        // Needs: java.util.{Date, Properties, Random}, kafka.javaapi.producer.Producer,
        // kafka.producer.KeyedMessage, kafka.producer.ProducerConfig
        long events = Long.parseLong(args[0]);
        int blocks = Integer.parseInt(args[1]);

        Random rnd = new Random();
        Properties props = new Properties();
        props.put("broker.list", "broker1.atlnp1:9092,broker2.atlnp1:9092,broker3.atlnp1:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "com.curtinhome.kafka.playproducer.OrganizationPartitioner");
        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);
        for (int nBlocks = 0; nBlocks < blocks; nBlocks++) {
            for (long nEvents = 0; nEvents < events; nEvents++) {
                long runtime = new Date().getTime();
                String msg = runtime + "," + (50 + nBlocks) + "," + nEvents + "," + rnd.nextInt(1000);
                // The second argument is the key the partitioner will see
                KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>("test1", String.valueOf(nBlocks), msg);
                producer.send(data);
            }
        }
        producer.close();


The Partitioner is pretty straightforward:

public class OrganizationPartitioner implements Partitioner<String> {
    // Kafka instantiates the partitioner via reflection and expects a
    // constructor taking a VerifiableProperties argument
    public OrganizationPartitioner(VerifiableProperties props) {
    }

    public int partition(String key, int a_numPartitions) {
        long organizationId = Long.parseLong(key);
        return (int) (organizationId % a_numPartitions);
    }
}

The number of partitions is provided by Kafka based on the topic configuration.

Finding Leader Broker in Kafka 0.8.0

With Kafka 0.8.0, the paradigm for storing data has changed: you can no longer connect to just any broker and get access to the data for a topic and partition.

Starting with 0.8.0, Kafka has implemented replication, so data now lives on only a subset of your Brokers (assuming you have more Brokers than replicas).

To figure out which Broker is the leader for your topic and partition, connect to any Broker and request the MetaData:

        // Needs: java.util.{ArrayList, List}, kafka.javaapi.consumer.SimpleConsumer,
        // kafka.javaapi.TopicMetadataRequest
        kafka.javaapi.consumer.SimpleConsumer consumer = new SimpleConsumer("mybroker01", 9092, 100000, 64 * 1024, "test");
        List<String> topics2 = new ArrayList<String>();
        topics2.add("storm-anon");
        TopicMetadataRequest req = new TopicMetadataRequest(topics2);
        kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

        List<kafka.javaapi.TopicMetadata> data3 = resp.topicsMetadata();

The reply from Kafka contains information about every partition in the topic. Iterate through the response and find the partition you want, then call 'leader().host()' to figure out which Broker to connect to.

The following code shows all the partitions for a specific topic, including where each of the replicas resides.

        for (kafka.javaapi.TopicMetadata item : data3) {
            for (kafka.javaapi.PartitionMetadata part : item.partitionsMetadata()) {
                String replicas = "";
                String isr = "";

                // All Brokers holding a replica of this partition
                for (kafka.cluster.Broker replica : part.replicas()) {
                    replicas += " " + replica.host();
                }

                // The replicas currently in sync with the leader
                for (kafka.cluster.Broker replica : part.isr()) {
                    isr += " " + replica.host();
                }

                System.out.println("Partition: " + part.partitionId() + ": " + part.leader().host() + " R:[" + replicas + "] I:[" + isr + "]");
            }
        }
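
To pick out the leader for one specific partition instead of printing them all, filter on partitionId; a sketch (partition 0 here is just an example):

        String leaderHost = null;
        for (kafka.javaapi.TopicMetadata item : data3) {
            for (kafka.javaapi.PartitionMetadata part : item.partitionsMetadata()) {
                // leader() can be null if an election is in progress
                if (part.partitionId() == 0 && part.leader() != null) {
                    leaderHost = part.leader().host();
                }
            }
        }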


Note that you only have to do this if you are managing your own offsets and using the SimpleConsumer. Using Consumer Groups takes care of this automatically.

Also note that you don't need to do this on the Producer side; Kafka figures out which partition is on which Broker for you.