Join Operators

Table 1. Joins
SQL          JoinType     Alias / joinType
CROSS        Cross        cross
INNER        Inner        inner
FULL OUTER   FullOuter    outer, full, fullouter
LEFT ANTI    LeftAnti     leftanti
LEFT OUTER   LeftOuter    leftouter, left
LEFT SEMI    LeftSemi     leftsemi
RIGHT OUTER  RightOuter   rightouter, right
NATURAL      NaturalJoin  Special case for Inner, LeftOuter, RightOuter, FullOuter
USING        UsingJoin    Special case for Inner, LeftOuter, LeftSemi, RightOuter, FullOuter, LeftAnti

Tip
Aliases are case-insensitive and can contain the underscore (_) at any position, e.g. left_anti and LEFT_ANTI are both acceptable.
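The alias handling described in the table and the tip can be sketched in plain Scala: lower-case the joinType string, drop every underscore, and look the result up. This is a hypothetical illustration (JoinTypeAliases and resolveJoinType are made-up names, and JoinType names are returned as plain strings), not Spark's actual parser.

```scala
import java.util.Locale

// Hypothetical sketch of alias resolution following Table 1; not Spark's implementation.
object JoinTypeAliases {
  // Normalized alias -> name of the JoinType it stands for
  private val aliases: Map[String, String] = Map(
    "cross"      -> "Cross",
    "inner"      -> "Inner",
    "outer"      -> "FullOuter",
    "full"       -> "FullOuter",
    "fullouter"  -> "FullOuter",
    "leftanti"   -> "LeftAnti",
    "leftouter"  -> "LeftOuter",
    "left"       -> "LeftOuter",
    "leftsemi"   -> "LeftSemi",
    "rightouter" -> "RightOuter",
    "right"      -> "RightOuter"
  )

  // Case-insensitive lookup that ignores underscores at any position
  def resolveJoinType(alias: String): Option[String] =
    aliases.get(alias.toLowerCase(Locale.ROOT).replace("_", ""))

  def main(args: Array[String]): Unit =
    Seq("left_anti", "LEFT_ANTI", "Full_Outer", "l_e_f_t", "bogus").foreach { a =>
      println(s"$a -> ${resolveJoinType(a)}")
    }
}
```

Unknown aliases give None in this sketch; Spark itself rejects an unsupported joinType with an exception.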

You can pass the join expression to the join operator directly, or leave it out and describe the join condition using the where operator.

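import spark.implicits._  // enables $"colName" (already imported in spark-shell)
df1.join(df2, $"df1Key" === $"df2Key")          // condition passed to join
df1.join(df2).where($"df1Key" === $"df2Key")    // condition applied with where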

join Methods

join(right: Dataset[_]): DataFrame (1)
join(right: Dataset[_], usingColumn: String): DataFrame (2)
join(right: Dataset[_], usingColumns: Seq[String]): DataFrame (3)
join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame (4)
join(right: Dataset[_], joinExprs: Column): DataFrame (5)
join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame (6)
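  1. Inner join without a join condition
  2. Inner equi-join on a single column
  3. Inner equi-join on multiple columns
  4. Equi-join with explicit join type
  5. Inner join with the given join condition
  6. Join with explicit join type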

join joins the current Dataset (the left side) with the right Dataset and gives a DataFrame.

// spark.implicits._ (needed for toDF) is already imported in spark-shell
val left = Seq((0, "zero"), (1, "one")).toDF("id", "left")
val right = Seq((0, "zero"), (2, "two"), (3, "three")).toDF("id", "right")

// Inner join
scala> left.join(right, "id").show
+---+----+-----+
| id|left|right|
+---+----+-----+
|  0|zero| zero|
+---+----+-----+

scala> left.join(right, "id").explain
== Physical Plan ==
*Project [id#50, left#51, right#61]
+- *BroadcastHashJoin [id#50], [id#60], Inner, BuildRight
   :- LocalTableScan [id#50, left#51]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#60, right#61]

// Full outer
scala> left.join(right, Seq("id"), "fullouter").show
+---+----+-----+
| id|left|right|
+---+----+-----+
|  1| one| null|
|  3|null|three|
|  2|null|  two|
|  0|zero| zero|
+---+----+-----+

scala> left.join(right, Seq("id"), "fullouter").explain
== Physical Plan ==
*Project [coalesce(id#50, id#60) AS id#85, left#51, right#61]
+- SortMergeJoin [id#50], [id#60], FullOuter
   :- *Sort [id#50 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#50, 200)
   :     +- LocalTableScan [id#50, left#51]
   +- *Sort [id#60 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#60, 200)
         +- LocalTableScan [id#60, right#61]

// Left anti
scala> left.join(right, Seq("id"), "leftanti").show
+---+----+
| id|left|
+---+----+
|  1| one|
+---+----+

scala> left.join(right, Seq("id"), "leftanti").explain
== Physical Plan ==
*BroadcastHashJoin [id#50], [id#60], LeftAnti, BuildRight
:- LocalTableScan [id#50, left#51]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [id#60]
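The session above only uses the using-column variants. A minimal sketch of variant 6, a join with an explicit join condition and an explicit join type, might look as follows. It assumes spark-sql on the classpath and a local SparkSession; JoinExprsExample is a hypothetical name.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical example object; assumes spark-sql on the classpath.
object JoinExprsExample {
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[*]").appName("join-exprs").getOrCreate()

  // join(right, joinExprs, joinType): explicit condition plus explicit join type
  def leftOuterJoin(): DataFrame = {
    import spark.implicits._
    val left = Seq((0, "zero"), (1, "one")).toDF("id", "left")
    val right = Seq((0, "zero"), (2, "two"), (3, "three")).toDF("id", "right")
    // Both inputs have an id column, so each is referenced through its own Dataset
    left.join(right, left("id") === right("id"), "left_outer")
  }

  def main(args: Array[String]): Unit = {
    leftOuterJoin().show()
    spark.stop()
  }
}
```

Because the condition is an arbitrary Column, both id columns survive in the result, unlike the single id column produced by the using-column joins above.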

Internally, join creates a DataFrame with a Join logical operator (in the current SparkSession).
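To see the Join logical operator for yourself, you can inspect the logical plan through queryExecution. A minimal sketch, assuming a local SparkSession and the Spark 2.x/3.x Dataset API; note that queryExecution and catalyst's Join class are internal, unstable APIs, and JoinLogicalPlan is a hypothetical name.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.Join

// Hypothetical example object; relies on internal (unstable) Spark APIs.
object JoinLogicalPlan {
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[*]").appName("join-logical-plan").getOrCreate()

  // Collects every Join operator in the logical plan behind left.join(right, "id")
  def joinNodes(): Seq[Join] = {
    import spark.implicits._
    val left = Seq((0, "zero"), (1, "one")).toDF("id", "left")
    val right = Seq((0, "zero"), (2, "two"), (3, "three")).toDF("id", "right")
    val joined = left.join(right, "id")
    joined.queryExecution.logical.collect { case j: Join => j }
  }

  def main(args: Array[String]): Unit = {
    // Prints the join type recorded on each Join operator
    joinNodes().foreach(j => println(j.joinType))
    spark.stop()
  }
}
```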

joinWith Method

joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)]  (1)
joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)]
  1. Inner join

Caution
FIXME
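Unlike join, joinWith keeps both sides as whole objects and returns a Dataset of pairs. A minimal sketch of the inner variant, assuming a local SparkSession; the Employee and Dept case classes and the JoinWithExample object are hypothetical.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical record types, declared at top level so Spark can derive encoders
case class Employee(name: String, deptId: Int)
case class Dept(id: Int, title: String)

object JoinWithExample {
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[*]").appName("join-with").getOrCreate()

  // Inner joinWith: each result row is an (Employee, Dept) pair, not flattened columns
  def employeesWithDepts(): Dataset[(Employee, Dept)] = {
    import spark.implicits._
    val employees = Seq(Employee("ann", 1), Employee("bob", 2), Employee("cid", 9)).toDS()
    val depts = Seq(Dept(1, "eng"), Dept(2, "ops")).toDS()
    employees.joinWith(depts, employees("deptId") === depts("id"))
  }

  def main(args: Array[String]): Unit = {
    employeesWithDepts().show(false)
    spark.stop()
  }
}
```

Employee("cid", 9) has no matching department, so the inner join drops it.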

crossJoin Method

crossJoin(right: Dataset[_]): DataFrame
Caution
FIXME
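crossJoin gives the Cartesian product of both Datasets, so the number of result rows is the product of the input row counts. A minimal sketch, assuming a local SparkSession; CrossJoinExample is a hypothetical name.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical example object; assumes spark-sql on the classpath.
object CrossJoinExample {
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[*]").appName("cross-join").getOrCreate()

  // Every left row paired with every right row: 2 x 3 = 6 rows
  def crossed(): DataFrame = {
    import spark.implicits._
    val left = Seq((0, "zero"), (1, "one")).toDF("id", "left")
    val right = Seq((0, "zero"), (2, "two"), (3, "three")).toDF("rid", "right")
    left.crossJoin(right)
  }

  def main(args: Array[String]): Unit = {
    crossed().show()
    spark.stop()
  }
}
```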

Broadcast Join (aka Map-Side Join)

Caution
FIXME: Review BroadcastNestedLoop.

You can use the broadcast function to mark a Dataset to be broadcast when it is used in a join operator.
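A minimal sketch of the broadcast function (from org.apache.spark.sql.functions), assuming a local SparkSession; BroadcastJoinExample and its data are hypothetical. Size-based automatic broadcasting is switched off first so that a BroadcastHashJoin in the physical plan can only come from the explicit hint.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast

// Hypothetical example object; assumes spark-sql on the classpath.
object BroadcastJoinExample {
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[*]").appName("broadcast-join").getOrCreate()

  def broadcastJoin(): DataFrame = {
    import spark.implicits._
    // Turn off size-based broadcasting so only the explicit hint can trigger it
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
    val large = (0 until 1000).map(i => (i % 4, s"fact-$i")).toDF("id", "fact")
    val small = Seq((0, "zero"), (1, "one"), (2, "two")).toDF("id", "dim")
    // broadcast marks small to be shipped in full to every executor
    large.join(broadcast(small), "id")
  }

  def main(args: Array[String]): Unit = {
    broadcastJoin().explain()
    spark.stop()
  }
}
```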

Note
According to the article Map-Side Join in Spark, broadcast join is also called a replicated join (in the distributed systems community) or a map-side join (in the Hadoop community).
Note
At long last! I have always wondered what a map-side join is, and it appears I am close to uncovering the truth!

Later in the article Map-Side Join in Spark, you can find that with a broadcast join you can very efficiently join a large (fact) table with relatively small (dimension) tables, i.e. perform a star-schema join without sending all the data of the large table over the network.
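The idea behind a map-side join can be illustrated without Spark: build a lookup table from the small (dimension) table once, give every task a copy, and join each partition of the large (fact) table locally so its rows never cross the network. This is a hypothetical plain-Scala sketch in which a Seq of Seqs stands in for partitions and the dimension keys are assumed to be unique.

```scala
// Hypothetical plain-Scala sketch of a map-side (broadcast) hash join; no Spark involved.
object MapSideJoinSketch {
  // factPartitions: the large table split into partitions of (key, value) rows
  // dimension: the small table, assumed to have unique keys
  def mapSideJoin[K, A, B](
      factPartitions: Seq[Seq[(K, A)]],
      dimension: Seq[(K, B)]): Seq[Seq[(K, A, B)]] = {
    // "Broadcast": one hash table built from the small table, shared by every partition
    val lookup: Map[K, B] = dimension.toMap
    // Each partition is joined locally (inner join: unmatched fact rows are dropped)
    factPartitions.map { partition =>
      partition.flatMap { case (k, a) => lookup.get(k).map(b => (k, a, b)) }
    }
  }

  def main(args: Array[String]): Unit = {
    val sales = Seq(Seq((1, 10.0), (2, 5.0)), Seq((1, 7.5), (3, 1.0)))
    val products = Seq((1, "apple"), (2, "pear"))
    mapSideJoin(sales, products).zipWithIndex.foreach { case (rows, i) =>
      println(s"partition $i: $rows")
    }
  }
}
```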

The CanBroadcast object matches a LogicalPlan whose output is small enough for a broadcast join.
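A toy sketch of the idea, using a Scala extractor object the way a planner rule can use CanBroadcast in a pattern match. ToyPlan, ToyJoinSelection and the strategy strings are hypothetical; Spark's real LogicalPlan statistics and join selection logic are considerably richer.

```scala
// Hypothetical toy plan: a name plus an estimated output size in bytes
final case class ToyPlan(name: String, sizeInBytes: BigInt)

// Hypothetical planner fragment; the threshold plays the role of spark.sql.autoBroadcastJoinThreshold
final class ToyJoinSelection(autoBroadcastJoinThreshold: Long) {

  // Extractor in the spirit of CanBroadcast: matches only plans small enough to broadcast
  object CanBroadcast {
    def unapply(plan: ToyPlan): Option[ToyPlan] =
      if (plan.sizeInBytes >= 0 && plan.sizeInBytes <= autoBroadcastJoinThreshold) Some(plan)
      else None
  }

  // Tries to broadcast the right side, then the left, else falls back to a sort-merge join
  def chooseStrategy(left: ToyPlan, right: ToyPlan): String = (left, right) match {
    case (_, CanBroadcast(r)) => s"BroadcastHashJoin (build right: ${r.name})"
    case (CanBroadcast(l), _) => s"BroadcastHashJoin (build left: ${l.name})"
    case _                    => "SortMergeJoin"
  }
}

object ToyJoinSelectionDemo {
  def main(args: Array[String]): Unit = {
    val selection = new ToyJoinSelection(10L * 1024 * 1024) // 10 MB
    println(selection.chooseStrategy(ToyPlan("facts", BigInt(5000000000L)), ToyPlan("dims", BigInt(2048))))
    println(selection.chooseStrategy(ToyPlan("a", BigInt(50000000L)), ToyPlan("b", BigInt(60000000L))))
  }
}
```

With a negative threshold no non-negative size can match, so nothing is broadcast by size.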

Note
Currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE [tableName] COMPUTE STATISTICS noscan has been run.

CanBroadcast uses the spark.sql.autoBroadcastJoinThreshold setting (in bytes, 10 MB by default) as the maximum size of a table that will be broadcast to all worker nodes when performing a join. Setting it to -1 disables broadcasting by size.
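A minimal sketch of changing spark.sql.autoBroadcastJoinThreshold at runtime and checking its effect on the physical join operator, assuming a local SparkSession and small in-memory tables; BroadcastThresholdExample and physicalPlanFor are hypothetical names.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example object; assumes spark-sql on the classpath.
object BroadcastThresholdExample {
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[*]").appName("broadcast-threshold").getOrCreate()

  // Plans the same equi-join under the given threshold (in bytes) and returns the physical plan text
  def physicalPlanFor(thresholdBytes: Long): String = {
    import spark.implicits._
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", thresholdBytes)
    val left = Seq((0, "zero"), (1, "one")).toDF("id", "left")
    val right = Seq((0, "zero"), (2, "two"), (3, "three")).toDF("id", "right")
    left.join(right, "id").queryExecution.executedPlan.toString
  }

  def main(args: Array[String]): Unit = {
    println(physicalPlanFor(10L * 1024 * 1024)) // 10 MB, the default
    println(physicalPlanFor(-1L))               // broadcasting by size disabled
    spark.stop()
  }
}
```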
