The following are Spark code snippets I use frequently, based on my experience.
Create a Spark session
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
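For quick local testing, the same builder can also be given a master URL and extra configuration. A minimal sketch; the master URL and the shuffle-partition value are my own choices for illustration, not part of the original snippet:

import org.apache.spark.sql.SparkSession

// Local session for testing; "local[*]" uses all available cores
val sparkLocal = SparkSession.builder
  .appName("Simple Application")
  .master("local[*]")
  .config("spark.sql.shuffle.partitions", "8") // smaller value suits small local datasets
  .getOrCreate()

// Stop the session once the job is done
sparkLocal.stop()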
Read a Hive table
// Creating the spark session with Hive support
val spark = SparkSession
  .builder()
  .appName("Sample Spark Hive")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

// Reading the Hive table using the spark session
spark.sql("SELECT * FROM <hive_schema>.<hive_table>")
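Writing a DataFrame back to Hive works through the same Hive-enabled session; a minimal sketch, with the schema and table names left as placeholders:

import org.apache.spark.sql.SaveMode

// Read from one Hive table and persist the result as a managed Hive table
val hiveDF = spark.sql("SELECT * FROM <hive_schema>.<hive_table>")
hiveDF.write
  .mode(SaveMode.Overwrite)
  .saveAsTable("<hive_schema>.<target_table>")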
Creating a Dataframe
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

val spark: SparkSession = createSparkSession

// List of rows
val data = Seq(
  Row(1111, "Prod1", "4/24/2020 21:06:10", 5.0),
  Row(1101, "Prod2", "4/24/2020 21:06:11", 4.5),
  Row(1102, "Prod3", "4/24/2020 21:06:12", 1.5),
  Row(1103, "Prod4", "4/24/2020 21:06:13", 2.5),
  Row(1104, "Prod5", "4/24/2020 21:06:14", 3.0))

val order_schema = StructType(List(
  StructField("order_id", IntegerType, false),
  StructField("product_name", StringType, false),
  StructField("purchase_date", StringType, false),
  StructField("price", DoubleType, true)
))

// Create RDD from Seq
val seqRdd = spark.sparkContext.parallelize(data)

// Convert RDD to Dataframe
val df = spark.createDataFrame(seqRdd, order_schema)
df.printSchema()
df.show()
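When an explicit StructType is not required, the DataFrame can also be built from a Seq of case classes via the session's implicits. A minimal sketch using a hypothetical Order case class; in compiled code the case class should be declared outside the method so an encoder can be derived:

import spark.implicits._

// The case class drives the schema; nullability is inferred
case class Order(order_id: Int, product_name: String, purchase_date: String, price: Double)

val ordersDF = Seq(
  Order(1111, "Prod1", "4/24/2020 21:06:10", 5.0),
  Order(1101, "Prod2", "4/24/2020 21:06:11", 4.5)
).toDF()

ordersDF.printSchema()
ordersDF.show()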
Reading a CSV file
// Read CSV file
val fileDF = spark.read.option("header", "true").option("delimiter", "|").csv("C:\\tmp\\orders.csv")
fileDF.show(10, false)

// If the file doesn't contain a header, supply the schema explicitly
val fileDFWithSchema = spark.read.option("header", "false").option("delimiter", "|").schema(order_schema).csv("C:\\tmp\\orders.csv")
fileDFWithSchema.show(10, false)
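If the column types should come from the data instead of a hand-written schema, the inferSchema option can be enabled; note that it triggers an extra pass over the file. A minimal sketch against the same orders file:

// Let Spark infer the column types from the data (extra scan of the file)
val inferredDF = spark.read
  .option("header", "true")
  .option("delimiter", "|")
  .option("inferSchema", "true")
  .csv("C:\\tmp\\orders.csv")

inferredDF.printSchema()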
Writing the Dataframe as CSV and Parquet files
// Writing the dataframe as a CSV file
fileDF.write.option("header", "true").option("delimiter", ",").mode(SaveMode.Overwrite).csv("C:\\tmp\\orders_as_csv.csv")

// Writing the dataframe as a Parquet file (header/delimiter options are CSV-only, so they are dropped here)
fileDF.write.parquet("C:\\tmp\\orders_as_parquet.csv")

// Reading the parquet file
val parquetDF = spark.read.parquet("C:\\tmp\\orders_as_parquet.csv")
parquetDF.show(false)
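Parquet output is often partitioned by a column so that later reads can prune directories. A minimal sketch; order_date is a hypothetical column used only for illustration:

import org.apache.spark.sql.SaveMode

// Partition the parquet output by a column (order_date is hypothetical for this file)
fileDF.write
  .mode(SaveMode.Overwrite)
  .partitionBy("order_date")
  .parquet("C:\\tmp\\orders_partitioned_parquet")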
Generating the execution plan
// Read products
val productsDF = spark.read.option("header", "true").option("delimiter", "|").csv("C:\\tmp\\products.csv")
productsDF.show()

// Read bill info
val billDF = spark.read.option("header", "true").option("delimiter", "|").csv("C:\\tmp\\bill.csv")
billDF.show()

// Generate bill with product names
val billWithDescriptionDF = billDF.as("b")
  .join(productsDF.as("p"), billDF.col("prod_id").equalTo(productsDF.col("prod_id")), "left")
  .select("b.prod_id", "p.prod_name", "b.bill_time", "b.qty", "b.price")
billWithDescriptionDF.show()

// Accepted explain modes are 'simple', 'extended', 'codegen', 'cost', 'formatted'.
billWithDescriptionDF.explain("formatted")
Plan output
== Physical Plan ==
* Project (8)
+- * BroadcastHashJoin LeftOuter BuildRight (7)
   :- * Project (2)
   :  +- BatchScan (1)
   +- BroadcastExchange (6)
      +- * Project (5)
         +- * Filter (4)
            +- BatchScan (3)

(1) BatchScan
Output: [prod_id#163, bill_time#164, qty#165, price#166]

(2) Project [codegen id : 2]
Output : [prod_id#163, bill_time#164, qty#165, price#166]
Input : [prod_id#163, bill_time#164, qty#165, price#166]

(3) BatchScan
Output: [prod_id#97, prod_name#98]

(4) Filter [codegen id : 1]
Input : [prod_id#97, prod_name#98]
Condition : isnotnull(prod_id#97)

(5) Project [codegen id : 1]
Output : [prod_id#97, prod_name#98]
Input : [prod_id#97, prod_name#98]

(6) BroadcastExchange
Input: [prod_id#97, prod_name#98]

(7) BroadcastHashJoin [codegen id : 2]
Left keys: List(prod_id#163)
Right keys: List(prod_id#97)
Join condition: None

(8) Project [codegen id : 2]
Output : [prod_id#163, prod_name#98, bill_time#164, qty#165, price#166]
Input : [prod_id#163, bill_time#164, qty#165, price#166, prod_id#97, prod_name#98]
Plan with extended mode (explain(true))
billWithDescriptionDF.explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias('b.prod_id, None), unresolvedalias('p.prod_name, None), unresolvedalias('b.bill_time, None), unresolvedalias('b.qty, None), unresolvedalias('b.price, None)]
+- Join LeftOuter, (prod_id#163 = prod_id#97)
   :- SubqueryAlias `b`
   :  +- RelationV2[prod_id#163, bill_time#164, qty#165, price#166] csv file:/C:/tmp/bill.csv
   +- SubqueryAlias `p`
      +- RelationV2[prod_id#97, prod_name#98] csv file:/C:/tmp/products.csv

== Analyzed Logical Plan ==
prod_id: string, prod_name: string, bill_time: string, qty: string, price: string
Project [prod_id#163, prod_name#98, bill_time#164, qty#165, price#166]
+- Join LeftOuter, (prod_id#163 = prod_id#97)
   :- SubqueryAlias `b`
   :  +- RelationV2[prod_id#163, bill_time#164, qty#165, price#166] csv file:/C:/tmp/bill.csv
   +- SubqueryAlias `p`
      +- RelationV2[prod_id#97, prod_name#98] csv file:/C:/tmp/products.csv

== Optimized Logical Plan ==
Project [prod_id#163, prod_name#98, bill_time#164, qty#165, price#166]
+- Join LeftOuter, (prod_id#163 = prod_id#97)
   :- RelationV2[prod_id#163, bill_time#164, qty#165, price#166] csv file:/C:/tmp/bill.csv
   +- Filter isnotnull(prod_id#97)
      +- RelationV2[prod_id#97, prod_name#98] csv file:/C:/tmp/products.csv

== Physical Plan ==
*(2) Project [prod_id#163, prod_name#98, bill_time#164, qty#165, price#166]
+- *(2) BroadcastHashJoin [prod_id#163], [prod_id#97], LeftOuter, BuildRight
   :- *(2) Project [prod_id#163, bill_time#164, qty#165, price#166]
   :  +- BatchScan[prod_id#163, bill_time#164, qty#165, price#166] CSVScan Location: InMemoryFileIndex[file:/C:/tmp/bill.csv], ReadSchema: struct<prod_id:string,bill_time:string,qty:string,price:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true])), [id=#213]
      +- *(1) Project [prod_id#97, prod_name#98]
         +- *(1) Filter isnotnull(prod_id#97)
            +- BatchScan[prod_id#97, prod_name#98] CSVScan Location: InMemoryFileIndex[file:/C:/tmp/products.csv], ReadSchema: struct<prod_id:string,prod_name:string>
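The plans above show that Spark chose a broadcast hash join on its own because the products side is small. The same behaviour can be requested explicitly with the broadcast hint; a minimal sketch:

import org.apache.spark.sql.functions.{broadcast, col}

// Explicitly ask Spark to broadcast the (small) products side of the join
val billWithHintDF = billDF.as("b")
  .join(broadcast(productsDF).as("p"), col("b.prod_id") === col("p.prod_id"), "left")
  .select("b.prod_id", "p.prod_name", "b.bill_time", "b.qty", "b.price")

billWithHintDF.explain("formatted")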
Frequently used utility functions:
Date format conversion
when and otherwise clause usage
val billDF = spark.read.option("header", "true").option("delimiter", "|").csv("C:\\tmp\\bill.csv")
billDF.show(false)
Output:
+-------+-------------------+---+-----+
|prod_id|          bill_time|qty|price|
+-------+-------------------+---+-----+
|     21|04/24/2020 18:30:31|  2| 10.0|
|     22|04/24/2020 18:30:32|  1| 15.5|
|     23|04/24/2020 18:30:33|  3|24.25|
|     25|04/24/2020 18:30:34|  1| 8.75|
|     26|04/24/2020 18:30:35|  4| 16.0|
|     28|04/24/2020 18:30:36|  2| 20.0|
|     29|04/24/2020 18:30:37|  2| 10.0|
|     30|04/24/2020 18:30:38|  2|  9.0|
|     31|04/24/2020 18:30:39|  2| 14.0|
|     32|04/24/2020 18:30:40|  2|  6.0|
|     33|04/24/2020 18:30:41|  2|  4.0|
|     34|04/24/2020 18:30:42|  2| 30.0|
|     36|04/24/2020 18:30:43|  2| 18.0|
+-------+-------------------+---+-----+
import org.apache.spark.sql.functions._

val billDFWithAdditionalCols = billDF.as("b")
  .withColumn("bill_timestamp", to_timestamp(col("b.bill_time"), "MM/dd/yyyy HH:mm:ss"))
  .withColumn("bill_time_epoch", unix_timestamp(to_timestamp(col("b.bill_time"), "MM/dd/yyyy HH:mm:ss")))
  .withColumn("bill_date", to_date(to_timestamp(col("b.bill_time"), "MM/dd/yyyy HH:mm:ss")))
  .withColumn("bill_only_time_24hrs_format", date_format(to_timestamp(col("b.bill_time"), "MM/dd/yyyy HH:mm:ss"), "HH:mm:ss"))
  .withColumn("bill_only_time_12hrs_format", date_format(to_timestamp(col("b.bill_time"), "MM/dd/yyyy HH:mm:ss"), "hh:mm:ss a"))
  .withColumn("bill_date_custom_format", date_format(to_date(to_timestamp(col("b.bill_time"), "MM/dd/yyyy HH:mm:ss")), "MM/dd/yyyy"))
  .withColumn("bill_date_year", date_format(to_date(to_timestamp(col("b.bill_time"), "MM/dd/yyyy HH:mm:ss")), "yyyy"))
  .withColumn("bill_date_Hour", date_format(to_timestamp(col("b.bill_time"), "MM/dd/yyyy HH:mm:ss"), "HH"))
  .withColumn("product_category",
    when(col("b.prod_id").equalTo(lit("21")).or(col("b.prod_id").equalTo(lit("22"))), "Hair Care")
      .when(col("b.prod_id").equalTo(lit("23")).or(col("b.prod_id").equalTo(lit("24"))), "Dental Care")
      .when(col("b.prod_id").equalTo(lit("25")).or(col("b.prod_id").equalTo(lit("26"))), "Juices")
      .otherwise("unclassified"))

billDFWithAdditionalCols.show()
+-------+-------------------+---+-----+-------------------+---------------+----------+---------------------------+---------------------------+-----------------------+--------------+--------------+--------------------+
|prod_id|          bill_time|qty|price|     bill_timestamp|bill_time_epoch| bill_date|bill_only_time_24hrs_format|bill_only_time_12hrs_format|bill_date_custom_format|bill_date_year|bill_date_Hour|    product_category|
+-------+-------------------+---+-----+-------------------+---------------+----------+---------------------------+---------------------------+-----------------------+--------------+--------------+--------------------+
|     21|04/24/2020 18:30:31|  2| 10.0|2020-04-24 18:30:31|     1587733231|2020-04-24|                   18:30:31|                06:30:31 PM|             04/24/2020|          2020|            18|           Hair Care|
|     22|04/24/2020 18:30:32|  1| 15.5|2020-04-24 18:30:32|     1587733232|2020-04-24|                   18:30:32|                06:30:32 PM|             04/24/2020|          2020|            18|           Hair Care|
|     23|04/24/2020 18:30:33|  3|24.25|2020-04-24 18:30:33|     1587733233|2020-04-24|                   18:30:33|                06:30:33 PM|             04/24/2020|          2020|            18|         Dental Care|
|     25|04/24/2020 18:30:34|  1| 8.75|2020-04-24 18:30:34|     1587733234|2020-04-24|                   18:30:34|                06:30:34 PM|             04/24/2020|          2020|            18|              Juices|
|     26|04/24/2020 18:30:35|  4| 16.0|2020-04-24 18:30:35|     1587733235|2020-04-24|                   18:30:35|                06:30:35 PM|             04/24/2020|          2020|            18|              Juices|
|     28|04/24/2020 18:30:36|  2| 20.0|2020-04-24 18:30:36|     1587733236|2020-04-24|                   18:30:36|                06:30:36 PM|             04/24/2020|          2020|            18|unclassified_cate...|
|     29|04/24/2020 18:30:37|  2| 10.0|2020-04-24 18:30:37|     1587733237|2020-04-24|                   18:30:37|                06:30:37 PM|             04/24/2020|          2020|            18|unclassified_cate...|
|     30|04/24/2020 18:30:38|  2|  9.0|2020-04-24 18:30:38|     1587733238|2020-04-24|                   18:30:38|                06:30:38 PM|             04/24/2020|          2020|            18|unclassified_cate...|
|     31|04/24/2020 18:30:39|  2| 14.0|2020-04-24 18:30:39|     1587733239|2020-04-24|                   18:30:39|                06:30:39 PM|             04/24/2020|          2020|            18|unclassified_cate...|
|     32|04/24/2020 18:30:40|  2|  6.0|2020-04-24 18:30:40|     1587733240|2020-04-24|                   18:30:40|                06:30:40 PM|             04/24/2020|          2020|            18|unclassified_cate...|
|     33|04/24/2020 18:30:41|  2|  4.0|2020-04-24 18:30:41|     1587733241|2020-04-24|                   18:30:41|                06:30:41 PM|             04/24/2020|          2020|            18|unclassified_cate...|
|     34|04/24/2020 18:30:42|  2| 30.0|2020-04-24 18:30:42|     1587733242|2020-04-24|                   18:30:42|                06:30:42 PM|             04/24/2020|          2020|            18|unclassified_cate...|
|     36|04/24/2020 18:30:43|  2| 18.0|2020-04-24 18:30:43|     1587733243|2020-04-24|                   18:30:43|                06:30:43 PM|             04/24/2020|          2020|            18|unclassified_cate...|
+-------+-------------------+---+-----+-------------------+---------------+----------+---------------------------+---------------------------+-----------------------+--------------+--------------+--------------------+
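Going in the other direction, or doing simple date arithmetic, uses the same functions module; a minimal sketch on the derived columns from the example above:

import org.apache.spark.sql.functions._

val billDates = billDFWithAdditionalCols
  // Epoch seconds back to a timestamp string
  .withColumn("bill_timestamp_from_epoch", from_unixtime(col("bill_time_epoch")))
  // Simple date arithmetic on the derived date column
  .withColumn("bill_date_plus_7", date_add(col("bill_date"), 7))
  .withColumn("days_since_bill", datediff(current_date(), col("bill_date")))

billDates.select("bill_date", "bill_timestamp_from_epoch", "bill_date_plus_7", "days_since_bill").show(false)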
Filtering rows from the Dataframe
billDF.as("b").filter(col("b.price").gt(lit(20.0))).show()
+-------+-------------------+---+-----+
|prod_id|          bill_time|qty|price|
+-------+-------------------+---+-----+
|     23|04/24/2020 18:30:33|  3|24.25|
|     34|04/24/2020 18:30:42|  2| 30.0|
+-------+-------------------+---+-----+
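Filters can also combine several conditions, or take the predicate as a SQL expression string; a minimal sketch on the same bill data:

import org.apache.spark.sql.functions.{col, lit}

// Combine conditions with and/or
billDF.filter(col("price").gt(lit(10.0)).and(col("qty").geq(lit(2)))).show()

// Or express the predicate as a SQL string
billDF.filter("price > 10.0 AND qty >= 2").show()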
Get distinct values
billDF.select(col("qty")).distinct().show(false)
+---+
|qty|
+---+
|2  |
|1  |
|3  |
|4  |
+---+
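When only some columns should decide uniqueness while the whole rows are kept, dropDuplicates is the related call; a minimal sketch:

// Keep one full row per distinct (qty, price) pair
billDF.dropDuplicates("qty", "price").show(false)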
Math operations: round, floor, ceil, rint
val nums = List(Row(1.1), Row(1.2), Row(1.3), Row(1.4), Row(1.5), Row(1.6), Row(1.7), Row(1.8), Row(1.9), Row(2.0))

val schema = StructType(List(StructField("nums", DoubleType, false)))

// Create RDD from Seq
val rdd = spark.sparkContext.parallelize(nums)

// Convert RDD to Dataframe
val num_df = spark.createDataFrame(rdd, schema)
num_df.show()

val new_df = num_df.as("n")
  .withColumn("num_round", round(col("n.nums")))
  .withColumn("num_ceil", ceil(col("n.nums")))
  .withColumn("num_floor", floor(col("n.nums")))
  .withColumn("num_rint", rint(col("n.nums")))

new_df.show()
+----+---------+--------+---------+--------+
|nums|num_round|num_ceil|num_floor|num_rint|
+----+---------+--------+---------+--------+
| 1.1|      1.0|       2|        1|     1.0|
| 1.2|      1.0|       2|        1|     1.0|
| 1.3|      1.0|       2|        1|     1.0|
| 1.4|      1.0|       2|        1|     1.0|
| 1.5|      2.0|       2|        1|     2.0|
| 1.6|      2.0|       2|        1|     2.0|
| 1.7|      2.0|       2|        1|     2.0|
| 1.8|      2.0|       2|        1|     2.0|
| 1.9|      2.0|       2|        1|     2.0|
| 2.0|      2.0|       2|        2|     2.0|
+----+---------+--------+---------+--------+
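Beyond per-row math, the same functions module provides aggregates; a minimal sketch grouping the bill data by quantity (the particular mix of aggregates is my own choice for illustration):

import org.apache.spark.sql.functions._

// Aggregate math across rows, grouped by qty
billDF.groupBy(col("qty"))
  .agg(
    count(lit(1)).as("num_bills"),
    round(sum(col("price")), 2).as("total_price"),
    round(avg(col("price")), 2).as("avg_price"))
  .show(false)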