百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

8 RDD 常用算子(3)(rd算法)

itomcoil 2025-07-02 21:21 3 浏览

双值类型

两个数据源之间的关联操作。

intersection

函数签名

def intersection(other: RDD[T]): RDD[T]

函数说明

对源RDD和参数RDD求交集后返回一个新的RDD。

union()

函数签名

def union(other: RDD[T]): RDD[T]

函数说明

对源RDD和参数RDD求并集后返回一个新的RDD。

subtract()

函数签名

def subtract(other: RDD[T]): RDD[T]

函数说明

以一个RDD元素为主,去除两个RDD中重复元素,将其他元素保留下来。求差集。

intersection() 、union()、subtract() 不支持两个数据类型不一致的 RDD 进行求交集操作,参数要求与原 RDD 的元素类型一致。

zip()

函数签名

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

函数说明

将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个 RDD 中的元素,Value 为第 2 个 RDD 中的相同位置的元素。

拉链操作,两个 RDD 的数据类型可以不一致。

分区数不相等时,不能进行拉链操作。

val rdd1 = sc.makeRDD(List(1, 2, 3, 4), numSlices = 2)
val rdd2 = sc.makeRDD(List(3, 4, 5, 6), numSlices = 4)

// 拉链 期望【(1,3),(2,4),(3,5),(4,6)】
val rdd3 = rdd1.zip(rdd2)
println("Zip:" + rdd3.collect().mkString(","))

会报如下错误:

Exception in thread "main" java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(2, 4)

两个数据源中,分区中的数据数量也要保持一致。

val rdd4 = sc.makeRDD(List(1, 2, 3, 4, 5, 6), numSlices = 2)
val rdd5 = sc.makeRDD(List(3, 4, 5, 6), numSlices = 2)

// 拉链 期望【(1,3),(2,4),(3,5),(4,6)】
val rdd6 = rdd4.zip(rdd5)
println("Zip:" + rdd6.collect().mkString(","))

否则报如下错误:

org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition

综合示例:

object RDD_Transform_MultipleValue {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 双Value 类型
    val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
    val rdd2 = sc.makeRDD(List(3, 4, 5, 6))

    // 交集 期望【3,4】
    val rdd3 = rdd1.intersection(rdd2)
    println("Intersection:" + rdd3.collect().mkString(","))

    // 并集 期望【1,2,3,4,3,4,5,6】
    val rdd4 = rdd1.union(rdd2)
    println("Union:" + rdd4.collect().mkString(","))

    // 差集 期望【1,2】
    val rdd5 = rdd1.subtract(rdd2)
    println("Subtract:" + rdd5.collect().mkString(","))

    // 拉链 期望【(1,3),(2,4),(3,5),(4,6)】
    val rdd6 = rdd1.zip(rdd2)
    println("Zip:" + rdd6.collect().mkString(","))


    // Step3: 关闭环境
    sc.stop()
  }

}

输出结果:

Intersection:3,4
Union:1,2,3,4,3,4,5,6
Subtract:1,2
Zip:(1,3),(2,4),(3,5),(4,6)

key-value类型

partitionBy()

函数签名

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

函数说明

将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner。

partitionBy() 算子并不是属于 RDD 的方法,而是属于 PairRDDFunctions 类的方法,要求 RDD 必须是 Key-Value 类型的 RDD。Scala 隐式转换,程序在编译错误时,会尝试在作用域范围内查找转换规则,将类型转换成特定类型后编译通过。隐式转换,相当于一种二次编译。

RDD 中有 rddToPairRDDFunctions() 隐式函数,尝试将 [K,V] 类型的 RDD 转换成 PairRDDFunctions 类型。

partitionBy() 要与 coalesce()、repartition() 算子区分开,前者是将其数据进行重分区;后者是调整分区的数量。

object RDD_Transform_partitionBy {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 partitionBy
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    val mapRDD:RDD[(Int, Int)] = rdd.map((_, 1))
    mapRDD.partitionBy(new HashPartitioner(partitions = 2)).saveAsTextFile("output")
    // Step3: 关闭环境
    sc.stop()
  }
}

最后的输出:

最终实现【1,3】【2,4】分组。

【说明1】HashPartitioner 继承自 Partitioner,有两个主要方法:

  • numPartitions:获取分区数量;
  • getPartition:获取分区号,返回一个整数值,其实就是 key 值对于分区号进行取模计算,获得分区值。
def getPartition(key: Any): Int = key match {
  case null => 0
  case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}

nonNegativeMod() 方法源码:

def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

【说明2】如果 RDD 连续进行两次 partitionBy() 算子操作,partitionBy() 算子会对自身的 partitioner 对象进行比较。Scala 的 == 会进行类型和非空比较。

partithoner 自身又有 equals() 方法,比较两个分区器是否同一。比较逻辑,要看:类型、分区数量。如果同一,分区器将不做任何处理,保留self 的分区器。

override def equals(other: Any): Boolean = other match {
  case h: HashPartitioner =>
    h.numPartitions == numPartitions
  case _ =>
    false
}

