How to rename Hadoop files using wildcard patterns

The rename function cannot rename a file if you pass wildcard characters in the Hadoop path. You first need to search for the files using a pattern and then pass each resolved path to the rename function. Here is the code to search Hadoop files using wildcards (^, *) with the Hadoop FileSystem's globStatus method. It will give […]
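For reference, here is a minimal sketch of that pattern, assuming the org.apache.hadoop.fs API; the paths and the new-name scheme are only illustrative:

// Expand the wildcard with globStatus, then rename each resolved path.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val matches = fs.globStatus(new Path("/user/Test/input/part-*"))   // resolve the wildcard
matches.foreach { status =>
  val src = status.getPath
  val dst = new Path(src.getParent, src.getName + ".renamed")      // hypothetical new name
  fs.rename(src, dst)                                              // rename the resolved path
}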

Continue reading


How to do SQL aggregation on DataFrames

Here is an example of how to perform sum(), count(), and groupBy() operations on DataFrames in Spark:

//Read the files from HDFS and create a DataFrame by applying a schema to it
val filePath = "hdfs://user/Test/*.csv"
val User_Dataframe = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").option("mode", "permissive").load(filePath)
//Define the schema
val User_schema = StructType(Array(StructField("USER_ID", StringType, true), StructField("APP_ID", StringType, true), StructField("TIMESTAMP", StringType, true), […]
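Since the excerpt cuts off before the aggregation itself, here is a minimal sketch of that step, assuming the User_Dataframe and column names (USER_ID, APP_ID) from above:

// Count rows per user with groupBy/agg; sum() works the same way on a numeric column.
import org.apache.spark.sql.functions.count

val perUser = User_Dataframe
  .groupBy("USER_ID")
  .agg(count("APP_ID").alias("app_count"))   // number of APP_ID rows per USER_ID
perUser.show()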

Continue reading


Scala code to get the date 6 months before the current date

This is the code to get the date 6 months before a parsed date, written in Scala:

import org.joda.time._
import org.joda.time.format._
val CurrentDate = "2017-02-12"   // sample parsed date
val d1 = new DateTime(CurrentDate.toString())
val d2 = d1.minusMonths(6)
val dateAfter6months = dayCalc(d2.toYearMonthDay().toString())   // dayCalc is a helper not shown in this excerpt
println(dateAfter6months)
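For a self-contained variant that does not depend on the dayCalc helper, here is a sketch using only joda-time formatting (the date pattern is an assumption):

import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

val fmt = DateTimeFormat.forPattern("yyyy-MM-dd")
val current = DateTime.parse("2017-02-12", fmt)
val sixMonthsEarlier = current.minusMonths(6)
println(fmt.print(sixMonthsEarlier))   // prints 2016-08-12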

Continue reading


groupByKey, reduceByKey, aggregateByKey and combineByKey in Spark

1. reduceByKey: data is combined so that each partition holds at most one value per key, then the shuffle happens and those partially combined values are sent over the network to a particular executor for the final action such as reduce.

val words = Array("one", "two", "two", "three", "three", "three")
val wordRDD = sc.parallelize(words).map(word => […]
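The excerpt is cut off mid-expression; a minimal sketch of where that word-count example usually ends up, assuming the pairing and reduction steps:

// Pair each word with 1, then combine counts per key; reduceByKey combines
// map-side before shuffling, unlike groupByKey.
val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
val wordCounts = wordPairsRDD.reduceByKey(_ + _)
wordCounts.collect().foreach(println)   // e.g. (two,2), (one,1), (three,3)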

Continue reading


Cache vs persist method in Spark

The difference between the cache and persist operations is that cache is persist(MEMORY_ONLY), i.e. cache is merely persist with the default storage level MEMORY_ONLY, whereas persist() can take any storage level: MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK_2. cache has the following definition: cache(): this.type = persist(), so cache is a synonym of persist with the MEMORY_ONLY storage level, e.g. rdd.cache() is the same as rdd.persist(StorageLevel.MEMORY_ONLY). persist has […]
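A quick usage sketch of the two calls (the RDD contents are arbitrary):

import org.apache.spark.storage.StorageLevel

val rdd1 = sc.parallelize(1 to 100)
rdd1.cache()                                  // same as persist(StorageLevel.MEMORY_ONLY)
val rdd2 = sc.parallelize(1 to 100)
rdd2.persist(StorageLevel.MEMORY_AND_DISK)    // spills partitions to disk when memory is full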

Continue reading


SQL count function example using Spark DataFrames

Here I want to get the count of persons named 'Joy' out of the total count of persons working in a company. First I define a case class for the schema of the input file:

case class Person(name: String, id: Long)
// For implicit conversions from RDDs to DataFrames
import spark.implicits._
// Create an RDD […]
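A minimal sketch of the counting step itself, using an in-memory dataset instead of the input file (the sample records are illustrative only):

case class Person(name: String, id: Long)
import spark.implicits._

val peopleDF = Seq(Person("Joy", 1), Person("Amit", 2), Person("Joy", 3)).toDF()
val joyCount = peopleDF.filter($"name" === "Joy").count()   // persons named Joy
val totalCount = peopleDF.count()                           // all persons
println(s"$joyCount out of $totalCount")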

Continue reading


How to set Hive configuration in Spark code

If you want to use Hive for data loading or querying in Spark and want to optimize your queries to fetch partitioned data, then set the configurations mentioned below in your Spark code.

hiveTest.scala
object hiveTest {
  def main(args: Array[String]) {
    //Creation of Spark context
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val conf = new SparkConf().setAppName("Load_Hive_Data")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc […]
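The excerpt stops before the Hive settings themselves; here is a sketch of the kind of configuration typically set for partitioned Hive tables, assuming a HiveContext on Spark 1.x (the specific properties are assumptions, not taken from the full post):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val conf = new SparkConf().setAppName("Load_Hive_Data")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
hiveContext.setConf("hive.exec.dynamic.partition", "true")           // allow dynamic partition inserts
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")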

Continue reading


Simple way to analyse your data using Spark

If you want to perform SQL operations on data to be analysed, the simplest way in Spark is as follows. Read the data from your file and create a DataFrame for it:

val userDF = spark.read.format("json").load("user/Test/userdata.json")
userDF.select("name", "userid").write.format("parquet").save("namesAndUid.parquet")

Instead of using the read API to load a file into a DataFrame and query it, you can also […]
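As a side note, one common way to query such a DataFrame with plain SQL (not shown in the excerpt) is to register it as a temporary view, assuming Spark 2.x:

userDF.createOrReplaceTempView("users")
val names = spark.sql("SELECT name, userid FROM users WHERE name IS NOT NULL")
names.show()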

Continue reading


How to index elements of RDDs and DataFrames

There is a 'zipWithIndex' function, familiar from Scala collections, available on RDDs; it creates an index for each element across the partitions of the RDD. Here I am creating indexes for an RDD built from a parallelized collection:

scala> val r1 = sc.parallelize(List("A", "B", "C", "D", "E", "F", "G"), 3)
scala> val r2 = r1.zipWithIndex […]
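To round out the excerpt, a sketch of the result and of turning it into a DataFrame, assuming a Spark 2.x session named spark (the column names are illustrative):

import spark.implicits._

val r1 = sc.parallelize(List("A", "B", "C", "D", "E", "F", "G"), 3)
val indexed = r1.zipWithIndex()                 // RDD of (element, index) pairs, index is a Long
val indexedDF = indexed.toDF("value", "index")
indexedDF.show()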

Continue reading


How to create day-wise partitions in Hadoop while loading data using Apache NiFi

Here is the complete flow to get the JSON file from the source. I want to store my data as a CSV file in Hadoop, and the data should be ingested day-wise into a folder for each date. GetFile -> EvaluateJSONPath -> ReplaceText -> UpdateAttribute -> PutHDFS 1. Using 'GetFile' I am […]
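The excerpt is cut off before the per-day folder is built; one way to express a date-based target directory is with NiFi Expression Language, for example in the PutHDFS Directory property (the base path here is an assumption, not taken from the full post):

/user/Test/data/${now():format('yyyy-MM-dd')}

At flow time this resolves to a folder named after the current date, e.g. /user/Test/data/2017-02-12.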

Continue reading