Monday, August 5, 2013

Adding Coda Hale Metrics to Tomcat

I spent a couple of frustrating hours trying to get the Coda Hale Metrics (also known as 'Yammer Metrics') working in a simple Tomcat instance. The documentation isn't great, and a couple of steps turned out to be missing. (Note: it has been a few years since I've had to mess with web.xml, so that might be why it took so long. Spring MVC and annotations mean less .xml hell!)

First you need to define a Listener in your web.xml:

<listener>
    <listener-class>com.company.urltesting.MetricsListener</listener-class>
</listener>
The logic in the Listener is pretty straightforward:

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.servlets.MetricsServlet;

public class MetricsListener extends MetricsServlet.ContextListener {
    public static final MetricRegistry REGISTRY = new MetricRegistry();

    @Override
    protected MetricRegistry getMetricRegistry() {
        return REGISTRY;
    }
}


The 'magic' here is that the Coda Hale code calls your listener to set up the Registry on startup. It isn't obvious from the code, but the listener is adding the Registry to the ServletContext under a specifically named attribute (more about this in a minute).
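
To make that a little less magical, here is a rough sketch of what the base ContextListener does when the web application starts. This is an approximation of the metrics-servlets 3.0 behavior, not the actual source, and other versions may differ slightly:

import com.codahale.metrics.MetricRegistry;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

// Approximation: on startup the listener publishes your registry into the
// ServletContext under a well-known attribute name; the MetricsServlet reads it back out.
public abstract class SketchContextListener implements ServletContextListener {

    protected abstract MetricRegistry getMetricRegistry();

    public void contextInitialized(ServletContextEvent event) {
        event.getServletContext().setAttribute(
                "com.codahale.metrics.servlets.MetricsServlet.registry", getMetricRegistry());
    }

    public void contextDestroyed(ServletContextEvent event) {
        // nothing to clean up in this sketch
    }
}
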
Next, set up the Servlet so you can query the Metrics from a browser:


<servlet>
    <servlet-name>CodahaleMetrics</servlet-name>
    <servlet-class>com.codahale.metrics.servlets.MetricsServlet</servlet-class>
    <init-param>
        <param-name>metrics-uri</param-name>
        <param-value>/metrics</param-value>
    </init-param>
    <init-param>
        <param-name>ping-uri</param-name>
        <param-value>/ping</param-value>
    </init-param>
    <init-param>
        <param-name>healthcheck-uri</param-name>
        <param-value>/health</param-value>
    </init-param>
    <init-param>
        <param-name>threads-uri</param-name>
        <param-value>/threads</param-value>
    </init-param>
</servlet>

<servlet-mapping>
    <servlet-name>CodahaleMetrics</servlet-name>
    <url-pattern>/CodahaleMetrics/*</url-pattern>
</servlet-mapping>





Finally, to create/use a Metric you need to know how to get the Registry yourself from the Servlet Context.

Here is the most basic use:
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

import javax.servlet.ServletContext;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class Encrypted extends HttpServlet {
    // Note: don't create a separate MetricRegistry here; use the shared one
    // the listener registered with the ServletContext.

    public void service(HttpServletRequest request,
                        HttpServletResponse response) {

        // Look up the shared registry the MetricsListener placed in the ServletContext.
        ServletContext servletContext = getServletContext();
        MetricRegistry metrics = (MetricRegistry) servletContext.getAttribute("com.codahale.metrics.servlets.MetricsServlet.registry");

        // Time this request; stop() in the finally block records the elapsed
        // time even if the work throws.
        Timer tempTimer = metrics.timer(MetricRegistry.name(Encrypted.class, "Encrypted"));
        final Timer.Context context = tempTimer.time();
        try {
            //  do your work
        } finally {
            context.stop();
        }
    }
}



Note the 'magic' string in the getAttribute call on the Servlet Context. The Metrics library adds this during startup if you've provided the Listener we defined in the first step.
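
Since the MetricsListener we wrote exposes its registry as a public static field, you can also skip the attribute lookup and the magic string entirely and reference the registry directly. A minimal sketch (the class name here is made up; it assumes the MetricsListener from the first step is on the classpath):

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

public class EncryptedDirect {

    public void doWork() {
        // Same registry the listener put into the ServletContext, reached through
        // the static field instead of the attribute name.
        Timer tempTimer = MetricsListener.REGISTRY.timer(
                MetricRegistry.name(EncryptedDirect.class, "Encrypted"));
        final Timer.Context context = tempTimer.time();
        try {
            // do your work
        } finally {
            context.stop();
        }
    }
}

Either way the Timer lands in the same registry, so it shows up in the same /metrics output.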

Finally, hit the reporting URL to see your Metric values:

http://localhost:8080/CodahaleMetrics/metrics

{"version":"3.0.0","gauges":{},"counters":{},"histograms":{},"meters":{},"timers":{"com.company.urltesting.Encrypted.Encrypted":{"count":1,"max":0.21837355500000002,"mean":0.21837355500000002,"min":0.21837355500000002,
"p50":0.21837355500000002,"p75":0.21837355500000002,"p95":0.21837355500000002,
"p98":0.21837355500000002,"p99":0.21837355500000002,"p999":0.21837355500000002,
"stddev":0.0,"m15_rate":0.0,"m1_rate":0.0,"m5_rate":0.0,"mean_rate":0.2579625731812282,
"duration_units":"seconds","rate_units":"calls/second"}}}

Thursday, March 21, 2013

Scala for the Intrigued video from AJUG January

I really enjoyed this presentation by Venkat Subramaniam from the January Atlanta Java User's Group meeting.

The Scala language he is talking about is very interesting and his presentation style is really fun. As I tweeted right after this, all I could think of during certain parts was how much he reminded me of Bill Cosby's stand up.

http://vimeo.com/57988159

Wednesday, March 20, 2013

Apache Kafka Presentation to AJUG

The presentation went well; there were a lot of questions and quite a few people nodding as I talked.

The slides are here: http://www.slideshare.net/chriscurtin/ajug-march-2013-kafka

The video should be ready in a week or so. I'll update the blog when it is ready.

Monday, March 11, 2013

Storm-Kafka Trident implementation notes

When updating the Storm-Kafka project to work with Kafka 0.8.0 I learned a lot about how the IPartitionedTridentSpout-derived classes should work. Here are a few of my observations:

The ZooKeeper data, including the transaction id, is stored by the NAME you give the STREAM object in your topology. So changing the name can be really bad when you are working out how to store State data, since the transaction ids will be reset to 1.

When the topology starts there is a VERY good chance that your emitPartitionBatch method is going to be called even if the last emitPartitionBatchNew call succeeded. Why? Trident stores the metadata returned from the last call in ZooKeeper as the 'last meta' when it calls emitPartitionBatchNew again, and removes that data once the batch completes successfully. If the topology starts and there are still objects in ZooKeeper for the partition, Trident assumes the last emitPartitionBatchNew failed and calls the emitPartitionBatch method.

The Storm-Kafka package stores the NEXT offset that should be requested from Kafka in the metadata it returns. For example, if the fetch from Kafka returns offsets 1000 to 1005, 'next' is set to 1006. That way, the next time emitPartitionBatchNew is called it can ask Kafka to start at offset 1006. However, if there is nothing in Kafka to return, you can't increment 'next' to 1007 or you will get an invalid offset error from Kafka. If emitPartitionBatch doesn't get anything from Kafka, it sets the 'start' offset to the 'next' offset.

Storing the "NEXT" offset is also important to know when coding the emitPartitionBatch method. Since there may now be data in Kafka since the topology was stopped, the emitPartitionBatch called on startup MUST NOT emit that offset, otherwise you will get that tuple played twice (once from emitPartitionBatch on startup and again from emitPartitionBatchNew when it starts the next batch.)

Web Scale Architecture Study Group: How Complex Systems Fail

“How Complex Systems Fail” is worth a whole session by itself.
Comments from a DevOps perspective:
A pretty good list of things to think about around complex systems:

And just something to smile about:

Web Scale Architecture Study Group: Large Scale Deployments

So to get started, this Stack Overflow Q&A gives a good overview of the parts of a deployment system:
Next up is a fairly long article about how Facebook does full life-cycle deployments. Lots of ‘fluff’ at the beginning and end, but the bulk of the article is interesting:
Dan Nemec recommended this article to get an introduction to Automation tools:

I was able to find some examples of patterns around risk aversion in deployments:

Some more details on using BitTorrent for deployment (Don’t bother with the video, the blog post describes the idea well):
Finally, a reminder that cloud deployments are very different:
Thanks to Ken Giles and Dan Nemec for suggesting articles about this topic.

Tuesday, March 5, 2013

Storm ZooKeeper error in LocalCluster mode

This one is more of a frustration than anything else.

When running a test Trident Topology in IntelliJ I kept getting the following errors:

2328 [main-SendThread(localhost:2000)] WARN  org.apache.zookeeper.ClientCnxn - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.SocketException: Address family not supported by protocol family: connect
 at sun.nio.ch.Net.connect(Native Method) ~[na:1.6.0_33]
 at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:532) ~[na:1.6.0_33]
 at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1050) ~[zookeeper-3.3.3.jar:3.3.3-1073969]
 at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1077) ~[zookeeper-3.3.3.jar:3.3.3-1073969]


After digging around for a while I found that this can safely be ignored. The cause is that the local ZooKeeper hasn't started up yet. Once it starts the errors go away.

Custom Partitioners for Kafka 0.8.0

One of the nice features of Kafka is the ability to define the partitioning algorithm. This allows the user to define the key that should be used for partitioning, instead of letting Kafka randomly assign it. However it wasn't obvious how to do this at first; it turns out you need a couple of things defined to make it work.

First, you need to tell Kafka the name of your Partitioner class:

props.put("partitioner.class", "com.curtinhome.kafka.playproducer.OrganizationPartitioner");

Then tell Kafka how to serialize the key so the Partitioner class can be called with the value:

 props.put("serializer.class", "kafka.serializer.StringEncoder");
 props.put("key.serializer.class", "kafka.serializer.StringEncoder");


Here we're telling Kafka that both the message being written and the key are Strings. If you wanted to pass an Integer key, you'd need to provide a class that converts the Integer to a byte array.
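
For instance, a minimal sketch of an Integer key encoder might look like the following. This assumes the Kafka 0.8 kafka.serializer.Encoder interface; the class name is made up for illustration, and you'd register it via the key.serializer.class property:

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

import java.nio.ByteBuffer;

// Hypothetical encoder that turns an Integer key into the byte array Kafka expects.
public class IntegerEncoder implements Encoder<Integer> {

    // Kafka instantiates encoders reflectively and passes the producer properties in.
    public IntegerEncoder(VerifiableProperties props) {
    }

    public byte[] toBytes(Integer value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }
}
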

Now when you write the message, you pass the Key and the Message:

        long events = Long.parseLong(args[0]);
        int blocks = Integer.parseInt(args[1]);

        Random rnd = new Random();
        Properties props = new Properties();
        props.put("broker.list",  "broker1.atlnp1:9092,broker2.atlnp1:9092,broker3.atlnp1:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "com.curtinhome.kafka.playproducer.OrganizationPartitioner");
        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);
        for (int nBlocks = 0; nBlocks < blocks; nBlocks++) {
            for (long nEvents = 0; nEvents < events; nEvents++) {
                long runtime = new Date().getTime();
                String msg = runtime + "," + (50 + nBlocks) + "," + nEvents + "," + rnd.nextInt(1000);
                KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>("test1", String.valueOf(nBlocks), msg);
                producer.send(data);
            }
        }
        producer.close();


The Partitioner is pretty straightforward:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class OrganizationPartitioner implements Partitioner {

    // Kafka instantiates the partitioner reflectively and passes the producer properties in.
    public OrganizationPartitioner(VerifiableProperties props) {
    }

    // 'key' is the String passed to KeyedMessage; map it onto one of the topic's partitions.
    public int partition(String key, int a_numPartitions) {
        long organizationId = Long.parseLong(key);
        return (int) (organizationId % a_numPartitions);
    }
}

The number of partitions is provided by Kafka based on the topic configuration.

Finding Leader Broker in Kafka 0.8.0

With Kafka 0.8.0 the paradigm for storing data has changed. No longer can you connect to any broker and get access to the data for a topic and partition.

Starting with 0.8.0 Kafka has implemented replication, so now data is only on a subset of your Brokers (assuming you have more Brokers than count of replicas).

To figure out which Broker is the leader for your topic and partition, connect to any Broker and request the MetaData:

        kafka.javaapi.consumer.SimpleConsumer consumer = new SimpleConsumer("mybroker01", 9092, 100000, 64 * 1024, "test");
        List<String> topics2 = new ArrayList<String>();
        topics2.add("storm-anon");
        TopicMetadataRequest req = new TopicMetadataRequest(topics2);
        kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

        List<kafka.javaapi.TopicMetadata> data3 = resp.topicsMetadata();

The reply from Kafka contains information about every partition in the topic. Iterate through the response and find the partition you want, then call 'leader().host()' to figure out what Broker to connect to.

The following code shows all the partitions for a specific topic, including where each of the replicas resides.

        for (kafka.javaapi.TopicMetadata item : data3) {
           for (kafka.javaapi.PartitionMetadata part: item.partitionsMetadata() ) {
               String replicas = "";
               String isr = "";

               for (kafka.cluster.Broker replica: part.replicas() ) {
                   replicas += " " + replica.host();
               }

               for (kafka.cluster.Broker replica: part.isr() ) {
                   isr += " " + replica.host();
               }

              System.out.println( "Partition: " +   part.partitionId()  + ": " + part.leader().host() + " R:[ " + replicas + "] I:[" + isr + "]");
           }
        }


Note that you only have to do this if you are managing your own offsets and using the SimpleConsumer. Using Consumer Groups takes care of this automatically.

Also note that you don't need to do this on the Producer side; Kafka handles figuring out which partition is on which Broker for you.

Thursday, February 7, 2013

Presenting Apache Kafka at AJUG

I'll be presenting about Apache Kafka at AJUG in March.

http://www.meetup.com/atlantajug/events/99878712/

I'll post up the slides after the presentation and may even have a video this time.

"Web Scale Architecture" study group


While reading about Netflix's downtime on Christmas Eve 2012, a thought entered my mind: I wonder what thinks about this?

From this thought came an idea for a study group at work. Our focus? Studying "web scale" architectures. Of course "web scale" can mean anything, so we decided to focus on best practices around the resiliency of systems, occasionally looking at cool algorithm implementations and 'how they built it' articles.

Of course there is no one book (and while we have a book budget, buying a bunch of books to read one chapter isn't a wise use of it!), so we are instead looking at articles, blogs and videos.

Today was our first meeting (12-1 during lunch) and we had a very eclectic group of people in the room. Many software engineers, a few architects, a couple of DevOps, a couple of QA, a Security Engineer and a few product managers. We had a very good discussion about 'falling over' and the CircuitBreaker pattern.

Here is what we looked at.

Netflix blog about Resilient Systems:

http://techblog.netflix.com/2011/12/making-netflix-api-more-resilient.html

Circuit Breaker Pattern description from HubSpot:

http://dev.hubspot.com/blog/bid/64543/Building-a-Robust-System-Using-the-Circuit-Breaker-Pattern

Sample source code:

http://thatextramile.be/blog/2008/05/the-circuit-breaker

We of course went off on a few tangents, but overall I really enjoyed discussing this with such a diverse group of people.

I am going to try to update this blog after each meeting in case you too are interested in learning about this stuff.

What does it mean to be a Senior Engineer?


I read a great blog post today about what it means to be a 'senior engineer'. It's applicable to pretty much any discipline, not just engineering; the same can be said for 'senior mechanics' or 'senior plumbers'.

http://www.kitchensoap.com/2012/10/25/on-being-a-senior-engineer/

That post linked to a second blog with some great fatherly advice for a son:

http://blog.stephenwyattbush.com/2012/04/07/dad-and-the-ten-commandments-of-egoless-programming

My favorite is definitely:

"Treat people who know less than you with respect, deference, and patience. Non-technical people who deal with developers on a regular basis almost universally hold the opinion that we are prima donnas at best and crybabies at worst. Don’t reinforce this stereotype with anger and impatience."