Free Will

Spark笔记(4):RDD编程 Scala版本

创建

  • 从本地文件系统/分布式文件系统HDFS加载:
1
2
scala > val path = ''
scala > val lines = sc.textFile(path)
  • 通过并行集合(数组)创建
1
2
3
4
5
6
7
8
9
10
val array = Array(1,2,3,4,5)
val rdd = sc.parallelize(array)
array: Array[Int] = Array(1, 2, 3, 4, 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:29
val list = List(1,2,3,4,5)
val rdd = sc.parallelize(array)
list: List[Int] = List(1, 2, 3, 4, 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:29

RDD操作

惰性机制:整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会触发“从头到尾”真正的计算。

转换操作

惰性求值,遇到行动操作才会触发“从头到尾”的真正的计算

  • filter
  • map
  • flatmap
  • groupByKey
  • reduceByKey

行动操作

真正触发计算的地方。

  • count():返回元素个数
  • collect():以数组形式返回所有元素
  • first():返回第一个元素
  • take(n):以数组形式返回前n个元素
  • reduce(func):通过函数func(输入两个参数并返回一个值)聚合元素
  • foreach(func):将每个元素传递到函数func中运行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
val rdd = sc.parallelize(Array(1,2,3,4,5,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:27
rdd.count()
res28: Long = 6
rdd.first()
res29: Int = 1
rdd.take(3)
res30: Array[Int] = Array(1, 2, 3)
rdd.collect()
res31: Array[Int] = Array(1, 2, 3, 4, 5, 6)
rdd.reduce((a,b)=>a+b)
res32: Int = 21
rdd.take(5).foreach(println)
1
2
3
4
5
  • 注意:Local模式单机执行时,rdd.foreach(elem=>println(elem))会打印出一个RDD中的所有元素。但在集群模式下,在Worker节点上执行打印语句是输出到Worker节点的stdout中,而不是输出到任务控制节点Driver中,因此,任务控制节点Driver中的stdout是不会显示打印语句的这些输出内容的。为了能够把所有Worker节点上的打印输出信息也显示到Driver中,就需要使用collect()方法。但collect()方法会把各个Worker节点上的所有RDD元素都抓到Driver中,因此,可能会导致Driver所在节点发生内存溢出。
  • 实例
1
2
3
4
5
val text = sc.parallelize(List("hadoop is good","spark is fast","spark is better"))
text: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[45] at parallelize at <console>:43
text.flatMap(line => line.split(" ")).map(word=> (word,1)).reduceByKey((a,b)=>a+b).take(10)
res155: Array[(String, Int)] = Array((is,3), (fast,1), (better,1), (spark,2), (hadoop,1), (good,1))

持久化

为避免重复计算产生的开销,可使用persist()方法对RDD标记为持久化。

  • persist(MEMORY_ONLY):将RDD作为反序列化的对象存储于JVM中,若内存不足,按照LRU原则(least recently used最近最少使用原则)替换缓存中的内容。
  • persist(MEMORY_AND_DISK):将RDD作为反序列化的对象存储于JVM中,若内存不足,超出的分区将会被存放在磁盘上。

  • cache()方法,会调用persist(MEMORY_ONLY)

1
2
3
4
5
6
7
8
9
10
11
12
13
val rdd = sc.parallelize(List("hadoop","spark","Hive"))
rdd.cache() // 会调用persist(MEMORY_ONLY),但是语句执行到这里,不会缓存rdd,因为这时的rdd还没有计算生成
println(rdd.count())// 第一次行动操作,触发一次真正从头到尾的计算,这时上面的rdd.cache()才会被执行,把这个rdd放到缓存中
println(rdd.collect().mkString(","))//第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存的rdd
rdd.unpersist()
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[27] at parallelize at <console>:27
res109: rdd.type = ParallelCollectionRDD[27] at parallelize at <console>:27
3
hadoop,spark,Hive
res114: rdd.type = ParallelCollectionRDD[27] at parallelize at <console>:27

分区

分区的作用:

  • 增加并行度:分区可以在不同的工作节点上启动不同的线程进行并行处理
  • 减少通信开销:分布式系统中,通信代价巨大,控制数据分布以获得最少的网络传输可以极大地提升整体性能

如下图,当userData表和Events表进行连接时,默认情况下会将两个数据集中的所有key的哈希值都求出来,将哈希值相同的记录传送到同一台机器上,之后在该机器上对所有key相同的记录进行连接操作。这样每次进行连接操作都会有数据混洗的问题,造成很大的网络传输开销。
WechatIMG433.jpeg

实际上,由于userData这个RDD要比events大很多,可以先对userData进行哈希分区,这样在连接时,只有events表发生了数据混洗产生网络通信,userData是在本地引用的,不会产生网络开销。可以看出,Spark通过数据分区,对于一些特定类型的操作,比如join()/leftOuterJoin()/groupByKey()/reduceByKey()等,可以大大降低网络传输开销。

WechatIMG435.jpeg

分区原则:

分区个数尽量等于集群中CPU核心(core)数目。通过spark.default.parallelism配置默认分区数目

默认分区:

  • Local模式:默认是本地CPU数目
  • Standalone或YARN模式:在“集群中所有CPU核心数目总和”和“2”取大值
  • Mesos模式:默认分区为8

设置

  • 创建RDD时:sc.textFile(path,partitionNum),若从HDFS读取文件,分区数为文件分片数。

  • repartition

1
2
3
4
5
6
7
8
val rdd = sc.parallelize(List("hadoop","spark","Hive"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[30] at parallelize at <console>:27
rdd.partitions.size
res123: Int = 2
rdd.repartition(1).partitions.size
res124: Int = 1
  • 自定义分区
1
2
3
4
5
6
7
8
9
10
import org.apache.spark.HashPartitioner
class MyPartitioner(numParts:Int) extends HashPartitioner(numParts:Int){
override def numPartitions: Int = numParts
override def getPartition(key: Any): Int = {key.toString.toInt%10}
}
data = sc.textFile(List(1,2,3,4,5,6))
data.map((_,1)).partitionBy(new MyPartitioner(10)).map((_._1)).take(10)

键值对

  • reduceByKey()
  • groupByKey()
  • keys()
  • values()
  • sortByKey()
  • sortBy()
  • mapValues()
  • join()
  • combineBykey()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
val text = sc.parallelize(Array(("hadoop",1),("spark",1),("spark",2),("hive",1)))
text: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[294] at parallelize at <console>:43
text.reduceByKey(_+_).take(10)
res521: Array[(String, Int)] = Array((hive,1), (spark,3), (hadoop,1))
text.groupByKey().map(x=>(x._1,x._2.sum)).take(10)
res522: Array[(String, Int)] = Array((hive,1), (spark,3), (hadoop,1))
text.keys.take(10)
res523: Array[String] = Array(hadoop, spark, spark, hive)
text.values.take(10)
res524: Array[Int] = Array(1, 1, 2, 1)
text.sortByKey().take(10)
res525: Array[(String, Int)] = Array((hadoop,1), (hive,1), (spark,1), (spark,2))
text.reduceByKey((x,y)=>x+y).sortBy(x=>x._2,false).take(10)
res526: Array[(String, Int)] = Array((spark,3), (hive,1), (hadoop,1))
text.reduceByKey((x,y)=>x+y).mapValues(x=>x+1).take(10)
res527: Array[(String, Int)] = Array((hive,2), (spark,4), (hadoop,2))
val text2 = sc.parallelize(List(("spark","best")))
text2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[311] at parallelize at <console>:44
text.join(text2).take(10)
res529: Array[(String, (Int, String))] = Array((spark,(1,best)), (spark,(2,best)))
val data = sc.parallelize(List(("c1",88),("c2",99),("c3",28),("c2",56),("c1",58)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[315] at parallelize at <console>:44
data.combineByKey(
(x)=>(x,1),
(x:(Int,Int),z) => (x._1+z,x._2+1),
(x:(Int,Int),y:(Int,Int)) => (x._1+y._1,x._2+y._2)
).map({case (x,y)=>(x,y._1/y._2.toFloat)}).take(10)
res531: Array[(String, Float)] = Array((c3,28.0), (c1,73.0), (c2,77.5))
data.mapValues(x=>(x,1)).reduceByKey((a,b)=>(a._1+b._1,a._2+b._2)).mapValues(x=>x._1/x._2.toFloat).take(10)
res532: Array[(String, Float)] = Array((c3,28.0), (c1,73.0), (c2,77.5))


本文结束 欢迎关注微信公众号【应统联盟】


一个集应统考研、机器学习、知识变现、实习求职于一身的微信公众号