Frequently used Apache Spark code snippets

The following are Spark (Scala) code snippets I use frequently, based on my experience.

Creating a Spark session

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
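
For local testing it is often handy to set the master and a couple of configs on the builder and to stop the session when the job finishes (a minimal sketch; the master and config values below are just placeholders):

// Local session for testing; master and config values are placeholders
val sparkLocal = SparkSession.builder
  .appName("Simple Application")
  .master("local[*]")                          // run locally using all available cores
  .config("spark.sql.shuffle.partitions", "8") // fewer shuffle partitions for small local data
  .getOrCreate()

// ... job logic ...

sparkLocal.stop() // release resources when done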


Reading a Hive table

// Creating the spark session with Hive support
val spark = SparkSession
  .builder()
  .appName("Sample Spark Hive")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

// Reading the Hive table using the Spark session
val hiveDF = spark.sql("SELECT * FROM <hive_schema>.<hive_table>")
hiveDF.show(false)


Creating a Dataframe

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

val spark: SparkSession = createSparkSession // helper that builds the session as shown above

// List of rows
val data = Seq(Row(1111, "Prod1", "4/24/2020 21:06:10", 5.0),
  Row(1101, "Prod2", "4/24/2020 21:06:11", 4.5),
  Row(1102, "Prod3", "4/24/2020 21:06:12", 1.5),
  Row(1103, "Prod4", "4/24/2020 21:06:13", 2.5),
  Row(1104, "Prod5", "4/24/2020 21:06:14", 3.0))

val order_schema = StructType(List(
  StructField("order_id", IntegerType, false),
  StructField("product_name", StringType, false),
  StructField("purchase_date", StringType, false),
  StructField("price", DoubleType, true)
))

// Create an RDD from the Seq
val seqRdd = spark.sparkContext.parallelize(data)

// Convert the RDD to a Dataframe using the schema
val df = spark.createDataFrame(seqRdd, order_schema)
df.printSchema()
df.show()
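
A more concise alternative for small, typed data is toDF with the session's implicits (a sketch; the column names match the schema above):

import spark.implicits._

// Build a dataframe directly from a Seq of tuples
val tupleDF = Seq(
  (1111, "Prod1", "4/24/2020 21:06:10", 5.0),
  (1101, "Prod2", "4/24/2020 21:06:11", 4.5)
).toDF("order_id", "product_name", "purchase_date", "price")

tupleDF.printSchema()
tupleDF.show()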


Reading a CSV file

// Read CSV file
val fileDF = spark.read.option("header","true").option("delimiter","|").csv("C:\\tmp\\orders.csv")
fileDF.show(10, false)

// If the file doesn't contain a header, supply the schema explicitly
val fileDFWithSchema = spark.read.option("header","false").option("delimiter","|").schema(order_schema).csv("C:\\tmp\\orders.csv")
fileDFWithSchema.show(10, false)
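
Alternatively, Spark can infer the column types from the data itself (a sketch using the same path; inference costs an extra pass over the file):

// Ask Spark to infer column types by scanning the data
val fileDFInferred = spark.read.option("header","true").option("delimiter","|").option("inferSchema","true").csv("C:\\tmp\\orders.csv")
fileDFInferred.printSchema()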


Writing the Dataframe as CSV and Parquet files

// Writing the dataframe as a CSV file   
fileDF.write.option("header","true").option("delimiter",",").mode(SaveMode.Overwrite).csv("C:\\tmp\\orders_as_csv.csv")

// Writing the dataframe as a Parquet file
fileDF.write.mode(SaveMode.Overwrite).parquet("C:\\tmp\\orders_as_parquet")

// Reading the Parquet file back
val parquetDF = spark.read.parquet("C:\\tmp\\orders_as_parquet")
parquetDF.show(false)
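
Parquet output can also be partitioned by a column, so that later reads which filter on that column scan less data (a sketch; partitioning on product_name assumes fileDF actually has that column):

// Partition the output by product_name (assumes fileDF contains that column)
fileDF.write.mode(SaveMode.Overwrite).partitionBy("product_name").parquet("C:\\tmp\\orders_partitioned_parquet")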


Generating the execution plan

// Read products
val productsDF = spark.read.option("header","true").option("delimiter","|").csv("C:\\tmp\\products.csv")
productsDF.show()

// Read bill info
val billDF = spark.read.option("header","true").option("delimiter","|").csv("C:\\tmp\\bill.csv")
billDF.show()

// Generate bill with product names
val billWithDescriptionDF = billDF.as("b").join(productsDF.as("p"), billDF.col("prod_id").equalTo(productsDF.col("prod_id")), "left")
      .select("b.prod_id","p.prod_name", "b.bill_time","b.qty","b.price")
billWithDescriptionDF.show()


billWithDescriptionDF.explain("formatted") // Accepted explain modes are 'simple', 'extended', 'codegen', 'cost', 'formatted'.


Plan output

== Physical Plan ==
* Project (8)
+- * BroadcastHashJoin LeftOuter BuildRight (7)
   :- * Project (2)
   :  +- BatchScan (1)
   +- BroadcastExchange (6)
      +- * Project (5)
         +- * Filter (4)
            +- BatchScan (3)


(1) BatchScan 
Output: [prod_id#163, bill_time#164, qty#165, price#166]
     
(2) Project [codegen id : 2]
Output    : [prod_id#163, bill_time#164, qty#165, price#166]
Input     : [prod_id#163, bill_time#164, qty#165, price#166]
     
(3) BatchScan 
Output: [prod_id#97, prod_name#98]
     
(4) Filter [codegen id : 1]
Input     : [prod_id#97, prod_name#98]
Condition : isnotnull(prod_id#97)
     
(5) Project [codegen id : 1]
Output    : [prod_id#97, prod_name#98]
Input     : [prod_id#97, prod_name#98]
     
(6) BroadcastExchange 
Input: [prod_id#97, prod_name#98]
     
(7) BroadcastHashJoin [codegen id : 2]
Left keys: List(prod_id#163)
Right keys: List(prod_id#97)
Join condition: None
     
(8) Project [codegen id : 2]
Output    : [prod_id#163, prod_name#98, bill_time#164, qty#165, price#166]
Input     : [prod_id#163, bill_time#164, qty#165, price#166, prod_id#97, prod_name#98]

Plan output in extended mode

billWithDescriptionDF.explain(true)


== Parsed Logical Plan ==
'Project [unresolvedalias('b.prod_id, None), unresolvedalias('p.prod_name, None), unresolvedalias('b.bill_time, None), unresolvedalias('b.qty, None), unresolvedalias('b.price, None)]
+- Join LeftOuter, (prod_id#163 = prod_id#97)
   :- SubqueryAlias `b`
   :  +- RelationV2[prod_id#163, bill_time#164, qty#165, price#166] csv file:/C:/tmp/bill.csv
   +- SubqueryAlias `p`
      +- RelationV2[prod_id#97, prod_name#98] csv file:/C:/tmp/products.csv

== Analyzed Logical Plan ==
prod_id: string, prod_name: string, bill_time: string, qty: string, price: string
Project [prod_id#163, prod_name#98, bill_time#164, qty#165, price#166]
+- Join LeftOuter, (prod_id#163 = prod_id#97)
   :- SubqueryAlias `b`
   :  +- RelationV2[prod_id#163, bill_time#164, qty#165, price#166] csv file:/C:/tmp/bill.csv
   +- SubqueryAlias `p`
      +- RelationV2[prod_id#97, prod_name#98] csv file:/C:/tmp/products.csv

== Optimized Logical Plan ==
Project [prod_id#163, prod_name#98, bill_time#164, qty#165, price#166]
+- Join LeftOuter, (prod_id#163 = prod_id#97)
   :- RelationV2[prod_id#163, bill_time#164, qty#165, price#166] csv file:/C:/tmp/bill.csv
   +- Filter isnotnull(prod_id#97)
      +- RelationV2[prod_id#97, prod_name#98] csv file:/C:/tmp/products.csv

== Physical Plan ==
*(2) Project [prod_id#163, prod_name#98, bill_time#164, qty#165, price#166]
+- *(2) BroadcastHashJoin [prod_id#163], [prod_id#97], LeftOuter, BuildRight
   :- *(2) Project [prod_id#163, bill_time#164, qty#165, price#166]
   :  +- BatchScan[prod_id#163, bill_time#164, qty#165, price#166] CSVScan Location: InMemoryFileIndex[file:/C:/tmp/bill.csv], ReadSchema: struct<prod_id:string,bill_time:string,qty:string,price:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true])), [id=#213]
      +- *(1) Project [prod_id#97, prod_name#98]
         +- *(1) Filter isnotnull(prod_id#97)
            +- BatchScan[prod_id#97, prod_name#98] CSVScan Location: InMemoryFileIndex[file:/C:/tmp/products.csv], ReadSchema: struct<prod_id:string,prod_name:string>


Frequently used functions

Date format conversions

when and otherwise usage


val billDF = spark.read.option("header","true").option("delimiter","|").csv("C:\\tmp\\bill.csv")
billDF.show(false)

Output:
+-------+-------------------+---+-----+
|prod_id|          bill_time|qty|price|
+-------+-------------------+---+-----+
|     21|04/24/2020 18:30:31|  2| 10.0|
|     22|04/24/2020 18:30:32|  1| 15.5|
|     23|04/24/2020 18:30:33|  3|24.25|
|     25|04/24/2020 18:30:34|  1| 8.75|
|     26|04/24/2020 18:30:35|  4| 16.0|
|     28|04/24/2020 18:30:36|  2| 20.0|
|     29|04/24/2020 18:30:37|  2| 10.0|
|     30|04/24/2020 18:30:38|  2|  9.0|
|     31|04/24/2020 18:30:39|  2| 14.0|
|     32|04/24/2020 18:30:40|  2|  6.0|
|     33|04/24/2020 18:30:41|  2|  4.0|
|     34|04/24/2020 18:30:42|  2| 30.0|
|     36|04/24/2020 18:30:43|  2| 18.0|
+-------+-------------------+---+-----+


val dateFmt = "MM/dd/yyyy HH:mm:ss"

val billDFWithAdditionalCols = billDF.as("b")
  .withColumn("bill_timestamp", to_timestamp(col("b.bill_time"), dateFmt))
  .withColumn("bill_time_epoch", unix_timestamp(col("bill_timestamp")))
  .withColumn("bill_date", to_date(col("bill_timestamp")))
  .withColumn("bill_only_time_24hrs_format", date_format(col("bill_timestamp"), "HH:mm:ss"))
  .withColumn("bill_only_time_12hrs_format", date_format(col("bill_timestamp"), "hh:mm:ss a"))
  .withColumn("bill_date_custom_format", date_format(col("bill_date"), "MM/dd/yyyy"))
  .withColumn("bill_date_year", date_format(col("bill_date"), "yyyy"))
  .withColumn("bill_date_Hour", date_format(col("bill_timestamp"), "HH"))
  .withColumn("product_category", when(col("b.prod_id").equalTo(lit("21")).or(col("b.prod_id").equalTo(lit("22"))), "Hair Care")
    .when(col("b.prod_id").equalTo(lit("23")).or(col("b.prod_id").equalTo(lit("24"))), "Dental Care")
    .when(col("b.prod_id").equalTo(lit("25")).or(col("b.prod_id").equalTo(lit("26"))), "Juices")
    .otherwise("unclassified_category"))

billDFWithAdditionalCols.show()


+-------+-------------------+---+-----+-------------------+---------------+----------+---------------------------+---------------------------+-----------------------+--------------+--------------+--------------------+
|prod_id|          bill_time|qty|price|     bill_timestamp|bill_time_epoch| bill_date|bill_only_time_24hrs_format|bill_only_time_12hrs_format|bill_date_custom_format|bill_date_year|bill_date_Hour|    product_category|
+-------+-------------------+---+-----+-------------------+---------------+----------+---------------------------+---------------------------+-----------------------+--------------+--------------+--------------------+
|     21|04/24/2020 18:30:31|  2| 10.0|2020-04-24 18:30:31|     1587733231|2020-04-24|                   18:30:31|                06:30:31 PM|             04/24/2020|          2020|            18|           Hair Care|
|     22|04/24/2020 18:30:32|  1| 15.5|2020-04-24 18:30:32|     1587733232|2020-04-24|                   18:30:32|                06:30:32 PM|             04/24/2020|          2020|            18|           Hair Care|
|     23|04/24/2020 18:30:33|  3|24.25|2020-04-24 18:30:33|     1587733233|2020-04-24|                   18:30:33|                06:30:33 PM|             04/24/2020|          2020|            18|         Dental Care|
|     25|04/24/2020 18:30:34|  1| 8.75|2020-04-24 18:30:34|     1587733234|2020-04-24|                   18:30:34|                06:30:34 PM|             04/24/2020|          2020|            18|              Juices|
|     26|04/24/2020 18:30:35|  4| 16.0|2020-04-24 18:30:35|     1587733235|2020-04-24|                   18:30:35|                06:30:35 PM|             04/24/2020|          2020|            18|              Juices|
|     28|04/24/2020 18:30:36|  2| 20.0|2020-04-24 18:30:36|     1587733236|2020-04-24|                   18:30:36|                06:30:36 PM|             04/24/2020|          2020|            18|unclassified_cate...|
|     29|04/24/2020 18:30:37|  2| 10.0|2020-04-24 18:30:37|     1587733237|2020-04-24|                   18:30:37|                06:30:37 PM|             04/24/2020|          2020|            18|unclassified_cate...|
|     30|04/24/2020 18:30:38|  2|  9.0|2020-04-24 18:30:38|     1587733238|2020-04-24|                   18:30:38|                06:30:38 PM|             04/24/2020|          2020|            18|unclassified_cate...|
|     31|04/24/2020 18:30:39|  2| 14.0|2020-04-24 18:30:39|     1587733239|2020-04-24|                   18:30:39|                06:30:39 PM|             04/24/2020|          2020|            18|unclassified_cate...|
|     32|04/24/2020 18:30:40|  2|  6.0|2020-04-24 18:30:40|     1587733240|2020-04-24|                   18:30:40|                06:30:40 PM|             04/24/2020|          2020|            18|unclassified_cate...|
|     33|04/24/2020 18:30:41|  2|  4.0|2020-04-24 18:30:41|     1587733241|2020-04-24|                   18:30:41|                06:30:41 PM|             04/24/2020|          2020|            18|unclassified_cate...|
|     34|04/24/2020 18:30:42|  2| 30.0|2020-04-24 18:30:42|     1587733242|2020-04-24|                   18:30:42|                06:30:42 PM|             04/24/2020|          2020|            18|unclassified_cate...|
|     36|04/24/2020 18:30:43|  2| 18.0|2020-04-24 18:30:43|     1587733243|2020-04-24|                   18:30:43|                06:30:43 PM|             04/24/2020|          2020|            18|unclassified_cate...|
+-------+-------------------+---+-----+-------------------+---------------+----------+---------------------------+---------------------------+-----------------------+--------------+--------------+--------------------+

Filtering rows of a Dataframe


billDF.as("b").filter(col("b.price").gt(lit(20.0))).show()

+-------+-------------------+---+-----+
|prod_id|          bill_time|qty|price|
+-------+-------------------+---+-----+
|     23|04/24/2020 18:30:33|  3|24.25|
|     34|04/24/2020 18:30:42|  2| 30.0|
+-------+-------------------+---+-----+
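
The same filter can also be written as a SQL-style expression string, and conditions can be combined (a sketch on the same billDF; the thresholds are arbitrary):

// Equivalent SQL-style expression
billDF.filter("price > 20.0").show()

// Combining conditions: qty of at least 2 and price of at most 10
billDF.as("b").filter(col("b.qty").geq(lit(2)).and(col("b.price").leq(lit(10.0)))).show()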


Getting distinct values

billDF.select(col("qty")).distinct().show(false)

+---+
|qty|
+---+
|2  |
|1  |
|3  |
|4  |
+---+
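
To de-duplicate whole rows, or rows keyed on a subset of columns, distinct and dropDuplicates can be used (a sketch on the same billDF):

// Distinct over all columns
billDF.distinct().show(false)

// Keep one arbitrary row per qty value
billDF.dropDuplicates("qty").show(false)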

Math operations: round, ceil, floor, rint


val nums = List(Row(1.1), Row(1.2), Row(1.3), Row(1.4), Row(1.5), Row(1.6), Row(1.7), Row(1.8), Row(1.9), Row(2.0))

val schema = StructType(List(
  StructField("nums", DoubleType, false)))

// Create RDD from the List
val rdd = spark.sparkContext.parallelize(nums)

// Convert RDD to Dataframe
val num_df = spark.createDataFrame(rdd, schema)
num_df.show()

// round: half-up rounding, ceil/floor: next/previous whole number, rint: half-even rounding
val new_df = num_df.as("n").withColumn("num_round", round(col("n.nums")))
  .withColumn("num_ceil", ceil(col("n.nums")))
  .withColumn("num_floor", floor(col("n.nums")))
  .withColumn("num_rint", rint(col("n.nums")))

new_df.show()

+----+---------+--------+---------+--------+
|nums|num_round|num_ceil|num_floor|num_rint|
+----+---------+--------+---------+--------+
| 1.1|      1.0|       2|        1|     1.0|
| 1.2|      1.0|       2|        1|     1.0|
| 1.3|      1.0|       2|        1|     1.0|
| 1.4|      1.0|       2|        1|     1.0|
| 1.5|      2.0|       2|        1|     2.0|
| 1.6|      2.0|       2|        1|     2.0|
| 1.7|      2.0|       2|        1|     2.0|
| 1.8|      2.0|       2|        1|     2.0|
| 1.9|      2.0|       2|        1|     2.0|
| 2.0|      2.0|       2|        2|     2.0|
+----+---------+--------+---------+--------+

