1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import pyspark.sql.functions as fn
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.types import StringType
from confluent_kafka.schema_registry import SchemaRegistryClient

# Schema Registry connection config. NOTE: the config key must be the string
# 'url' — a bare `url` name was a NameError in the original.
schema_registry_conf = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

topic = "topic"
# Confluent subject-naming convention: "<topic>-key" / "<topic>-value".
# (Original referenced undefined `schema_registry.client`; the variable is
# `schema_registry_client`.)
schema_key = schema_registry_client.get_latest_version(f"{topic}-key")
schema_value = schema_registry_client.get_latest_version(f"{topic}-value")

# Streaming read from Confluent Cloud Kafka over SASL_SSL.
# Assumes `spark`, `confluentBootstrapServers`, `confluentApiKey`,
# `confluentSecret`, and `confluentTopicName` are provided by the
# notebook/environment — TODO confirm.
df = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", confluentBootstrapServers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option(
        "kafka.sasl.jaas.config",
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
        "required username='{}' password='{}';".format(confluentApiKey, confluentSecret),
    )
    .option("kafka.ssl.endpoint.identification.algorithm", "https")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("subscribe", confluentTopicName)
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
    # Kafka message key arrives as binary; expose it as a readable string.
    .withColumn('key', fn.col("key").cast(StringType()))
    # Confluent wire format prefixes each Avro payload with 5 bytes
    # (1 magic byte + 4-byte schema ID); strip them before Avro decoding.
    .withColumn('fixedValue', fn.expr("substring(value, 6, length(value)-5)"))
    .withColumn('fixedValue', from_avro('fixedValue', schema_value.schema.schema_str))
    .select(fn.col("fixedValue.*"))
)