
Spark Streaming Job with multiple queries MERGE INTO the same target table (Runtime file filtering is not possible) #11094

Open
cccs-jc opened this issue Sep 7, 2024 · 5 comments
Labels
question Further information is requested

Comments

@cccs-jc
Contributor

cccs-jc commented Sep 7, 2024

Query engine

  • Spark 3.5
  • Iceberg 1.5.2
  • Azure storage account ABFSS

Question

I’m trying to use the MERGE INTO statement within a Spark streaming application that runs several streaming queries. These queries all write to the same target table.

I understand that the MERGE INTO statement performs a copy-on-write operation on files affected by updated or deleted rows. To prevent multiple queries from modifying the same files, I have partitioned the target table by query_id.

Additionally, I’ve configured the target table with snapshot isolation, which is less strict than the serializable isolation level.
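
For illustration, this is roughly how those properties can be set on an existing table (sketch only; the table name is a placeholder and spark is the session from the attached example, while the full CREATE TABLE with the same properties is further below):

# Sketch only: switch an existing Iceberg table to copy-on-write MERGE with
# snapshot isolation (same properties as in the CREATE TABLE below).
spark.sql("""
    ALTER TABLE my_catalog.db.target_table SET TBLPROPERTIES (
        'write.merge.mode' = 'copy-on-write',
        'write.merge.isolation-level' = 'snapshot'
    )
""")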

However, despite these precautions, the commit operation occasionally fails with the following error:

java.lang.IllegalStateException: Runtime file filtering is not possible: the table has been concurrently modified. Row-level operation scan snapshot ID: 6599699721474649632, current table snapshot ID: 8543071133912689275. If an external process modifies the table, enable table caching in the catalog. If multiple threads modify the table, use independent Spark sessions in each thread.

@RussellSpitzer explained on a Slack channel:

Row level operations do 2 passes over the data, once with the join columns and once with the whole row. If the data changes between those passes you get the above error. The message suggests using a different thread/session because changes within the same session would change the Spark cache and cause the error. You would also see this if the Spark Catalog cache is disabled and another process modifies the table between the passes.
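
For context, the "enable table caching in the catalog" part of the error refers to the Iceberg Spark catalog's cache-enabled option (which defaults to true). A minimal sketch of setting it explicitly on a catalog definition; the catalog name, type, and warehouse path here are placeholders, not my actual setup:

from pyspark.sql import SparkSession

# Sketch only: an Iceberg catalog with table caching left enabled, so both
# passes of a row-level operation read the same cached table state.
spark = (
    SparkSession.builder
    .appName("iceberg-catalog-cache-sketch")
    .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.my_catalog.type", "hadoop")
    .config("spark.sql.catalog.my_catalog.warehouse", "/tmp/warehouse")
    .config("spark.sql.catalog.my_catalog.cache-enabled", "true")
    .getOrCreate()
)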

My question is: How can I avoid this error when committing? Specifically, what does it mean to "use independent Spark sessions in each thread"?

I have attached a simple pyspark application illustrating the problem, and I'm curious how this example could be modified to prevent the error. How would I introduce "independent Spark sessions" in this example?

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import time

table_name = <put target table name here>

# Initialize Spark Session
spark = ( SparkSession.builder
    .appName("streaming merg into")
    .config("spark.sql.shuffle.partitions", "2")
    .config("spark.log.level", "ERROR")
    .getOrCreate()
    )

spark.sparkContext.setLogLevel("ERROR")

spark.sql(f"drop table if exists {table_name}")

spark.sql(f"""
    create table if not exists {table_name} (
    timestamp timestamp,
    value long,
    query_id integer
    )
    using iceberg
    partitioned by (query_id)
    tblproperties (
        "commit.retry.num-retries"="25",
        "write.delete.isolation-level"= "snapshot",
        "write.update.mode"= "copy-on-write",
        "write.update.isolation-level"= "snapshot",
        "write.merge.mode"= "copy-on-write",
        "write.merge.isolation-level"= "snapshot"
    )
""")

spark.table(table_name).printSchema()


# Function to run a streaming query with foreachBatch
def start_streaming_query(query_id):

    # Define a write function using foreachBatch
    def write_to_table(df, batch_id):
        view_name = f"updates_{query_id}"
        full_view_name = "global_temp." + view_name
        spark.sql(f"uncache table if exists {full_view_name}")
        df.createOrReplaceGlobalTempView(view_name)
        spark.sql(f"cache table {full_view_name}")
        df.show(truncate=False, n=10000)

        # Execute the MERGE INTO statement
        merge_query = f"""
        MERGE INTO {table_name} AS t
        USING {full_view_name} AS s
        ON (t.query_id = s.query_id and t.value = s.value)
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
        """

        num_retry = 0
        while True:
            try:
                spark.sql(merge_query)
                break
            except Exception as e:
                num_retry += 1
                error_message = str(e)
                print(f"Query {query_id} is retrying {num_retry} error was {error_message}")
                if num_retry > 20:
                    raise e



    # Simulate a streaming DataFrame
    (
        spark
        .readStream
        .format("rate")
        .option("rowsPerSecond", 1)
        .load()
        .withColumn("query_id", F.lit(query_id))
        .writeStream
        .foreachBatch(write_to_table)
        .outputMode("append")
        .trigger(processingTime="10 seconds")
        .start()
    )

num_queries = 10  # Number of concurrent streaming queries
for i in range(num_queries):
    start_streaming_query(i)

spark.streams.awaitAnyTermination()

spark.stop()

Is my issue related to this issue #11066 (comment)?

@cccs-jc cccs-jc added the question Further information is requested label Sep 7, 2024
@cccs-jc cccs-jc changed the title Spark Streaming Job with multiple queries MERGE INTO the same target table Spark Streaming Job with multiple queries MERGE INTO the same target table (Runtime file filtering is not possible) Sep 7, 2024
@eric-maynard

To use multiple sessions you would essentially move this part of your code into the start_streaming_query function:

# Initialize Spark Session
spark = ( SparkSession.builder
    .appName("streaming merg into")
    .config("spark.sql.shuffle.partitions", "2")
    .config("spark.log.level", "ERROR")
    .getOrCreate()
    )

You might also want to make some changes such as appending the thread ID to the app name.
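
A rough sketch of that suggestion (illustrative names only; note, per the follow-up below, that getOrCreate() by itself just returns the already-running session):

from pyspark.sql import SparkSession

# Sketch only: build the session inside start_streaming_query and append the
# query id to the app name. Caveat: getOrCreate() returns the existing session,
# so this alone does not produce an isolated session (see the comments below).
def start_streaming_query(query_id):
    spark = (
        SparkSession.builder
        .appName(f"streaming merge into {query_id}")
        .config("spark.sql.shuffle.partitions", "2")
        .config("spark.log.level", "ERROR")
        .getOrCreate()
    )
    # ... define write_to_table and start the rate stream as in the example above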

@cccs-jc
Contributor Author

cccs-jc commented Sep 9, 2024

@eric-maynard I have other examples where I do what you suggest. If I move this inside start_streaming_query, the builder's getOrCreate() will simply return the existing Spark session; it will not create a new Spark application.

When I do so, I see the same behavior.

@RussellSpitzer
Member

@eric-maynard I have other examples where I do what you suggest. If I move this inside start_streaming_query, the builder's getOrCreate() will simply return the existing Spark session; it will not create a new Spark application.

When I do so, I see the same behavior.

You need to create a new session object, not get the active one, like so:

scala> spark.newSession()
res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@5d643896

@cccs-jc
Contributor Author

cccs-jc commented Sep 9, 2024

Ha, a new session... Is this possible in pyspark? I did not know you could create multiple sessions. Do multiple sessions run within the same Spark application (i.e., using the same CPU/memory resources)?

@cccs-jc
Contributor Author

cccs-jc commented Sep 9, 2024

Well, what do you know: in pyspark you can create a new session with spark.newSession(), which isolates views and UDFs and has a separate SQLConf. For example:

s1 = spark.newSession()
s2 = spark.newSession()

print(s1)
df1 = s1.sql("select 1")
df1.createOrReplaceTempView("view1")
s1.sql("show tables").show()

print(s2)
df2 = s2.sql("select 2")
df2.createOrReplaceTempView("view2")
s2.sql("show tables").show()

outputs

<pyspark.sql.session.SparkSession object at 0x7f50688726e0>
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|         |    view1|       true|
+---------+---------+-----------+

<pyspark.sql.session.SparkSession object at 0x7f5068870040>
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|         |    view2|       true|
+---------+---------+-----------+

When we modify the streaming MERGE INTO example above to use a separate session for each streaming query, there are no more errors :-) Thank you @RussellSpitzer and @eric-maynard.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import time

table_name = <put target table name here>

# Initialize Spark Session
g_spark = ( SparkSession.builder
    .appName("streaming merg into")
    .config("spark.sql.shuffle.partitions", "2")
    .config("spark.log.level", "ERROR")
    .getOrCreate()
    )

g_spark.sparkContext.setLogLevel("ERROR")

g_spark.sql(f"drop table if exists {table_name}")

g_spark.sql(f"""
    create table if not exists {table_name} (
    timestamp timestamp,
    value long,
    query_id integer
    )
    using iceberg
    partitioned by (query_id)
    tblproperties (
        "commit.retry.num-retries"="25",
        "write.delete.isolation-level"= "snapshot",
        "write.update.mode"= "copy-on-write",
        "write.update.isolation-level"= "snapshot",
        "write.merge.mode"= "copy-on-write",
        "write.merge.isolation-level"= "snapshot"
    )
""")

g_spark.table(table_name).printSchema()


# Function to run a streaming query with foreachBatch
def start_streaming_query(q_spark, query_id):

    # Define a write function using foreachBatch
    def write_to_table(df, batch_id):
        view_name = f"updates_{query_id}"
        full_view_name = "global_temp." + view_name
        q_spark.sql(f"uncache table if exists {full_view_name}")
        df.createOrReplaceGlobalTempView(view_name)
        q_spark.sql(f"cache table {full_view_name}")
        df.show(truncate=False, n=10000)

        # Execute the MERGE INTO statement
        merge_query = f"""
        MERGE INTO {table_name} AS t
        USING {full_view_name} AS s
        ON (t.query_id = s.query_id and t.value = s.value)
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
        """

        num_retry = 0
        while True:
            try:
                q_spark.sql(merge_query)
                break
            except Exception as e:
                num_retry += 1
                error_message = str(e)
                print(f"Query {query_id} is retrying {num_retry} error was {error_message}")
                if num_retry > 20:
                    raise e



    # Simulate a streaming DataFrame
    (
        q_spark
        .readStream
        .format("rate")
        .option("rowsPerSecond", 1)
        .load()
        .withColumn("query_id", F.lit(query_id))
        .writeStream
        .foreachBatch(write_to_table)
        .outputMode("append")
        .trigger(processingTime="10 seconds")
        .start()
    )

num_queries = 10  # Number of concurrent streaming queries
for i in range(num_queries):
    start_streaming_query(g_spark.newSession(), i)

g_spark.streams.awaitAnyTermination()

g_spark.stop()
