PySpark - upsert one DataFrame into another

less than 1 minute read

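The following PySpark code upserts a target DataFrame with a source DataFrame. When the key column matches, the value from the source is used even if it is null; when a row exists on only one side, the first non-null value (source first, then target) is used.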

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from typing import List
from pyspark.sql import DataFrame
from pyspark.sql import functions as F


def upsert(target: DataFrame, source: DataFrame, columns: List[str]) -> DataFrame:
    """Merge ``source`` into ``target`` and return the combined rows.

    The two DataFrames are full-outer joined on the first name in
    ``columns``, which is treated as the key. Each name in ``columns`` is
    then resolved as follows:

    * key present on both sides: the source value is used, even when that
      value is null;
    * key present on one side only: the first non-null value of source,
      then target, is used.

    Only the names listed in ``columns`` appear in the result, and exact
    duplicate rows are dropped.

    Args:
        target: DataFrame holding the existing rows.
        source: DataFrame holding the incoming rows.
        columns: Column names to keep in the result; the first one is the
            join key.

    Returns:
        A DataFrame with one output column per entry in ``columns``.

    Raises:
        IndexError: If ``columns`` is empty.
    """
    key = columns[0]

    # True only for rows where both sides supplied the key; on rows that
    # exist on a single side one operand is null, so the comparison is not
    # true and the coalesce fallback below is used instead.
    # NOTE(review): this relies on Spark resolving "target."/"source."
    # qualified key columns after a name-based join - confirm on the Spark
    # version in use.
    keys_match = F.col("target." + key) == F.col("source." + key)

    merged_columns = []
    for name in columns:
        from_source = "source." + name
        from_target = "target." + name
        merged = F.when(keys_match, F.col(from_source)).otherwise(
            F.coalesce(from_source, from_target)
        )
        merged_columns.append(merged.alias(name))

    joined = target.alias("target").join(
        source.alias("source"), on=[key], how="outer"
    )
    return joined.select(*merged_columns).distinct()

Categories:

Updated: