Commit e3c7f42

Link spark_kafka_processor

expediamatt committed Feb 11, 2024
1 parent 6673e7a
Showing 1 changed file with 10 additions and 18 deletions.

sdk/python/feast/infra/contrib/spark_kafka_processor.py
28 changes: 10 additions & 18 deletions
@@ -2,6 +2,8 @@
 from typing import List, Optional
 
 import pandas as pd
+from confluent_kafka.schema_registry import SchemaRegistryClient
+from confluent_kafka.schema_registry.avro import AvroDeserializer
 from pyspark.sql import DataFrame, SparkSession
 from pyspark.sql.avro.functions import from_avro
 from pyspark.sql.functions import col, from_json
@@ -43,7 +45,7 @@ def __init__(
             sfv.stream_source.kafka_options.message_format, AvroFormat
         ) and not isinstance(
             sfv.stream_source.kafka_options.message_format, ConfluentAvroFormat
-        )and not isinstance(
+        ) and not isinstance(
             sfv.stream_source.kafka_options.message_format, JsonFormat
         ):
             raise ValueError(
@@ -55,7 +57,7 @@ def __init__(
             self.format = "json"
         elif isinstance(sfv.stream_source.kafka_options.message_format, ConfluentAvroFormat):
             self.format = "confluent_avro"
-
+            self.init_confluent_avro_processor()
 
         if not isinstance(config, SparkProcessorConfig):
             raise ValueError("config is not spark processor config")
@@ -66,6 +68,12 @@ def __init__(
         self.join_keys = [fs.get_entity(entity).join_key for entity in sfv.entities]
         super().__init__(fs=fs, sfv=sfv, data_source=sfv.stream_source)
 
+
+    def init_confluent_avro_processor(self) -> None:
+        """Extra initialization for Confluent Avro processor, which uses
+        SchemaRegistry and the Avro Deserializer, both of which need initialization."""
+        pass
+
     def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None:
         ingested_stream_df = self._ingest_stream_data()
         transformed_df = self._construct_transformation_plan(ingested_stream_df)
@@ -75,7 +83,6 @@ def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None:
     def _ingest_stream_data(self) -> StreamTable:
         """Only supports json, avro and confluent_avro formats currently."""
         # Test that we reach this path, and stop.
-        raise ValueError("No, no, NO NO NO, stop everything. Shut it down.")
         if self.format == "json":
             if not isinstance(
                 self.data_source.kafka_options.message_format, JsonFormat
@@ -104,21 +111,6 @@ def _ingest_stream_data(self) -> StreamTable:
                 self.data_source.kafka_options.message_format, ConfluentAvroFormat
             ):
                 raise ValueError("kafka source message format is not confluent_avro format")
-            #TODO: process ConfluentAvro format.
-            # stream_df = (
-            #     self.spark.readStream.format("kafka")
-            #     .options(**self.kafka_options_config)
-            #     .load()
-            #     .withColumn("byteValue", func.expr("substring(value, 6, length(value)-5)"))
-            #     .select(
-            #         from_avro(
-            #             func.col("byteValue"),
-            #             self.data_source.kafka_options.message_format.schema_json,
-            #             self.avro_options,
-            #         ).alias("table")
-            #     )
-            #     .select("table.*")
-            # )
             raise ValueError("HOLY MOLY I AM NOT READY TO DEAL WITH CONFLUENT AVRO, GUYS")
             stream_df = None
         else:
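For reference, the TODO block removed in the last hunk encodes the key detail of Confluent's wire format: each Kafka message value carries a 5-byte prefix (one zero magic byte plus a 4-byte schema registry ID) ahead of the Avro body, which is why the commented-out code strips it with substring(value, 6, length(value)-5) in Spark SQL's 1-indexed substring. A minimal sketch of the confluent_avro branch, reconstructed from the deleted comments and assuming self.kafka_options_config and self.avro_options are populated the same way as on the plain Avro path:

from pyspark.sql import functions as func
from pyspark.sql.avro.functions import from_avro

# Sketch: strip the 5-byte Confluent framing, then decode the remaining
# bytes with the writer schema carried on the message format.
stream_df = (
    self.spark.readStream.format("kafka")
    .options(**self.kafka_options_config)
    .load()
    .withColumn("byteValue", func.expr("substring(value, 6, length(value)-5)"))
    .select(
        from_avro(
            func.col("byteValue"),
            self.data_source.kafka_options.message_format.schema_json,
            self.avro_options,
        ).alias("table")
    )
    .select("table.*")
)

Note that handing from_avro a single schema_json assumes every message was written with that exact schema; a complete implementation would resolve each message's schema ID against the registry, which is presumably what the new SchemaRegistryClient import is for.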

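The new init_confluent_avro_processor hook is still a pass-through. A hedged sketch of the initialization its docstring describes, using the confluent_kafka classes imported at the top of the file; schema_registry_url and record_schema are hypothetical attribute names on ConfluentAvroFormat, not confirmed by this commit:

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

def init_confluent_avro_processor(self) -> None:
    """Set up the schema registry client and Avro deserializer (sketch only)."""
    message_format = self.sfv.stream_source.kafka_options.message_format
    # Assumed attribute names; the real ConfluentAvroFormat fields may differ.
    self.schema_registry_client = SchemaRegistryClient(
        {"url": message_format.schema_registry_url}
    )
    self.deserializer = AvroDeserializer(
        schema_registry_client=self.schema_registry_client,
        schema_str=message_format.record_schema,
    )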