Apache Spark- RDD Actions

March 30, 2017

RDD actions are RDD operations which don’t generate another RDD. Instead, RDD actions return a value of any types (such as List()) but not RDD[T]. And RDD actions are just like any other RDD operations. They are lazy; meaning they don’t compute right away, only when an action requires to return values. Let’s see how it’s done.

  1. Reference Book

Functions that will be covered

Collect Function

We will first create a standalone program. More info can be found here to create a standalone program.

Purpose: Return all elements from the RDD

This is pretty simple. You just collect the elements which are defined in Rdd.

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

object Spark {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local[2]").set("spark.executor.memory", "1g");
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3))
    val result: String = rdd.collect().mkString(",")
    println("the result is: " + result)

    sc.stop()
  }
}

the result is: 1,2,3


count Function

This time, the fragment of code will be shown from now on.

Purpose: Number of elements in the RDD.

    val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3))
    val result: Long = rdd.count()
    println("the result is: " + result)

the result is : 3


countByValue Function

Purpose: Number of times each element occurs in the RDD.

    val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 3))
    val result: collection.Map[Int, Long] = rdd.countByValue()
    println("the result is: " + result)

the result is: Map(2 -> 1, 1 -> 1, 3 -> 2)


countByKey Function

Purpose: Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.

    val rdd: RDD[(Int, Int)] = sc.parallelize(List(1 -> 2, 3 -> 4, 3 -> 6))
    val result: collection.Map[Int, Long] = rdd.countByKey()
    println("the result is: " + result)

the result is: Map(1 -> 1, 3 -> 2)


take Function

Signature: take(n)

Purpose: Return an array with the first n elements of the dataset.

    val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 3))
    val result: Array[Int] =  rdd.take(2)
    println("the result is: " + result.mkString(","))

the result is: 1,2


takeOrdered Function

Signature: takeOrdered(n, [ordering])

Purpose: Return the first n elements of the RDD using either their natural order or a custom comparator.`

    val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 3))

    val firstResult: Array[Int] =  rdd.takeOrdered(2)
    println("the first result is: " + firstResult.mkString(","))

    val secondResult: Array[Int] =  rdd.takeOrdered(2)(Ordering[Int].reverse)
    println("the second result is: " + secondResult.mkString(","))

the first result is: 1,2

the second result is: 3,3


takeSample Function

Signature: takeSample(withReplacement, num, [seed])

Purpose: Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.`

    val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 3))
    val result: Array[Int] =  rdd.takeSample(withReplacement = false, 2)
    println("the first result is: " + result.mkString(","))

Note: The results are nondeterministic.

the result is: 2,3


reduce Function

Signature: reduce(func)

Purpose: Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.`

    val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 3))
    val result: Int =  rdd.reduce((a, b) => a + b)
    println("the result is: " + result)

the result is: 9