7 RDD常用算子(2)(rd算法)
itomcoil 2025-07-02 21:22 2 浏览
filter()
def filter(f: T => Boolean): RDD[T]
函数说明
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
object RDD_Transform_filter {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 filter
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val filterRDD = rdd.filter(num => num % 2 != 0)
filterRDD.collect.foreach(println)
// Step3: 关闭环境
sc.stop()
}
}
输出结果:
1
3
应用场景,日志过滤。
object RDD_Transform_filter_LogFilter {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 groupBy
val rdd = sc.textFile("datas/apache.log")
var timeRDD = rdd.filter(
line => {
val data = line.split(" ")
val time = data(3)
time.startsWith("17/May/2023")
}
).collect().foreach(println)
// Step3: 关闭环境
sc.stop()
}
}
日志内容:
https://gitee.com/wuji1626/bigdata/blob/master/datas/apache.log
输出结果:
220.181.108.112 - - 17/May/2023:02:04:48 +0800 "GET / HTTP/1.1" 200 9603 www.sohu.com
220.181.108.112 - - 17/May/2023:01:04:58 +0800 "GET / HTTP/1.1" 200 9603 www.iqiyi.com
220.181.108.112 - - 17/May/2023:12:04:58 +0800 "GET / HTTP/1.1" 200 9603 www.163.com
sample()
函数签名
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]
)
参数说明:
抽取数据不放回(伯努利算法)伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不
要
- withReplacement:boolean,抽取的数据不放回,false;
- fraction:Double,抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
- seed:Long,随机数种子。
抽取数据放回(泊松算法)
- withReplacement:boolean,抽取的数据是否放回,true:放回;false:不放回;
- fraction:Double,重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数,但实际的抽取次数可能远大于期望值;
- seed:Long,随机数种子。
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]
)
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T] = {
require(fraction >= 0,
s"Fraction must be nonnegative, but got ${fraction}")
withScope {
require(fraction >= 0.0, "Negative fraction value: " + fraction)
if (withReplacement) {
new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
} else {
new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
}
}
}
函数说明
根据指定的规则从数据集中抽取数据。
object RDD_Transform_sample {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 filter
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
println(rdd.sample(
withReplacement = false,
0.4,
1
).collect().mkString(","))
// Step3: 关闭环境
sc.stop()
}
}
输出结果:
2,5,6,8
sample() 算子源码如下,
- 当 withReplacement 为 false 时,会使用 BernoulliSampler() 贝努力(伯努利)算法选取样本,贝努力算法相当于抛硬币,将符合人头或字的数字作为样本返回。
- 当 withReplacement 为 true 时,会使用 PoissonSampler() 离散概率分布算法生成样本。
应用场景:在产生数据倾斜时使用。在 shuffle 的情况下,数据可能出现倾斜,通过 sample() 算子获取一定数量的数据样本观察其中的某些 key 值是否重复的比例过高,如果反复取 sample 对应的 key 都有重复值较多的情况,就说明该 key 可能引起数据倾斜,需要加以处理。
distinct()
函数签名
def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
函数说明
将数据集中数据去重。
object RDD_Transform_distinct {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 distinct
val rdd = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))
val distinctRDD = rdd.distinct()
distinctRDD.collect.foreach(println)
// Step3: 关闭环境
sc.stop()
}
}
输出结果:
1
2
3
4
Scala 集合的 distinct 方法,其中通过 HashSet 的方法识别重复值。
default Repr distinct() {
boolean isImmutable = this instanceof scala.collection.immutable.Seq;
if (isImmutable && this.lengthCompare(1) <= 0) {
return this.repr();
} else {
Builder b = this.newBuilder();
HashSet seen = new HashSet();
Iterator it = this.iterator();
boolean different = false;
...
}
RDD 的去重方式,主要看 partitioner 部分,由于 partitioner 默认为 null,因此主要逻辑体现在下述代码行:
case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
以数组 List(1, 2, 3, 4, 1, 2, 3, 4) 为例:
- map(x => (x, null)):将数组变为:(1, null),(2, null),(3, null),(4, null),(1, null),(2, null),(3, null),(4, null) 的 tuple 数组;
- reduceByKey((x, _) => x, numPartitions):对 tuple 数组做聚合 (1, null),(1, null),聚合后,key 不变,(null,null),取第一个值,即 null,所以聚合后的结果:(1, null),(2, null),(3, null),(4, null);
- map(_._1):只保留 tuple 中的第一个元素,(1, null),(2, null),(3, null),(4, null) 变回 1, 2, 3, 4。
distinct() 算子的完整源码如下:
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
// Create an instance of external append only map which ignores values.
val map = new ExternalAppendOnlyMap[T, Null, Null](
createCombiner = _ => null,
mergeValue = (a, b) => a,
mergeCombiners = (a, b) => a)
map.insertAll(partition.map(_ -> null))
map.iterator.map(_._1)
}
partitioner match {
case Some(_) if numPartitions == partitions.length =>
mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
}
}
coalesce()
函数签名
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
函数说明
根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本。
object RDD_Transform_coalesce {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 distinct
val rdd = sc.makeRDD(List(1, 2, 3, 4), numSlices = 4)
val coalesceRDD = rdd.coalesce(2)
coalesceRDD.saveAsTextFile("output")
// Step3: 关闭环境
sc.stop()
}
}
生成的分区文件:
[1,2] [3,4]
如果原始数据与分区数据调整成下述的方式,数组有 6 个元素,变化前 3 个分区,变化后 2 个分区。
object RDD_Transform_coalesce {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 coalesce
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), numSlices = 3)
val coalesceRDD = rdd.coalesce(2)
coalesceRDD.saveAsTextFile("output")
// Step3: 关闭环境
sc.stop()
}
}
默认情况下,coalesce() 算子不打乱原始分区,如下图所示,coalesce() 采用的是整个分区合并的方式。
可以通过指定 shuffle 参数的方式,显式让 shuffle 执行。
val coalesceRDD = rdd.coalesce(2, shuffle = true)
指定 shuffle 参数后,分区随机进行打散:
[1,2] [3,4] [5,6] => [1,4,5] [2,3,6]
如果需要扩展分区,可以使用 coalesce() 进行分区扩展,如果不指定 shuffle = true,将无效,只有指定 shuffle = true 才能实现分区扩大。
repartition()
为让 coalesce() 功能更明确,只作为分区缩减,专门增加 repartition() 算子,进行分区扩展。
函数签名
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
函数说明
该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的 RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition 操作都可以完成,因为无论如何都会经 shuffle 过程。
查看 repartion() 源代码,发现底层就是调用 coalesce() 算子,并默认指定 shuffle = ture。
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
sortBy()
根据指定规则对数据进行排序。
函数签名
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
函数说明
该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程。
object RDD_Transform_sortBy {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 sortBy
val rdd = sc.makeRDD(List(6, 2, 4, 5, 3, 1), numSlices = 2)
val sortRDD = rdd.sortBy(num => num)
sortRDD.collect().foreach(println)
// Step3: 关闭环境
sc.stop()
}
}
数据的顺序:[6, 2, 4] [5, 3, 1] => [1, 2, 3] [4, 5, 6]
从结果知:sortBy() 算子要进行 shuffle。
应用场景,对 tuple 进行排序。
object RDD_Transform_sortBy_tuple {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 sortBy
val rdd = sc.makeRDD(List(("1",1),("11",2),("2",3)), numSlices = 2)
val sortRDD = rdd.sortBy(t=> t._1)
sortRDD.collect().foreach(println)
// Step3: 关闭环境
sc.stop()
}
}
默认会按照字典序进行排序:
(1,1)
(11,2)
(2,3)
若不想按字典序排列,可以进行数据类型转换:
val sortRDD = rdd.sortBy(t=> t._1.toInt)
输出结果:
(1,1)
(2,3)
(11,2)
sortBy() 默认为升序,第二个方式可以改变排序方式:
val sortRDD = rdd.sortBy(t=> t._1,ascending = false)
输出结果,按字典序降序:
(2,3)
(11,2)
(1,1)
相关推荐
- MariaDB开窗函数(开窗函数max)
-
在使用GROUPBY子句时,总是需要将筛选的所有数据进行分组操作,它的分组作用域是整张表。分组以后,为每个组只返回一行。而使用基于窗口的操作,类似于分组,但却可以对这些"组"(即窗口...
- 你还不知道什么是MySQL窗口函数?(mysql5.7窗口函数)
-
MySQL中的窗口函数是一类用来在某一部分查询结果上进行计算的函数,这些函数的用法与普通的聚合函数如SUM、AVG、COUNT类似,但是与聚合函数不同的是,窗口函数不会讲多行数据合并成一行结果,而是...
- 精通88道题包你面试通过BAT-精简版-不得不收藏!
-
J2SE基础1.九种基本数据类型的大小,以及他们的封装类。2.Switch能否用string做参数?3.equals与==的区别。4.Object有哪些公用方法?5.Java的四种引用,强弱...
- Transact-SQL学习笔记21——排名窗口函数
-
将OVER()子句和排名函数连用,就是排名窗口函数,它们只能用在SELECT子句或ORDERBY子句之后。如果放在SELECT之后,它运行的逻顺序在DISTINCT之前。逻辑处理顺序如下:SE...
- MySQL8 窗口函数是真的省事!(mysql中的窗口函数)
-
@[toc]MySQL9已经出来了,MySQL8相信也慢慢走进各位小伙伴的工作中了。MySQL8还是有很多重量级变化的,一些底层优化大家在使用中有时候不易察觉,但是有一些用法,还是带给我们耳目一...
- Lodash 这 20 个方法,既高级又超级实用!
-
一、安全操作篇1._.get:防御性取值2._.set:智能路径赋值3._.invoke:安全方法调用二、集合处理篇4._.keyBy:快速对象映射5._.orderBy:多条件排序6._...
- Oracle有哪些常见的函数?(oracle常用函数有哪些)
-
恢复删除的数据insertinto'表名'select*from'表名'asofTIMESTAMPTO_TIMESTAMP("当前时间...
- excel的高级用法——宏,原来如此实用
-
使用excel时,直接手动计算或者输入公式,你会感到很苦恼或者操作很繁琐,如果使用vba直接输出结果,虽然效率很高,但是不够直观。excel宏最方便的用法是作为公式里的函数使用,打开宏编辑器,编写一个...
- 7 RDD常用算子(2)(rd算法)
-
filter()deffilter(f:T=>Boolean):RDD[T]函数说明将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分...
- 从零开始学SQL进阶,数据分析师必备SQL取数技巧,建议收藏
-
上一节给大家讲到SQL取数的一些基本内容,包含SQL简单查询与高级查询,需要复习相关知识的同学可以跳转至上一节,本节给大家讲解SQL的进阶应用,在实际过程中用途比较多的子查询与窗口函数,下面一起学习。...
- SQL窗口函数知多少?(sql窗口怎么执行)
-
我们在日常工作中是否经常会遇到需要排名的情况,比如:每个部门按业绩来排名,每人按绩效排名,对部门销售业绩前N名的进行奖励等。面对这类需求,我们就需要使用sql的高级功能——窗口函数。一、什么是窗口函数...
- SQL开窗函数讲解,让查询统计更简单
-
用了这么多关系型数据库产品,开源的商业的,如:Oracle、MySql(注意5.7以上版本才可以使用)、SqlServer、postgreSQL。如果从应用角度来看,谁都逃离不了增删改查;而查又是难点...
- mysql窗口函数(mysql窗口函数rank)
-
MySQL窗口函数是一种高级的SQL函数,它可以进行一些比较复杂的数据分析和处理。与传统的聚合函数不同,窗口函数不会合并行,而是根据特定的条件为每行分配一个值。MySQL窗口函数可以用来计算每...
- 一文讲懂SQL窗口函数 大厂必考知识点
-
大家好,我是宁一。今天是我们的第24课:窗口函数。窗口函数,也叫OLAP(OnlineAnallyticalProcessing,联机分析处理),可以对数据库数据进行实时分析处理。窗口函数是数据分...
- C++20 四大特性之一:Module 特性详解
-
C++20最大的特性是什么?最大的特性是迄今为止没有哪一款编译器完全实现了所有特性。文章来源:网易云信有人认为C++20是C++11以来最大的一次改动,甚至比C++11还要大。本文仅介绍...
- 一周热门
- 最近发表
- 标签列表
-
- ps图案在哪里 (33)
- super().__init__ (33)
- python 获取日期 (34)
- 0xa (36)
- super().__init__()详解 (33)
- python安装包在哪里找 (33)
- linux查看python版本信息 (35)
- python怎么改成中文 (35)
- php文件怎么在浏览器运行 (33)
- eval在python中的意思 (33)
- python安装opencv库 (35)
- python div (34)
- sticky css (33)
- python中random.randint()函数 (34)
- python去掉字符串中的指定字符 (33)
- python入门经典100题 (34)
- anaconda安装路径 (34)
- yield和return的区别 (33)
- 1到10的阶乘之和是多少 (35)
- python安装sklearn库 (33)
- dom和bom区别 (33)
- js 替换指定位置的字符 (33)
- python判断元素是否存在 (33)
- sorted key (33)
- shutil.copy() (33)