Join Operators

| SQL | JoinType | Alias / joinType |
|---|---|---|
| CROSS | Cross | cross |
| INNER | Inner | inner |
| FULL OUTER | FullOuter | outer, full, fullouter |
| LEFT ANTI | LeftAnti | leftanti |
| LEFT OUTER | LeftOuter | leftouter, left |
| LEFT SEMI | LeftSemi | leftsemi |
| RIGHT OUTER | RightOuter | rightouter, right |
| NATURAL | NaturalJoin | Special case for INNER, LEFT OUTER, RIGHT OUTER, FULL OUTER |
| USING | UsingJoin | Special case for INNER, LEFT OUTER, LEFT SEMI, RIGHT OUTER, FULL OUTER, LEFT ANTI |

Tip: Aliases are case-insensitive and can use the underscore (_) at any position, i.e. left_anti and LEFT_ANTI are both acceptable.

You can specify the join expression as part of the join operator or leave it out and describe it using the where operator.

df1.join(df2, $"df1Key" === $"df2Key")
df1.join(df2).where($"df1Key" === $"df2Key")
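As a minimal sketch of the Tip above (the df1 and df2 names and key columns follow the snippet), both spellings of the left anti join alias resolve to the same join type:

// Both spellings resolve to the same LeftAnti join type.
df1.join(df2, $"df1Key" === $"df2Key", "left_anti")
df1.join(df2, $"df1Key" === $"df2Key", "LEFT_ANTI")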
join Methods
join(right: Dataset[_]): DataFrame (1)
join(right: Dataset[_], usingColumn: String): DataFrame (2)
join(right: Dataset[_], usingColumns: Seq[String]): DataFrame (3)
join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame (4)
join(right: Dataset[_], joinExprs: Column): DataFrame (5)
join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame (6)
1. Inner join
2. Inner join
3. Inner join
4. Equi-join with explicit join type
5. Inner join
6. Join with explicit join type
join joins two Datasets.
val left = Seq((0, "zero"), (1, "one")).toDF("id", "left")
val right = Seq((0, "zero"), (2, "two"), (3, "three")).toDF("id", "right")
// Inner join
scala> left.join(right, "id").show
+---+----+-----+
| id|left|right|
+---+----+-----+
| 0|zero| zero|
+---+----+-----+
scala> left.join(right, "id").explain
== Physical Plan ==
*Project [id#50, left#51, right#61]
+- *BroadcastHashJoin [id#50], [id#60], Inner, BuildRight
:- LocalTableScan [id#50, left#51]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- LocalTableScan [id#60, right#61]
// Full outer
scala> left.join(right, Seq("id"), "fullouter").show
+---+----+-----+
| id|left|right|
+---+----+-----+
| 1| one| null|
| 3|null|three|
| 2|null| two|
| 0|zero| zero|
+---+----+-----+
scala> left.join(right, Seq("id"), "fullouter").explain
== Physical Plan ==
*Project [coalesce(id#50, id#60) AS id#85, left#51, right#61]
+- SortMergeJoin [id#50], [id#60], FullOuter
:- *Sort [id#50 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#50, 200)
: +- LocalTableScan [id#50, left#51]
+- *Sort [id#60 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#60, 200)
+- LocalTableScan [id#60, right#61]
// Left anti
scala> left.join(right, Seq("id"), "leftanti").show
+---+----+
| id|left|
+---+----+
| 1| one|
+---+----+
scala> left.join(right, Seq("id"), "leftanti").explain
== Physical Plan ==
*BroadcastHashJoin [id#50], [id#60], LeftAnti, BuildRight
:- LocalTableScan [id#50, left#51]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- LocalTableScan [id#60]
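The variants with an explicit join expression, (5) and (6), can be sketched with the same left and right datasets. Note that, unlike the usingColumn variants above, both id columns are kept in the result:

// Variant (5): inner join with an explicit join expression
left.join(right, left("id") === right("id"))

// Variant (6): the same join expression with an explicit join type
left.join(right, left("id") === right("id"), "leftouter")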
Internally, join creates a DataFrame with a Join logical operator (in the current SparkSession).
joinWith Method
joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)] (1)
joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)]
1. Inner join
Caution: FIXME
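A minimal sketch of joinWith with the left and right datasets above. Unlike join, which flattens the columns of both sides into a single row, joinWith keeps each side as a whole value in a Dataset of pairs (Dataset[(Row, Row)] here, as both sides are DataFrames):

// Inner joinWith: only matching pairs survive.
left.joinWith(right, left("id") === right("id"))

// With an explicit join type, the missing side of a pair comes back as null.
left.joinWith(right, left("id") === right("id"), "fullouter")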
Broadcast Join (aka Map-Side Join)
Caution: FIXME: Review BroadcastNestedLoop.
Note: According to the article Map-Side Join in Spark, a broadcast join is also called a replicated join (in the distributed systems community) or a map-side join (in the Hadoop community).
Note: At long last! I have always been wondering what a map-side join is, and it appears I am close to uncovering the truth!
Later in the article Map-Side Join in Spark, you can find that a broadcast join lets you very effectively join a large table (the fact table) with relatively small tables (the dimension tables), i.e. perform a star-schema join while avoiding sending all the data of the large table over the network.
The CanBroadcast object matches a LogicalPlan with output small enough for a broadcast join.
Note: Currently, statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE [tableName] COMPUTE STATISTICS noscan has been run.
Spark uses the spark.sql.autoBroadcastJoinThreshold setting to control the maximum size of a table that will be broadcast to all worker nodes when performing a join.
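As a sketch, you can also request a broadcast join explicitly with the broadcast function from org.apache.spark.sql.functions, regardless of the threshold:

import org.apache.spark.sql.functions.broadcast

// Hint that the (small) right side should be broadcast to all worker nodes.
left.join(broadcast(right), "id")

// The automatic behaviour is governed by the threshold (in bytes); -1 disables it.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")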