PySpark: upsert two DataFrames
The following PySpark code upserts the target DataFrame with the source DataFrame. When the key column matches, the value is taken from the source even if the source value is null.
from typing import List
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
def upsert(target: DataFrame, source: DataFrame, columns: List[str]) -> DataFrame:
    """Upsert ``source`` into ``target`` and return the merged DataFrame.

    The first entry of ``columns`` is used as the key of a full outer join.
    For every column in ``columns``:

    * if the key matched in both DataFrames, the source value is taken,
      even when it is null;
    * otherwise the source value is taken if it is not null, falling back
      to the target value. For an unmatched row only one side is present,
      so this keeps that side's value.

    Args:
        target: DataFrame being updated.
        source: DataFrame whose rows update or insert into ``target``.
        columns: Column names to emit. Each must exist in both DataFrames;
            the first one is the join key. Must be non-empty (``columns[0]``
            is read unconditionally).

    Returns:
        A DataFrame containing exactly ``columns``, with duplicate rows
        removed via ``distinct()``.
    """
    # Assumes the first column is the key column for the outer join.
    key_column = columns[0]
    target_key = F.col(f"target.{key_column}")
    source_key = F.col(f"source.{key_column}")

    # Use an explicit equality condition rather than ``on=[key_column]``.
    # A USING join collapses the key into one output column, and qualified
    # references to it (``target.<key>`` / ``source.<key>``) were not reliably
    # resolvable on older Spark versions. With an explicit condition both keys
    # stay addressable through their aliases. The matching semantics are the
    # same: null keys never match either way.
    key_matched = target_key == source_key

    merged_columns = [
        F.when(key_matched, F.col(f"source.{col}"))
        .otherwise(F.coalesce(F.col(f"source.{col}"), F.col(f"target.{col}")))
        .alias(col)
        for col in columns
    ]

    return (
        target.alias("target")
        .join(source.alias("source"), on=key_matched, how="outer")
        .select(*merged_columns)
        # Presumably meant to collapse exact duplicate rows, e.g. from a key
        # that appears more than once. Note: it also drops duplicate rows
        # already present in either input.
        .distinct()
    )