Monday, December 7, 2015

Spark SQL Parquet with string group by error

(In the stack trace below, <> stands for the name of the column you are trying to group by in Spark SQL.)
Exception in thread "main" org.apache.spark.sql.AnalysisException: binary type expression <> cannot be used in grouping expression;
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:37)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidGroupingExprs$1(CheckAnalysis.scala:121)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$4.apply(CheckAnalysis.scala:130)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$4.apply(CheckAnalysis.scala:130)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:130)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:49)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:103)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:49)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
    at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:914)
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725)
The issue? Per the docs, Spark assumes a Parquet binary column is NOT a string unless you tell it otherwise. The catch: binary is the only way Parquet encodes a string, so any string column read back from Parquet shows up as BinaryType and cannot be grouped on.
Fix:

        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
        // Treat Parquet binary columns as strings so they can appear in GROUP BY.
        sqlContext.setConf("spark.sql.parquet.binaryAsString", "true");
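With the flag set, a group-by over a Parquet string column analyzes cleanly. A minimal end-to-end sketch, assuming the Spark 1.5/1.6 Java API; the events.parquet path, table name, and event column are made up for illustration:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class ParquetGroupBy {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("parquet-group-by").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
        // Without this, Parquet binary columns stay BinaryType and trigger the
        // "binary type expression ... cannot be used in grouping expression" error.
        sqlContext.setConf("spark.sql.parquet.binaryAsString", "true");

        // Hypothetical Parquet file and column names.
        DataFrame df = sqlContext.read().parquet("events.parquet");
        df.registerTempTable("event");
        DataFrame counts = sqlContext.sql(
            "SELECT event, COUNT(*) AS n FROM event GROUP BY event");
        counts.show();
        sc.stop();
    }
}
```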

Parquet Strings

This took me a lot longer than expected to find: how to define a String in a Parquet schema.

Basically, use binary. Yes, it is that easy.

   final MessageType schema = MessageTypeParser.parseMessageType(
                    "message event { required int64 reportid; required int64 event_ts; required int64 rid; required binary event;  optional binary details; }");
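A related note: Parquet can also annotate a binary column with the UTF8 logical type, which tells schema-aware readers the bytes are a UTF-8 string. A sketch of the same schema with the annotation added; package names assume parquet-mr 1.7+, where the classes moved under org.apache.parquet:

```java
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class EventSchema {
    public static void main(String[] args) {
        // (UTF8) marks a binary field as a UTF-8 string in the Parquet schema.
        final MessageType schema = MessageTypeParser.parseMessageType(
            "message event { required int64 reportid; required int64 event_ts; "
            + "required int64 rid; required binary event (UTF8); "
            + "optional binary details (UTF8); }");
        System.out.println(schema);
    }
}
```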