Monday, December 7, 2015

Spark SQL Parquet with string group by error

(Where <> below is the name of the column you are trying to group by using Spark SQL.)
Exception in thread "main" org.apache.spark.sql.AnalysisException: binary type expression <> cannot be used in grouping expression;
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:37)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidGroupingExprs$1(CheckAnalysis.scala:121)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$4.apply(CheckAnalysis.scala:130)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$4.apply(CheckAnalysis.scala:130)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:130)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:49)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:103)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:49)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
    at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:914)
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725)
The issue? Per the docs, Spark assumes a Parquet binary is NOT a string unless you tell it otherwise. Except the only way to encode a string in Parquet is as a binary.
Fix:

        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
        // Tell Spark SQL to treat Parquet binary columns as strings:
        sqlContext.setConf("spark.sql.parquet.binaryAsString", "true");
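
With that set, the group by analyzes cleanly. A rough sketch of what that looks like (the file path and table name here are made up, not from the original error):

        DataFrame events = sqlContext.read().parquet("/path/to/events.parquet");
        events.registerTempTable("events");

        // The same kind of query that used to throw the AnalysisException:
        DataFrame counts = sqlContext.sql(
                "SELECT event, COUNT(*) AS cnt FROM events GROUP BY event");
        counts.show();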

Parquet Strings

This took me a lot longer than expected to find: how to define a String in a Parquet schema.

Basically, use binary. Yes, it is that easy.

   final MessageType schema = MessageTypeParser.parseMessageType(
                    "message event { required int64 reportid; required int64 event_ts; required int64 rid; required binary event;  optional binary details; }");

Wednesday, October 1, 2014

Redis, BloomFilters and failing fast

Last month I presented to the Atlanta Java Users Group (AJUG) about Redis, BloomFilters, Redis-backed BloomFilters and how to 'fail fast' to avoid a possibly expensive operation.

I received a lot of good questions and positive feedback about the presentation.
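
For anyone who missed the talk, the core 'fail fast' idea looks roughly like the sketch below. This is a minimal illustration, not code from the talk: the Jedis client, key name, filter size and the two toy hash functions are all assumptions, and a real filter would size the bit array and hash count to the expected element count and target false-positive rate.

import redis.clients.jedis.Jedis;

public class RedisBloomCheck {
    private static final String KEY = "bloom:users"; // hypothetical key name
    private static final long BITS = 1L << 24;       // filter size (assumption)

    // Two toy hash positions; a real Bloom filter uses k independent hashes.
    private static long h1(String s) { return (s.hashCode() & 0x7fffffff) % BITS; }
    private static long h2(String s) { return ((s + "#").hashCode() & 0x7fffffff) % BITS; }

    public static void add(Jedis jedis, String item) {
        jedis.setbit(KEY, h1(item), true);
        jedis.setbit(KEY, h2(item), true);
    }

    public static boolean mightContain(Jedis jedis, String item) {
        // If ANY bit is clear the item was definitely never added, so we can
        // fail fast and skip the expensive backing-store lookup entirely.
        return jedis.getbit(KEY, h1(item)) && jedis.getbit(KEY, h2(item));
    }
}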

The slides and a video of the talk were embedded here:

AJUG - Using Redis and BloomFilters to fail fast - Chris Curtin (09.16.2014) from AJUG on Vimeo.

Monday, August 5, 2013

Adding Coda Hale Metrics to Tomcat

I spent a couple of frustrating hours trying to get Coda Hale Metrics (also known as 'Yammer Metrics') working in a simple Tomcat instance. The documentation isn't great, and I found a couple of steps were missing. (Note: it has been a few years since I've had to mess with web.xml, so that might be why it took so long. Spring MVC and annotations mean less .xml hell!)

First you need to define a Listener in your web.xml:

<listener>
    <listener-class>com.company.urltesting.MetricsListener</listener-class>
</listener>
The logic in the Listener is pretty straightforward:

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.servlets.MetricsServlet;

public class MetricsListener extends MetricsServlet.ContextListener {
    public static final MetricRegistry REGISTRY = new MetricRegistry();

    @Override
    protected MetricRegistry getMetricRegistry() {
        return REGISTRY;
    }
}


The 'magic' here is that the Coda Hale code calls your listener the first time through to set up the Registry. What isn't obvious is that it also stores that Registry under a specific attribute name in the ServletContext (more about this in a minute).
Next, set up the servlet that lets you query the Metrics from an HTTP browser:


    <servlet>
        <servlet-name>CodahaleMetrics</servlet-name>
        <servlet-class>com.codahale.metrics.servlets.MetricsServlet</servlet-class>
        <init-param>
            <param-name>metrics-uri</param-name>
            <param-value>/metrics</param-value>
        </init-param>
        <init-param>
            <param-name>ping-uri</param-name>
            <param-value>/ping</param-value>
        </init-param>
        <init-param>
            <param-name>healthcheck-uri</param-name>
            <param-value>/health</param-value>
        </init-param>
        <init-param>
            <param-name>threads-uri</param-name>
            <param-value>/threads</param-value>
        </init-param>
    </servlet>

    <servlet-mapping>
        <servlet-name>CodahaleMetrics</servlet-name>
        <url-pattern>/CodahaleMetrics/*</url-pattern>
    </servlet-mapping>
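
(A side note I'm only fairly sure of: the *-uri init-params above appear to belong to com.codahale.metrics.servlets.AdminServlet, which bundles the metrics, ping, health and thread-dump pages under a single mapping. A bare MetricsServlet just serves the metrics JSON at whatever url-pattern you map it to, which is why the /CodahaleMetrics/metrics URL below works.)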

Finally, to create and use a Metric you need to know how to fetch the registry yourself from the ServletContext.

Here is the most basic use:
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

import javax.servlet.ServletContext;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class Encrypted extends HttpServlet {

    public void service(HttpServletRequest request,
                        HttpServletResponse response) {

        // Fetch the registry the MetricsListener stored in the ServletContext.
        ServletContext servletContext = getServletContext();
        MetricRegistry metrics = (MetricRegistry) servletContext.getAttribute(
                "com.codahale.metrics.servlets.MetricsServlet.registry");
        Timer tempTimer = metrics.timer(MetricRegistry.name(Encrypted.class, "Encrypted"));
        final Timer.Context context = tempTimer.time();
        try {
            //  do your work
        } finally {
            // Stop the timer even if the work throws.
            context.stop();
        }
    }
}



Note the 'magic' string in the getAttribute call on the ServletContext. The Metrics library stores the registry under that attribute name at startup, provided you registered the Listener we defined in the first step.
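
Also worth noting: since the MetricsListener we defined exposes the registry as a public static field, you can skip the magic string entirely:

        // Equivalent lookup using the static field on our listener class:
        MetricRegistry metrics = MetricsListener.REGISTRY;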

Finally, hit the reporting URL to see your Metric values:

http://localhost:8080/CodahaleMetrics/metrics

{"version":"3.0.0","gauges":{},"counters":{},"histograms":{},"meters":{},"timers":{"com.company.urltesting.Encrypted.Encrypted":{"count":1,"max":0.21837355500000002,"mean":0.21837355500000002,"min":0.21837355500000002,
"p50":0.21837355500000002,"p75":0.21837355500000002,"p95":0.21837355500000002,
"p98":0.21837355500000002,"p99":0.21837355500000002,"p999":0.21837355500000002,
"stddev":0.0,"m15_rate":0.0,"m1_rate":0.0,"m5_rate":0.0,"mean_rate":0.2579625731812282,
"duration_units":"seconds","rate_units":"calls/second"}}}

Thursday, March 21, 2013

Scala for the Intrigued video from AJUG January

I really enjoyed this presentation by Venkat Subramaniam from the January Atlanta Java Users Group meeting.

The Scala language he talks about is very interesting and his presentation style is really fun. As I tweeted right after this, all I could think of during certain parts was how much he reminded me of Bill Cosby's stand-up.

http://vimeo.com/57988159

Wednesday, March 20, 2013

Apache Kafka Presentation to AJUG

The presentation went well: I got a lot of questions and saw quite a few people nodding as I talked.

The slides are here: http://www.slideshare.net/chriscurtin/ajug-march-2013-kafka

The video should be ready in a week or so. I'll update the blog when it is ready.

Monday, March 11, 2013

Storm-Kafka Trident implementation notes

When updating the Storm-Kafka project to work with Kafka 0.8.0 I learned a lot about how the IPartitionedTridentSpout-derived classes should work. Here are a few of my observations:

The ZooKeeper data, including the transaction id, is stored under the NAME you give the STREAM object in your topology. Changing the name resets the transaction ids to 1, which can wreak havoc on any State data keyed by those ids.

When the topology starts there is a VERY good chance that your emitPartitionBatch method is going to be called even if the last emitPartitionBatchNew call succeeded. Why? Trident stores the metadata returned from the last call in ZooKeeper as the 'last meta' when it calls emitPartitionBatchNew again, and removes it upon successful completion of emitPartitionBatch. So if the topology starts and there are objects in ZooKeeper for the partition, Trident assumes the last emitPartitionBatchNew failed and calls emitPartitionBatch.

The Storm-Kafka package stores the NEXT offset that should be requested from Kafka in the metadata it returns. For example, if the fetch from Kafka returns offsets 1000 to 1005, 'next' is set to 1006. That way, the next time emitPartitionBatchNew is called it can ask Kafka to start at offset 1006. However, if there is nothing in Kafka to return, you can't increment 'next' to 1007 or you will get an invalid offset error from Kafka. So if emitPartitionBatch doesn't get anything from Kafka, it sets the 'start' offset to the 'next' offset.
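
To make the bookkeeping concrete, here's a rough sketch of that rule. The class and method names are my own stand-ins, not the actual Storm-Kafka code:

import java.util.List;

public class OffsetBookkeeping {

    /** Minimal stand-in for the per-partition metadata stored in ZooKeeper. */
    static class BatchMeta {
        long start; // first offset this batch emitted (or would emit)
        long next;  // offset the NEXT batch should ask Kafka for
    }

    // Sketch of the emitPartitionBatchNew bookkeeping: 'next' only advances
    // past offsets Kafka actually returned.
    static BatchMeta nextBatchMeta(long next, List<Long> fetchedOffsets) {
        BatchMeta meta = new BatchMeta();
        meta.start = next;
        if (fetchedOffsets.isEmpty()) {
            // Nothing new in Kafka: leave 'next' alone. Advancing it anyway
            // (e.g. 1006 -> 1007) would trigger an invalid offset error.
            meta.next = next;
        } else {
            // e.g. Kafka returned offsets 1000..1005 -> next = 1006
            meta.next = fetchedOffsets.get(fetchedOffsets.size() - 1) + 1;
        }
        return meta;
    }
}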

Storing the "NEXT" offset is also important to know when coding the emitPartitionBatch method. Since there may now be data in Kafka since the topology was stopped, the emitPartitionBatch called on startup MUST NOT emit that offset, otherwise you will get that tuple played twice (once from emitPartitionBatch on startup and again from emitPartitionBatchNew when it starts the next batch.)

Web Scale Architecture Study Group: How Complex Systems Fail

“How Complex Systems Fail” is worth a whole session by itself.
Comments from a DevOps perspective:
A pretty good list of things to think about around complex systems:

And just something to smile about:

Web Scale Architecture Study Group: Large Scale Deployments

So to get started, this Stack Overflow Q&A gives a good overview of the parts of a deployment system:
Next up is a fairly long article about how Facebook does full life-cycle deployments. There is a lot of ‘fluff’ at the beginning and end, but the bulk of the article is interesting:
Dan Nemec recommended this article to get an introduction to Automation tools:

I was able to find some examples of patterns around risk aversion in deployments:

Some more details on using BitTorrent for deployment (don’t bother with the video; the blog post describes the idea well):
Finally, a reminder that cloud deployments are very different:
Thanks to Ken Giles and Dan Nemec for suggesting articles about this topic.

Tuesday, March 5, 2013

Storm ZooKeeper error in LocalCluster mode

This one is more of a frustration than anything else.

When running a test Trident Topology in IntelliJ I kept getting the following errors:

2328 [main-SendThread(localhost:2000)] WARN  org.apache.zookeeper.ClientCnxn - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.SocketException: Address family not supported by protocol family: connect
 at sun.nio.ch.Net.connect(Native Method) ~[na:1.6.0_33]
 at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:532) ~[na:1.6.0_33]
 at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1050) ~[zookeeper-3.3.3.jar:3.3.3-1073969]
 at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1077) ~[zookeeper-3.3.3.jar:3.3.3-1073969]


After digging around for a while I found that this can safely be ignored. The cause is that the local ZooKeeper instance hasn't started yet; once it does, the errors go away. (If the warnings bother you: this particular SocketException is usually the JVM trying IPv6 first, and setting -Djava.net.preferIPv4Stack=true is a common workaround.)