It’s been a while since I wrote a blog post, so here you go. I have been working with Apache Spark recently and had to query a complex, nested JSON data set. I ran into a few challenges along the way, and as of writing this post the best way I have found to query nested structures is to use a HiveContext with Spark.
Apache Drill is another tool that can query nested JSON data sets, but I will only cover Spark here.
The example queries below are taken from the Apache Drill documentation website.
The Yelp business reviews academic dataset can be downloaded from here.
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sc = new SparkContext("local[*]", "AnalyzeJSONWithDataFrame")
val sqlContext = new SQLContext(sc)
val input_file = "yelp_academic_dataset_business.json"
val jsonData = sc.textFile(input_file)
val businessReviews = sqlContext.read.json(jsonData).cache()
businessReviews.registerTempTable("businessReviews")
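Before writing queries, it helps to look at the schema Spark inferred from the JSON, since that is where the nested structs (like hours) and arrays (like categories) show up. This is just an optional inspection step using the plain DataFrame API:

// Print the inferred schema, including nested structs and arrays
businessReviews.printSchema()

// Peek at a handful of rows without writing any SQL
businessReviews.show(5)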
Display the first row in the data set.
val firstRecordQuery = """SELECT
    | *
    | FROM businessReviews
    | limit 1
    | """.stripMargin
val firstRecord = sqlContext.sql(firstRecordQuery)
firstRecord.show()
Top states and cities by total number of reviews.
val countByCityStateQuery = """SELECT
    | state, city, COUNT(*) as totalReviews
    | FROM businessReviews
    | GROUP BY state, city ORDER BY totalReviews DESC LIMIT 10
    | """.stripMargin
val countByCityState = sqlContext.sql(countByCityStateQuery)
countByCityState.show()
Average number of reviews per business star rating.
val avgReviewRatingQuery = """SELECT
    | stars, truncate(AVG(review_count), 0) as reviewsAvg
    | FROM businessReviews
    | GROUP BY stars ORDER BY stars DESC
    | """.stripMargin
val avgReviewRating = sqlContext.sql(avgReviewRatingQuery)
avgReviewRating.show()
Top businesses with high review counts (> 1000).
val topBusinessQuery = """SELECT
    | name, state, city, review_count
    | FROM businessReviews
    | WHERE review_count > 1000
    | ORDER BY review_count DESC LIMIT 10
    | """.stripMargin
val topBusiness = sqlContext.sql(topBusinessQuery)
topBusiness.show()
Saturday open and close times for a few businesses.
val satOpenCloseTimeQuery = """SELECT
    | name, hours.Saturday.open as open, hours.Saturday.close as close
    | FROM businessReviews
    | LIMIT 10
    | """.stripMargin
val satOpenCloseTime = sqlContext.sql(satOpenCloseTimeQuery)
satOpenCloseTime.show()
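As a side note, the same nested struct fields can also be reached through the DataFrame API by using dot notation in the column names, so a SQL query is not strictly required here. A minimal sketch, assuming the same businessReviews DataFrame built above:

// Select fields of the nested hours struct with dot notation, no SQL needed
businessReviews
  .select("name", "hours.Saturday.open", "hours.Saturday.close")
  .show(10)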
Initialize Hive Context
This is needed because, as of Spark 1.4, the explode function is available only via the Hive context. In Spark 1.5 you should be able to use the default sqlContext and will not need to initialize a Hive context.
import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)
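If you run this as a standalone application rather than in the spark-shell, keep in mind that HiveContext lives in the separate spark-hive module, so it needs to be on the classpath. A rough sbt sketch (the version numbers are only an example and should match your Spark release):

// build.sbt (example only; align the versions with your Spark installation)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1",
  "org.apache.spark" %% "spark-sql"  % "1.4.1",
  "org.apache.spark" %% "spark-hive" % "1.4.1"
)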
Top restaurants by number of reviews.
val restaurantsQuery = """SELECT
    | name, state, city, review_count, cat
    | FROM businessReviews
    | LATERAL VIEW explode(categories) tab AS cat
    | WHERE cat = 'Restaurants'
    | ORDER BY review_count DESC LIMIT 10
    | """.stripMargin
val restaurants = hiveCtx.sql(restaurantsQuery)
restaurants.show()
Top restaurants by number of listed categories.
val restaurantsCategoryCountQuery = """SELECT
    | name, SIZE(categories) as categoryCount, categories
    | FROM businessReviews
    | LATERAL VIEW explode(categories) tab AS cat
    | WHERE cat = 'Restaurants'
    | ORDER BY categoryCount DESC LIMIT 10
    | """.stripMargin
val restaurantsCategoryCount = hiveCtx.sql(restaurantsCategoryCountQuery)
restaurantsCategoryCount.show()
Top first-listed categories by count.
val firstCategoryCountQuery = """SELECT
    | categories[0] as firstCategory, COUNT(categories[0]) as categoryCount
    | FROM businessReviews
    | GROUP BY categories[0]
    | ORDER BY categoryCount DESC LIMIT 10
    | """.stripMargin
val firstCategoryCount = hiveCtx.sql(firstCategoryCountQuery)
firstCategoryCount.show()
Get a flattened list of categories for each business.
val flattenCategoryQuery = """SELECT
    | name, EXPLODE(categories) as category
    | FROM businessReviews
    | LIMIT 10
    | """.stripMargin
val flattenCategory = hiveCtx.sql(flattenCategoryQuery)
flattenCategory.show()
Top categories used in business reviews.
val topCategoryBusinessReviewQuery = """SELECT
    | tempTable.category, COUNT(tempTable.category) categorycnt
    | FROM (SELECT EXPLODE(categories) category FROM businessReviews) tempTable
    | GROUP BY tempTable.category
    | ORDER BY categorycnt DESC LIMIT 10
    | """.stripMargin
val topCategoryBusinessReview = hiveCtx.sql(topCategoryBusinessReviewQuery)
topCategoryBusinessReview.show()
Hopefully this helps you query nested JSON with Spark SQL.
Good Luck!!!