I am planning to share my knowledge of Apache Spark's RDD API, the DataFrame API, and some tips and tricks. Combining everything into one piece would make for a very lengthy article, so I am splitting it into three separate articles; this one is the first in the series.
- Spark RDD API
- DataFrame API
- Tips and tricks on the RDD and DataFrame APIs
Let us start with the basics of the RDD API. A Resilient Distributed Dataset (RDD) is, essentially, Spark's representation of a set of data spread across multiple machines, with APIs that let you act on it. An RDD can come from any data source, e.g. text files, JSON, CSV files, a database via JDBC, etc.
In this demo I am using the Scala prompt (spark-shell) to show the usage of these APIs, as below:
[root@victoria bin]# /usr/hdp/current/spark-client/bin/spark-shell
17/12/06 01:48:39 INFO SecurityManager: Changing view acls to: root
17/12/06 01:48:39 INFO SecurityManager: Changing modify acls to: root
17/12/06 01:48:39 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
17/12/06 01:48:41 INFO HttpServer: Starting HTTP Server
17/12/06 01:48:41 INFO Server: jetty-8.y.z-SNAPSHOT
17/12/06 01:48:41 INFO AbstractConnector: Started SocketConnector@0.0.0.0:46329
17/12/06 01:48:41 INFO Utils: Successfully started service 'HTTP class server' on port 46329.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.6.3
/_/
Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_77)
Type in expressions to have them evaluated.
..
..
scala>
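As noted above, an RDD can be created from many different sources. Here is a minimal sketch of a few common ways to create one in spark-shell; the file paths below are hypothetical placeholders, not part of this demo:
// Hypothetical ways to create an RDD (paths are placeholders)
val fromCollection = sc.parallelize(1 to 100, 4)        // RDD from an in-memory collection, split into 4 partitions
val fromTextFile   = sc.textFile("/tmp/input.txt")      // one element per line of a text file
val fromWholeFiles = sc.wholeTextFiles("/tmp/jsonDir")  // (fileName, fileContent) pairs, handy for small JSON files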
map
map performs an element-wise transformation on the dataset: the supplied function is applied to each element of the RDD to produce a new element.
map(f: T => U): RDD[T] => RDD[U] (one-to-one conversion)
scala> val arr = Array((1,"a"),(2,"b"),(3,"c"),(4,"d"),(5,"e"),(6,"f"),(7,"g"))
arr: Array[(Int, String)] = Array((1,a), (2,b), (3,c), (4,d), (5,e), (6,f), (7,g))
scala> val rdd = sc.parallelize(arr).map(f=>("A"+f._1*10,f._2+"#"))
rdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[1] at map at <console>:23
scala> println(rdd.collect().mkString(","))
17/12/06 02:08:01 INFO SparkContext: Starting job: collect at <console>:26
17/12/06 02:08:01 INFO DAGScheduler: Got job 0 (collect at <console>:26) with 2 output partitions
17/12/06 02:08:04 INFO DAGScheduler: ResultStage 0 (collect at <console>:26) finished in 1.687 s
17/12/06 02:08:04 INFO DAGScheduler: Job 0 finished: collect at <console>:26, took 3.053065 s
(A10,a#),(A20,b#),(A30,c#),(A40,d#),(A50,e#),(A60,f#),(A70,g#)
flatMap
Same as map, but each input element may generate zero or more output elements.
flatMap(f: T => Seq[U]): RDD[T] => RDD[U] (one-to-many conversion)
scala> val arr = Array("1#2#3","4#5","6")
arr: Array[String] = Array(1#2#3, 4#5, 6)
scala> val rdd = sc.parallelize(arr).flatMap(f=>f.split("#"))
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at <console>:23
scala> println(rdd.collect().mkString(","))
1,2,3,4,5,6
filter
filter returns only the elements for which the function f evaluates to true.
filter(f: T => Bool): RDD[T] => RDD[T]
scala> val arr = Array("1#2#3","4#5","6")
arr: Array[String] = Array(1#2#3, 4#5, 6)
scala> val rdd = sc.parallelize(arr).filter(f=>f.length>=3)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at filter at <console>:23
scala> println(rdd.collect().mkString(","))
1#2#3,4#5
distinct
distinct removes duplicate elements from the RDD; note that it requires a shuffle.
scala> val arr = Array(1,2,3,2,3,4,5,4,6)
arr: Array[Int] = Array(1, 2, 3, 2, 3, 4, 5, 4, 6)
scala> val rdd = sc.parallelize(arr).distinct()
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at distinct at <console>:23
scala> println(rdd.collect().mkString(","))
4,6,2,1,3,5
sample (random sampling)
If the first parameter (withReplacement) is true, the sample may contain duplicate elements; if it is false, there will be no duplicates. The second parameter (fraction) takes a value in [0, 1], and the number of sampled elements is approximately this fraction multiplied by the total count. The third parameter is the random seed.
scala> val arr = 1 to 20
arr: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
scala> val rdd = sc.parallelize(arr,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:23
scala> val a = rdd.sample(true,0.5,10)
a: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[11] at sample at <console>:25
scala> val b = rdd.sample(false,0.5,10)
b: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[12] at sample at <console>:25
scala> println("a:"+a.collect().mkString(","))
a:2,7,11,12,12,15,15,18
scala> println("b:"+b.collect().mkString(","))
b:7,8,11,13,15,16,17,18,19,20
groupByKey
groupByKey groups the values that share the same key together.
scala> val arr = Array((1,"a"),(2,"b"),(2,"c"),(1,"b"),(3,"c"),(4,"d"),(2,"d"))
arr: Array[(Int, String)] = Array((1,a), (2,b), (2,c), (1,b), (3,c), (4,d), (2,d))
scala> val rdd = sc.parallelize(arr,3)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[13] at parallelize at <console>:23
scala> val a = rdd.groupByKey()
a: org.apache.spark.rdd.RDD[(Int, Iterable[String])] = ShuffledRDD[14] at groupByKey at <console>:25
scala> println(a.collect().mkString(","))
(3,CompactBuffer(c)),(4,CompactBuffer(d)),(1,CompactBuffer(a, b)),(2,CompactBuffer(b, c, d))
combineByKey
combineByKey processes and aggregates the elements that share the same key. It takes three functions: createCombiner: V => C, applied to the first value seen for a key within a partition; mergeValue: (C, V) => C, which merges each subsequent value for that key into the partition-local combiner; and mergeCombiners: (C, C) => C, which merges the combiners produced for the same key by different partitions.
scala> val arr = Array((1,"a"),(2,"c"),(2,"b"),(1,"b"),(3,"c"),(4,"d"),(2,"d"))
arr: Array[(Int, String)] = Array((1,a), (2,c), (2,b), (1,b), (3,c), (4,d), (2,d))
scala> val rdd = sc.parallelize(arr,3)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[15] at parallelize at <console>:23
scala> val a = rdd.combineByKey(f=>new StringBuffer(f),(a:StringBuffer,b:String)=>(a.append(b)),(c:StringBuffer,d:StringBuffer)=>(c.append(d)))
a: org.apache.spark.rdd.RDD[(Int, StringBuffer)] = ShuffledRDD[16] at combineByKey at <console>:39
scala>
scala> println(a.collect().mkString(","))
(3,c),(4,d),(1,ab),(2,cbd)
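To make the three functions more concrete, here is an additional sketch (not from the original session) that uses combineByKey to compute a per-key average from some made-up (key, value) pairs:
// Hypothetical per-key average with combineByKey
val scores = sc.parallelize(Seq(("a", 10), ("b", 20), ("a", 30), ("b", 40), ("a", 50)))
val sumCount = scores.combineByKey(
  (v: Int) => (v, 1),                                           // createCombiner: first value for a key -> (sum, count)
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),        // mergeValue: fold the next value into the partition-local combiner
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)) // mergeCombiners: merge combiners across partitions
val avg = sumCount.mapValues { case (sum, count) => sum.toDouble / count }
// avg.collect() should yield something like Array((a,30.0), (b,30.0))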
reduceByKey
reduceByKey reduces all values that share the same key using the given function.
scala> val arr = Array((1,"a"),(2,"c"),(2,"b"),(1,"b"),(3,"c"),(4,"d"),(2,"d"))
arr: Array[(Int, String)] = Array((1,a), (2,c), (2,b), (1,b), (3,c), (4,d), (2,d))
scala> val rdd = sc.parallelize(arr,3)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[17] at parallelize at <console>:23
scala> val a = rdd.reduceByKey((a,b)=>a+"|"+b)
a: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[18] at reduceByKey at <console>:39
scala> println(a.collect().mkString(","))
(3,c),(4,d),(1,a|b),(2,c|b|d)
union
union concatenates two RDDs; duplicates are kept.
scala> val arr = Array((1,"a"),(2,"c"),(2,"b"))
arr: Array[(Int, String)] = Array((1,a), (2,c), (2,b))
scala> val arr2 = Array((1,"b"),(3,"c"),(4,"d"),(2,"d"))
arr2: Array[(Int, String)] = Array((1,b), (3,c), (4,d), (2,d))
scala> val rdd1 = sc.parallelize(arr,3)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[19] at parallelize at <console>:23
scala> val rdd2 = sc.parallelize(arr2)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[20] at parallelize at <console>:23
scala> val a = rdd1.union(rdd2)
a: org.apache.spark.rdd.RDD[(Int, String)] = UnionRDD[21] at union at <console>:29
scala> println(a.collect().mkString(","))
(1,a),(2,c),(2,b),(1,b),(3,c),(4,d),(2,d)
join
join combines two RDDs on their keys. The variants include join, fullOuterJoin, leftOuterJoin and rightOuterJoin, whose behaviour is similar to the corresponding SQL operations.
scala> val arr = Array((1,"a"),(2,"c"),(2,"b"),(5,"g"))
arr: Array[(Int, String)] = Array((1,a), (2,c), (2,b), (5,g))
scala> val arr2 = Array((1,"B"),(3,"C"),(4,"D"),(2,"D"),(2,"E"))
arr2: Array[(Int, String)] = Array((1,B), (3,C), (4,D), (2,D), (2,E))
scala> val rdd1 = sc.parallelize(arr,3)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[22] at parallelize at <console>:23
scala> val rdd2 = sc.parallelize(arr2)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[23] at parallelize at <console>:23
scala> val a = rdd1.join(rdd2)
a: org.apache.spark.rdd.RDD[(Int, (String, String))] = MapPartitionsRDD[26] at join at <console>:29
scala> val b = rdd1.fullOuterJoin(rdd2)
b: org.apache.spark.rdd.RDD[(Int, (Option[String], Option[String]))] = MapPartitionsRDD[29] at fullOuterJoin at <console>:29
scala> val c = rdd1.leftOuterJoin(rdd2)
c: org.apache.spark.rdd.RDD[(Int, (String, Option[String]))] = MapPartitionsRDD[32] at leftOuterJoin at <console>:29
scala> val d = rdd1.rightOuterJoin(rdd2)
d: org.apache.spark.rdd.RDD[(Int, (Option[String], String))] = MapPartitionsRDD[35] at rightOuterJoin at <console>:29
scala> println("join:"+a.collect().mkString(","))
join:(1,(a,B)),(2,(c,D)),(2,(c,E)),(2,(b,D)),(2,(b,E))
scala> println("fullOuterJoin:"+b.collect().mkString(","))
fullOuterJoin:(3,(None,Some(C))),(4,(None,Some(D))),(1,(Some(a),Some(B))),(5,(Some(g),None)),(2,(Some(c),Some(D))),(2,(Some(c),Some(E))),(2,(Some(b),Some(D))),(2,(Some(b),Some(E)))
scala> println("leftOuterJoin:"+c.collect().mkString(","))
leftOuterJoin:(1,(a,Some(B))),(5,(g,None)),(2,(c,Some(D))),(2,(c,Some(E))),(2,(b,Some(D))),(2,(b,Some(E)))
scala> println("rightOuterJoin:"+d.collect().mkString(","))
rightOuterJoin:(3,(None,C)),(4,(None,D)),(1,(Some(a),B)),(2,(Some(c),D)),(2,(Some(c),E)),(2,(Some(b),D)),(2,(Some(b),E))
cartesian
cartesian computes the Cartesian product of two RDDs, i.e. every element of the first paired with every element of the second.
scala> val arr = Array((1,"a"),(2,"c"))
arr: Array[(Int, String)] = Array((1,a), (2,c))
scala> val arr2 = Array((3,"C"),(4,"D"),(5,"E"))
arr2: Array[(Int, String)] = Array((3,C), (4,D), (5,E))
scala> val rdd1 = sc.parallelize(arr,3)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[50] at parallelize at <console>:23
scala> val rdd2 = sc.parallelize(arr2)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[51] at parallelize at <console>:23
scala> val a = rdd1.cartesian(rdd2)
a: org.apache.spark.rdd.RDD[((Int, String), (Int, String))] = CartesianRDD[52] at cartesian at <console>:29
scala> println(a.collect().mkString("\n"))
((1,a),(3,C))
((1,a),(4,D))
((1,a),(5,E))
((2,c),(3,C))
((2,c),(4,D))
((2,c),(5,E))
cogroup
cogroup groups multiple RDDs by key. As the output shows, if a key is missing from one of the RDDs, an empty collection is returned for that RDD.
scala> val arr = Array((1,"a"),(2,"c"))
arr: Array[(Int, String)] = Array((1,a), (2,c))
scala> val arr2 = Array((1,"B"),(3,"C"))
arr2: Array[(Int, String)] = Array((1,B), (3,C))
scala> val arr3 = Array((1,"C"),(3,"D"))
arr3: Array[(Int, String)] = Array((1,C), (3,D))
scala> val rdd1 = sc.parallelize(arr,3)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[36] at parallelize at <console>:23
scala> val rdd2 = sc.parallelize(arr2)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[37] at parallelize at <console>:23
scala> val rdd3 = sc.parallelize(arr3)
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[38] at parallelize at <console>:23
scala> val a = rdd1.cogroup(rdd2, rdd3) // cogroup accepts one to three other RDDs as parameters
a: org.apache.spark.rdd.RDD[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = MapPartitionsRDD[40] at cogroup at <console>:33
scala> println(a.collect().mkString(","))
(3,(CompactBuffer(),CompactBuffer(C),CompactBuffer(D))),(1,(CompactBuffer(a),CompactBuffer(B),CompactBuffer(C))),(2,(CompactBuffer(c),CompactBuffer(),CompactBuffer()))
mapValues
mapValues applies a map function to the values of key-value (two-tuple) data, leaving the keys unchanged.
scala> val arr = Array((1,"a"),(2,"c"),(2,"b"),(5,"g"))
arr: Array[(Int, String)] = Array((1,a), (2,c), (2,b), (5,g))
scala> val rdd1 = sc.parallelize(arr,3)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[41] at parallelize at <console>:23
scala> val a = rdd1.mapValues(f=>f.toUpperCase())
a: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[42] at mapValues at <console>:25
scala> println(a.collect().mkString(","))
(1,A),(2,C),(2,B),(5,G)
sort
RDDs can be sorted with sortByKey, sortBy and similar methods; the transcript below shows sortByKey, and a short sortBy sketch follows it.
scala> val arr = Array((1,"a"),(2,"c"),(3,"b"),(5,"g"))
arr: Array[(Int, String)] = Array((1,a), (2,c), (3,b), (5,g))
scala> val rdd1 = sc.parallelize(arr,3)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[43] at parallelize at <console>:23
scala> val a = rdd1.sortByKey(true)
scala> println(a.collect().mkString(","))
(1,a),(2,c),(3,b),(5,g)
scala> val b = rdd1.sortByKey(false)
scala> println(b.collect().mkString(","))
(5,g),(3,b),(2,c),(1,a)
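Since the transcript above only exercises sortByKey, here is a small sketch (not from the original session) of sortBy, which sorts by an arbitrary key function; applied to the same rdd1 it sorts the pairs by their String value:
// Hypothetical sortBy example: sort the pairs by their value instead of their key
val byValue = rdd1.sortBy(f => f._2, ascending = true)
// byValue.collect() should yield Array((1,a), (3,b), (2,c), (5,g))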
count
count returns the number of elements in the RDD.
scala> val arr = Array((1,"a"),(2,"c"),(2,"c"))
arr: Array[(Int, String)] = Array((1,a), (2,c), (2,c))
scala> val rdd1 = sc.parallelize(arr,3)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[53] at parallelize at <console>:23
scala> val a = rdd1.count()
a: Long = 3
scala> println(a)
3
collect
collect triggers execution of the RDD and returns the result set to the driver as a local array.
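A minimal sketch (not from the original session) showing that nothing is computed until collect is called:
// Hypothetical collect example: transformations are lazy, collect() is the action that runs them
val squares = sc.parallelize(1 to 5).map(x => x * x)   // transformation only, nothing executes yet
val result  = squares.collect()                        // action: computes the RDD and returns Array(1, 4, 9, 16, 25)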
reduce
reduce aggregates all elements of the RDD with the supplied function and returns the result to the driver.
scala> val arr = Array((1,"a"),(2,"c"),(2,"D"))
arr: Array[(Int, String)] = Array((1,a), (2,c), (2,D))
scala> val rdd1 = sc.parallelize(arr,3)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[54] at parallelize at <console>:23
scala> val a = rdd1.reduce((a,b)=>(a._1+b._1,a._2+b._2))
a: (Int, String) = (5,aDc)
scala> println(a)
(5,aDc)
lookup
lookup returns all values in the RDD that are associated with the given key.
scala> val arr = Array((1,"a"),(2,"c"),(2,"D"))
arr: Array[(Int, String)] = Array((1,a), (2,c), (2,D))
scala> val rdd1 = sc.parallelize(arr,3)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[55] at parallelize at <console>:23
scala> val a = rdd1.lookup(2)
a: Seq[String] = WrappedArray(c, D)
scala> println(a.mkString(","))
c,D
zip
zip pairs up two RDDs element by element. Note that zip requires both RDDs to have the same number of partitions and the same number of elements in each partition. The related methods zipWithIndex and zipWithUniqueId pair each element with its index and with a unique Long id, respectively.
scala> val a = Array(1,2,3,4,5,6,7,8,9)
a: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> val b = Array("a","b","c","d","e","f","g","h","i")
b: Array[String] = Array(a, b, c, d, e, f, g, h, i)
scala> val m1 = sc.parallelize(a,3)
m1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[60] at parallelize at <console>:23
scala> val m2 = sc.parallelize(b,3)
m2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[61] at parallelize at <console>:23
scala> val m3 = m1.zip(m2)
m3: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[62] at zip at <console>:29
scala> println("m3:"+m3.collect().mkString(","))
m3:(1,a),(2,b),(3,c),(4,d),(5,e),(6,f),(7,g),(8,h),(9,i)
scala> val m4 = m1.zipWithIndex()
m4: org.apache.spark.rdd.RDD[(Int, Long)] = ZippedWithIndexRDD[63] at zipWithIndex at <console>:25
scala> println("m4:"+m4.collect().mkString(","))
m4:(1,0),(2,1),(3,2),(4,3),(5,4),(6,5),(7,6),(8,7),(9,8)
scala> val m5 = m1.zipWithUniqueId()
m5: org.apache.spark.rdd.RDD[(Int, Long)] = MapPartitionsRDD[64] at zipWithUniqueId at <console>:25
scala> println("m5:"+m5.collect().mkString(","))
m5:(1,0),(2,3),(3,6),(4,1),(5,4),(6,7),(7,2),(8,5),(9,8)
subtract
subtract returns the elements of the first RDD that do not appear in the second RDD.
scala> val a = Array(1,2,3,4,5,6,7,8,9)
a: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> val b = Array(3,4)
b: Array[Int] = Array(3, 4)
scala> val m1 = sc.parallelize(a,3)
m1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:23
scala> val m2 = sc.parallelize(b,2)
m2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[66] at parallelize at <console>:23
scala> val m3 = m1.subtract(m2)
m3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[70] at subtract at <console>:29
scala> println("m3:"+m3.collect().mkString(","))
m3:6,9,1,7,2,5,8
coalesce/repartition
coalesce reduces the number of partitions (without a shuffle by default), while repartition always performs a shuffle to produce the requested number of partitions.
scala> val rdd = sc.parallelize(Array(1, 2, 3, 4, 5, 6), 3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[71] at parallelize at <console>:21
scala> val rdd1 = rdd.coalesce(1, false)
rdd1: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[72] at coalesce at <console>:23
scala> val rdd2 = rdd.repartition(1)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[76] at repartition at <console>:23
scala> println("rdd:"+rdd.partitions.length)
rdd:3
scala> println("rdd1:"+rdd1.partitions.length)
rdd1:1
scala> println("rdd2:"+rdd2.partitions.length)
rdd2:1
To be continued…