【问题3】其他类型分区器 Partitioner 有 3 个子类。其中 PythonPartitioner (包含小锁头)说明其是在特定包下运行的分区器。RangePartitioner 多用于排序操作。

reduceByKey()

函数签名

def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

函数说明

可以将数据按照相同的 Key 对 Value 进行聚合。

  1. Scala 语音中一般聚合操作都是两两聚合,spark 基于 Scala 开发,所以它的聚合也是两两聚合。
  2. reduceByKey() 中如果 Key 的数据只有一个,则该 Key 不参与运算。
object RDD_Transform_reduceByKey {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 reduceByKey:相同的 Key 的数据进行 value 数据的聚合操作
    // Scala 语音中一般聚合操作都是两两聚合,spark 基于 Scala 开发,所以它的聚合也是两两聚合
    val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)))
    val reduceRDD = rdd.reduceByKey(
      (x: Int, y: Int) => {
        println(s"x=${x}, y=${y}")
        (x + y)
      })
    reduceRDD.collect().foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果:

x=1, y=2
x=3, y=3
(a,6)
(b,4)

groupByKey()

函数签名

def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

函数说明

将数据源的数据根据 key 对 value 进行分组。

object RDD_Transform_groupByKey {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 groupByKey
    val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)))
    val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
    groupRDD.collect().foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果:

(a,CompactBuffer(1, 2, 3))
(b,CompactBuffer(4))

groupByKey() 与 groupBy()的区别:

  • groupByKey():1)通过 Key 进行分组;2)返回 RDD[(String, Iterable[Int])],聚合后的结果。
val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
  • groupBy():1)可以使用元组中任意元素进行分组;2)返回的是分组后的元组,元组仍然是原始的元组。
val groupRDD1: RDD[(String, Iterable[(String, Int)])]=rdd.groupBy(_._1)

groupByKey() 与 reduceByKey() 的区别:

从性能上看:reduceByKey 在分区内进行预聚合(分区内做聚合),在本地将数据量进行压缩,可以使 shuffle 落盘时数据量减少,同时在 reduce 时从文件读取的数据量的大小也进行压缩,从而提高 shuffle 的效果。如果进行聚合,reduceByKey() 性能较高

从功能上看:如果只分组,那只能使用 groupByKey()。

reduceByKey() 分区内和分区间计算规则相同。

aggregateByKey()

函数签名

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
     combOp: (U, U) => U): RDD[(K, U)]

aggregateByKey() 存在参数柯里化,接受两个参数列表。

  • 第一个参数列表, 需要传递一个参数:表示初始值。主要用于当我们遇到第一个 key 时,和 value 进行分区计算
  • 第二个参数列表:
    • 第一个参数表示分区内计算规则;
    • 第二个参数表示分区件计算规则。

函数说明

将数据根据不同的规则进行分区内计算和分区间计算。

object RDD_Transform_aggregateByKey {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 aggregateByKey
    // 数据分区【("a",1),("a",2)】【("a",3),("a",4)】
    val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),numSlices = 2)

    // aggregateByKey() 存在参数柯里化
    // 第一个参数列表, 需要传递一个参数:表示初始值
    // 主要用于当我们遇到第一个 key 时,和 value 进行分区计算
    // 第二个参数列表:
    //    第一个参数表示分区内计算规则
    //    第二个参数表示分区件计算规则
    rdd.aggregateByKey(zeroValue = 0)(
      (x, y) => math.max(x,y),
      (x, y) => x + y
    ).collect().foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果:

(b,8)
(a,8)

修改 zeroValue 值后的效果:

object RDD_Transform_aggregateByKey_zeroValue {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 aggregateByKey
    // 数据分区【("a",1),("a",2)】【("a",3),("a",4)】
    val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),numSlices = 2)

    // aggregateByKey() 存在参数柯里化
    // 第一个参数列表, 需要传递一个参数:表示初始值
    // 主要用于当我们遇到第一个 key 时,和 value 进行分区计算
    // 第二个参数列表:
    //    第一个参数表示分区内计算规则
    //    第二个参数表示分区件计算规则
    rdd.aggregateByKey(zeroValue = 5)(
      (x, y) => math.max(x,y),
      (x, y) => x + y
    ).collect().foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果,如预期:

(b,8)
(a,8)

aggregateByKey() 方法,如果分区内、分区间使用相同的聚合函数,则效果与 reduceByKey() 相同。

    rdd.aggregateByKey(zeroValue = 0)(
      (x, y) => x + y,
      (x, y) => x + y
    )
    // 简化写法:匿名函数的字典原则
    rdd.aggregateByKey(zeroValue = 0)( _+_,_+_)

应用场景,按 Key 值取平均数。

object RDD_Transform_aggregateByKey_average {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 aggregateByKey
    // 数据分区【("a",1),("a",2)】【("a",3),("a",4)】
    val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),numSlices = 2)
    // 初始值 (0,0),tuple 的第一个元素为求和,第二个值为 key 出现的次数
    val averageRDD: RDD[(String,(Int, Int))] = rdd.aggregateByKey(zeroValue = (0, 0))(
      (t, v) => {
        // 分区内,tuple 第一个元素为值相加,第二个元素为次数相加
        (t._1 + v, t._2 + 1)
      },
      // 分区间,tuple 中的元素,第一个元素值相加,第二个元素数量相加
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2)
      }
    )
    // RDD 中,如果 Key 保持不变,只对 Value 进行处理,可以使用 mapValues() 算子
    averageRDD.mapValues{
      case (num, cnt) => {
        num / cnt
      }
    }.collect().foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果:

(b,4)
(a,3)

相关推荐

字节三面:MySQL数据同步ES的4种方法!你能想到几种?

如何进行数据同步MySQL是一种流行的关系型数据库,而Elasticsearch是一个强大的搜索引擎和分析平台。将MySQL数据同步到Elasticsearch中可以帮助我们更方便地搜索和分析数据。在...

Java 连接 MySQL 数据库(java连接mysql课设)

一、环境准备1.1依赖管理(Maven)在项目的pom.xml中添加MySQL驱动依赖:<dependency><groupId>mysql</gro...

Spring Boot 连接 MySQL 数据库(spring boot配置数据库连接)

一、环境准备1.1依赖管理(Maven)<!--方案1:JdbcTemplate--><dependency><groupId>org.sprin...

java连接mysql数据库达成数据查询详细教程

前言:本篇文章适用于所有前后端开发者众所周知,只要是编程,那肯定是需要存储数据的,无论是c语言还是java,都离不开数据的读写,数据之间传输不止,这也就形成了现代互联网的一种相互存在关系!而读写存储的...

既然有MySQL了,为什么还要有MongoDB?

大家好,我是哪吒,最近项目在使用MongoDB作为图片和文档的存储数据库,为啥不直接存MySQL里,还要搭个MongoDB集群,麻不麻烦?让我们一起,一探究竟,了解一下MongoDB的特点和基本用法,...

用 JSP 连接 MySQL 登入注册项目实践(JSP + HTML + CSS + MySQL)

目录一、写在前面二、效果图三、实现思路四、实现代码1、login总界面2、registercheck总代码3、logoutcheck总代码4、amendcheck总代码相关文章一、写在前面哈喽~大家好...

MySQL关联查询时,为什么建议小表驱动大表?这样做有什么好处

在SQL数据库中,小表驱动大表是一种常见的优化策略。这种策略在涉及多表关联查询的情况下尤其有效。这是因为数据库查询引擎会尽可能少的读取和处理数据,这样能极大地提高查询性能。"小表驱动大表&...

mysql8驱动兼容规则(mysql8.0驱动)

JDBC版本:Connector/J8.0支持JDBC4.2规范.如果Connector/J8.0依赖于更高版本的jdbclib,对于调用只有更高版本特定的方法会抛出SQLFea...

mysql数据表如何导入MSSQL中(mysql怎样导入数据)

本案例演示所用系统是windowsserver2012.其它版本windows操作系统类似。1,首先需要下载mysqlodbc安装包。http://dev.mysql.com/downloa...

MySQL 驱动中虚引用 GC 耗时优化与源码分析

本文要点:一种优雅解决MySQL驱动中虚引用导致GC耗时较长问题的解决方法虚引用的作用与使用场景MySQL驱动源码中的虚引用分析背景在之前文章中写过MySQLJDBC驱动中的虚引用导致...

ExcelVBA 连接 MySQL 数据库(vba 连接sqlserver)

上期分享了ExcelVBA连接sqlite3数据库,今天给大家分享ExcelVBA连接另一个非常流行的MySQL数据库。一、环境win10Microsoftoffice2010(...

QT 5.12.11 编译MySQL 8 驱动教程- 1.01版

安装编译环境:qt5.12.11mysql8.0.28修改mysql.pro工程文件,编译生成动态库mysql.pro文件位置:D:\Alantop_Dir\alantop_sde\Qt\Qt5....

「Qt入门第22篇」 数据库(二)编译MySQL数据库驱动

导语在上一节的末尾我们已经看到,现在可用的数据库驱动只有两类3种,那么怎样使用其他的数据库呢?在Qt中,我们需要自己编译其他数据库驱动的源码,然后当做插件来使用。下面就以现在比较流行的MySQL数据库...

(干货)一级注册计量师第五版——第四章第三节(三)

计量标准的建立、考核及使用(三)PS:内容都是经过个人学习而做的笔记。如有错误的地方,恳请帮忙指正!计量标准考核中有关技术问题1检定或校准结果的重复性重复性是指在一组重复性测量条件下的测量精密度。检定...

声学测量基础知识分享(声学测量pdf)

一、声学测量的分类和难点1.声学测量的分类声学测量按目的可分为:声学特性研究(声学特性研究、媒质特性研究、声波发射与接收的研究、测量方法与手段的研究、声学设备的研究),声学性能评价和改善(声学特性评价...