RDD持久化、广播、累加器实质上分别涉及了RDD的数据如何保存,RDD在构建高效算法的时候涉及了persist或者checkpoint,以及广播和累加器,通过spark-shell可以试验一些小功能,spark-shell本身是spark的发行包推出的一个程序,通过这个程序可以直接写代码,spark-shell会把代码直接进行运行。
1.1. RDD持久化实战
从2个层面考虑持久化:
1)操作RDD的时候怎么保存结果(属于Action的部分)
下面使用Spark-shell进行实战:
1.1.1. Action级别的操作进行持久化——启动运行环境
我们使用基于Hadoop的HDFS的文件系统,所以只需启动Hadoop的HDFS即可:
查看启动是否成功:
启动Spark集群:
启动日志管理器:
启动spark-shell:
构建一个RDD:
1.1.1.1. reduce
执行一个Action操作:
1.1.1.2. map
1.1.1.3. collect
把各个Executor上的结果进行收集后在集群终端显示。
我们可以看一下collect的源码:(RDD.scala 926行)
/** * Return an array that contains all of the elements in this RDD. */ def collect(): Array[T] = withScope { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) }
collect返回一个Araay,它的工作机制流程图如下图:
如果想在命令终端上看到结果必须使用collect。
凡是Action级别的操作都会触发sc.runJob:
1.1.1.4. count
1.1.1.5. take
1.1.1.6. countByKey
/** * Count the number of elements for each key, collecting the results to a local Map. * * Note that this method should only be used if the resulting map is expected to be small, as * the whole thing is loaded into the driver's memory. * To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which * returns an RDD[T, Long] instead of a map. */ def countByKey(): Map[K, Long] = self.withScope { self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap }
统计每个key出现的次数:
1.1.1.7. saveAsTextFile
saveAsTextFile可以直接把数据写到HDFS上。
2)在实现算法的时候要进行cache、persist,另外还有一个是checkpoint
1.1.2. 通过persist进行持久化
Spark在默认情况下它的数据是放在内存中的,放在内存中适合高速的迭代。在一下情况下需要持久化:
1)在某步骤计算特别耗时
2)计算链条特别长的情况
3)checkpoint所在的RDD也一定要持久化(在checkpoint之前persist)
4)shuffle之后
5)shuffle之前(框架默认帮助我们把数据持久化到本地磁盘),如果shuffle出错的话,所有的父RDD都要重新计算,代价很大的。
如果发现内存经常不够用或出现OOM的话,一个非常好的方式就是MEMORY中的内容序列化,当然了,在使用数据的时候需要反序列化,反序列话是很消耗CPU的;
下面两个是优先放到内存,在内存放不下的情况下再放到磁盘:
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
上面这两个执行时间没什么区别,但是如果先cache再count
执行时间就会提高将近20倍。
由此也得出一个结论:cache后一定不能立即有其它算子!!!
cache不是一个Action,因为它并没有执行一个作业。persist是lazy级别的,unpersist是eager级别的,cache之后可以使用unpersist清除cache。cache只放在内存,而persist可以是内存也可以是磁盘。
1.2. Spark广播实战
在构建发算法时至关重要,无论是在降低网络传输的数据量、提高内存的使用效率,还是加快程序的运行速度,广播对于我们而言都是非常重要的。
为什么需要广播?
广播是由Driver发给当前Application分配的所有Executor内存级别的全局制度变量,Executor中的线程池中的线程共享该全局变量,极大地减少了网络传输(否则的话每个Task都要纯属一次该变量)并极大地节省了内存,当然也隐形的提高了CPU的有效工作。
1.3. Spark累加器实战
累加器在Spark集群中是一个全局的指针步减的变量,且在所有的Executor中只能修改累加器的内容,也就是说只能增加累加器的内容,在Executor中不可以读累加器的内容,在Driver中可以读累加器的内容。
备注:
资料来源于:DT_大数据梦工厂(IMF传奇行动绝密课程)-IMF
更多私密内容,请关注微信公众号:DT_Spark
如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580
Life is short,you need to Spark.