Apache Spark - Two-RDD Transformations
March 29, 2017
A two-RDD transformation creates a new dataset from two existing datasets. Only a few functions support this kind of transformation, so let's take a look at each of them.
Union Function
We will first create a standalone program. More information on creating a standalone program can be found here.
Purpose: Produce an RDD containing elements from both RDDs.
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

object Spark {
  def main(args: Array[String]): Unit = {
    // Run locally with two threads and 1 GB of executor memory.
    val conf = new SparkConf()
      .setAppName("Simple Application")
      .setMaster("local[2]")
      .set("spark.executor.memory", "1g")
    val sc = new SparkContext(conf)

    val firstRdd: RDD[Int] = sc.parallelize(List(1, 2, 3))
    val secondRdd: RDD[Int] = sc.parallelize(List(3, 4, 5))

    // union keeps duplicates: 3 appears in both RDDs, so it shows up twice.
    val result: Array[Int] = firstRdd.union(secondRdd).collect()
    println("the result is : " + result.mkString(","))

    sc.stop()
  }
}
the result is : 1,2,3,3,4,5
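As the output shows, union does not remove duplicates. If a set-style union is needed, a distinct() call can be chained after it. The fragment below is a minimal sketch, assuming the same firstRdd and secondRdd as in the example above.

// A minimal sketch: chaining distinct() after union drops the duplicated 3.
// Assumes the same firstRdd and secondRdd as above.
val deduped: Array[Int] = firstRdd.union(secondRdd).distinct().collect()
println("the deduped result is : " + deduped.sorted.mkString(","))
// expected output (after sorting): 1,2,3,4,5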
Intersection Function
From this point on, only the relevant code fragment will be shown.
Purpose: Produce an RDD containing only the elements found in both RDDs.
val firstRdd: RDD[Int] = sc.parallelize(List(1, 2, 3))
val secondRdd: RDD[Int] = sc.parallelize(List(3, 4, 5))

// intersection keeps only the elements present in both RDDs.
val result: Array[Int] = firstRdd.intersection(secondRdd).collect()
println("the result is : " + result.mkString(","))
the result is : 3
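One detail worth remembering is that intersection also removes duplicates from its result. The fragment below is a small sketch illustrating this with hypothetical RDDs that contain repeated values (not part of the original example).

// A minimal sketch (dupFirst and dupSecond are hypothetical RDDs):
// intersection deduplicates, so the repeated 2s collapse into a single 2.
val dupFirst: RDD[Int] = sc.parallelize(List(1, 2, 2, 3))
val dupSecond: RDD[Int] = sc.parallelize(List(2, 2, 3, 4))
val common: Array[Int] = dupFirst.intersection(dupSecond).collect()
println("the result is : " + common.sorted.mkString(","))
// expected output (after sorting): 2,3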
Subtract Function
Purpose: Remove the contents of one RDD from another (e.g., remove training data).
Here, firstRdd contains 1, 2, 3 and secondRdd contains 3, 4, 5. subtract returns the elements of firstRdd that do not appear in secondRdd. Since 3 appears in both RDDs, it is removed, leaving only 1 and 2.
val firstRdd: RDD[Int] = sc.parallelize(List(1, 2, 3))
val secondRdd: RDD[Int] = sc.parallelize(List(3, 4, 5))

// subtract keeps the elements of firstRdd that are not present in secondRdd.
val result: Array[Int] = firstRdd.subtract(secondRdd).collect()
println("the result is : " + result.mkString(","))
the result is : 2,1
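Notice that the output comes back as 2,1 rather than 1,2: collect() does not guarantee any particular order, since elements arrive partition by partition. When a stable order is wanted for printing, the RDD can be sorted before collecting. The fragment below is a minimal sketch, assuming the same firstRdd and secondRdd as above.

// A minimal sketch, assuming the same firstRdd and secondRdd as above:
// sortBy orders the RDD before collecting, so the printed output is deterministic.
val sortedResult: Array[Int] = firstRdd.subtract(secondRdd).sortBy(identity).collect()
println("the result is : " + sortedResult.mkString(","))
// expected output: 1,2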
Cartesian Function
Purpose: Produce the Cartesian product with the other RDD.
In this example, firstRdd contains 1, 2, 3 and secondRdd contains 3, 4, 5. The first element of firstRdd, 1, is paired with the first element of secondRdd, 3, producing (1,3). Next, 1 is paired with the second element of secondRdd, 4, producing (1,4). The same pairing continues until every element of firstRdd has been combined with every element of secondRdd.
val firstRdd: RDD[Int] = sc.parallelize(List(1, 2, 3))
val secondRdd: RDD[Int] = sc.parallelize(List(3, 4, 5))

// cartesian pairs every element of firstRdd with every element of secondRdd.
val result: Array[(Int, Int)] = firstRdd.cartesian(secondRdd).collect()
println("the result is : " + result.mkString(","))
the result is : (1,3),(1,4),(1,5),(2,3),(3,3),(2,4),(2,5),(3,4),(3,5)
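cartesian also works across RDDs of different element types, and the result grows to size m x n, so it can get expensive for large inputs. The fragment below is a minimal sketch that pairs a hypothetical RDD of strings (labels, not part of the original example) with secondRdd from above.

// A minimal sketch (labels is a hypothetical RDD):
// cartesian combines the element types, producing an RDD[(String, Int)] of size 2 x 3 = 6.
val labels: RDD[String] = sc.parallelize(List("a", "b"))
val pairs: Array[(String, Int)] = labels.cartesian(secondRdd).collect()
println("the result is : " + pairs.mkString(","))
// expected pairs (order may vary): (a,3),(a,4),(a,5),(b,3),(b,4),(b,5)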