PySpark and Kafka with Schema registry

1 minute read

PySpark and Kafka with Schema registry

1
2
3
4
5
6
7
8
9
10
11
schema_registry_url = "http://localhost:8081"
bootstrap_servers = "localhost:9092"
topic_name = "topic-name"

from confluenct_kafaka.schema_registry import SchemaRegistryClient

schema_registry_conf = { "url": schema_registry_rul}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

schema_key = schema_registry_client.get_latest_version(f"{topic_name}-key")
schema_value = schema_registry_client.get_latest_version(f"{topic_name}-value")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import pyspark.sql.functions as fn
from pyspark.sql.types import StringType

binary_to_string = fn.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())
starting_offset - "earliest"

kafka_raw = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", bootstrap_servers) \
  .option("subscribe", topic_name) \
  .option("startingOffsets", starting_offset) \
  .option("failOnDataLoss", "false") \
  .load()
1
2
3
4
5
6
7
8
9
10
from pyspark.sql.avro.functions import from_avro, to_avro

from_avro_options = { "mode" : "PERMISSIVE"}

kafka_raw_df = kafka_raw \
  .withColumn('key', fn.expr("substring(key, 6, length(value) - 5))") \
  .withColumn('keySchemaId', binary_to_string(fn.expr("substring(key, 2, 4)"))) \
  .withColumn('topicBinaryValue', fn.expr("substring(value, 6, length(value) - 5")) \
  .withColumn('topicValue', from_avro('topicBinaryValue', schema_value.schema.schema_str, from_avro_options))

Explode example if the value has multiple array in the JSON string

1
2
3
kafka_df = kafka_raw_df \
  .select('timestamp', fn.explode(kafka_raw_df.topicValue.items).alias(item))

Generating a new column from the existing column in JSON

1
2
3
4
5
6
json_columns = ["prop1", "prop2"]

kafka_df = kafka_df \
  .withColumn('key', fn.concat_ws("_", kafka_df.prop1, kafka_df.prop2)) \
  .withColumn('value', fn.to_json(fn.struct([kafka_df[x] for x in json_columns])))

Write back to another topic in Kafka

1
2
3
4
5
6
7
8
kafka_df \
  .writeStream \
  .option("kafka.bootstrap.servers", bootstrap_servers) \
  .option("checkpointLocation", "tmp/test") \
  .option("topic", "another-topic") \
  .outputMode("update") \
  .start()

Categories:

Updated: