Spark Join 原理分析

当前 Spark SQL 支持三种 Join 算法:shuffle hash join、broadcast hash join、sort merge join,本文做一个总结。

hash join

先来看这样一条 sql 语句,select * from order,item where item.id = order.i_id,很简单一个 join 节点,参与 join 的两张表是 item 和 order,join key 分别是 item.id 和 order.i_id。如果使用 hash join 整个过程会经历三步:

  1. 确定 build table 和 probe table,小表会作为 build table,大表作为 probe table。一般业务场景下,使用 item 表为 build table,order 表为 probe table。build table 使用 join key 构建 hash table,而 probe table 使用 join key 进行探测,探测成功就可以 join 在一起。
  2. 依次读取 build table(item) 的数据,对于每一行数据根据 join key(item.id) 进行 hash,hash 到对应的 bucket,生成 hash table 中的一条记录(发生哈希冲突用拉链法解决)。数据缓存在内存中,如果内存放不下需要 dump 到外存。
  3. 依次扫描 probe table(order) 的数据,使用相同的 hash 函数映射 hash table 中的记录,映射成功之后再检查 join 条件 (item.id = order.i_id),如果匹配成功就可以将两者 join 在一起。

upload successful

hash join分布式改造一般有两种经典方案:

  1. broadcast hash join:将其中一张小表广播分发到另一张大表所在的分区节点上,分别并发地与其上的分区记录进行 hash join。broadcast 适用于小表很小,可以直接广播的场景。

  2. shuffler hash join:一旦小表数据量较大,此时就不再适合进行广播分发。这种情况下,可以根据 join key 相同必然分区相同的原理,将两张表分别按照 join key 进行重新组织分区,这样就可以将 join 分而治之,划分为很多小 join,充分利用集群资源并行化。

Broadcast Hash Join

如下图所示,broadcast hash join可以分为两步:

  1. broadcast 阶段:将小表广播分发到大表所在的所有主机。广播算法可以有很多,最简单的是先发给 driver,driver 再统一分发给所有 executor,要不就是基于 bittorrete 的 p2p 思路。

  2. hash join 阶段:在每个 executor 上执行单机版 hash join,小表映射,大表试探;

Spark SQL 规定 broadcast hash join 执行的基本条件为被广播小表必须小于参数spark.sql.autoBroadcastJoinThreshold,默认为10M。

upload successful

Shuffle Hash Join

在大数据条件下如果一张表很小,执行 join 操作最优的选择无疑是 broadcast hash join,效率最高。但是一旦小表数据量增大,广播所需内存、带宽等资源必然就会太大,broadcast hash join 就不再是最优方案。此时可以按照 join key 进行分区,根据 key 相同必然分区相同的原理,就可以将大表 join 分而治之,划分为很多小表的 join,充分利用集群资源并行化。如下图所示,shuffle hash join 也可以分为两步:

  1. shuffle 阶段:分别将两个表按照 join key 进行分区,将相同 join key 的记录重分布到同一节点,两张表的数据会被重分布到集群中所有节点。

  2. hash join 阶段:每个分区节点上的数据单独执行单机 hash join 算法。

upload successful

看到这里,可以初步总结出来如果两张小表 join 可以直接使用单机版 hash join,如果一张大表 join 一张极小表,可以选择 broadcast hash join 算法,而如果是一张大表 join 一张小表,则可以选择 shuffle hash join 算法,那如果是两张大表进行 join 呢?

Sort-Merge Join

Spark SQL 对两张大表 join 采用了全新的算法-sort-merge join,如下图所示,整个过程分为三个步骤:

  1. shuffle 阶段:将两张大表根据 join key 进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理。

  2. sort 阶段:对单个分区节点的两表数据,分别进行排序。

  3. merge阶段:对排好序的两张分区表数据执行 join 操作。join 操作很简单,分别遍历两个有序序列,碰到相同 join key 就 merge 输出,否则取更小一边。

upload successful

目前 spark 的 sort-based shuffle 算法,在经过 shuffle 之后 partition 数据都是按照 key 排序的。因此理论上可以认为数据经过 shuffle 之后是不需要 sort 的,可以直接 merge。