Flink SQL - Changelog and Races

Managing data efficiently and accurately is a significant challenge in stream processing. Apache Flink, a powerful framework for real-time data processing, provides robust solutions, but it has its complexities. One critical aspect is handling race conditions, especially when working with Flink SQL.

This blog post will explore the intricacies of race conditions and changelogs in Flink SQL, examining the potential pitfalls and solutions to ensure data consistency and reliability. We will dive into the mechanics of changelogs, the implications of race conditions, and practical strategies to mitigate these issues, enabling you to harness the full potential of Flink SQL in your streaming applications.

Understanding Race Conditions

Race conditions can occur during parallel processing. Data can be processed at different speeds and along different paths. Without synchronization, which reduces throughput and performance, there is no guarantee that data will maintain the same order as it had at the beginning of the pipeline. This is a significant issue when the input order is expected at the end, and it can lead to problems such as missed updates and data corruption.

More formally, a race condition, also known as a race hazard, occurs when a system or application is influenced by uncontrollable factors such as timing, event sequence, external or shared state or resources. This dependency can lead to incorrect or unpredictable results.

Flink, a framework for streaming data processing, minimizes these hazards. FIFO buffers, used for data exchange between subtasks, guarantee the order of events processed along the same path. However, race conditions can still occur for events processed in parallel by different subtasks.

Let’s imagine a simple join written in Flink SQL and executed in parallel, as shown in the picture below. 

[Figure: a join in Flink SQL executed in parallel]

The source table produced the following two insert events: +I (id=1, …) and +I (id=2, …). These events were shuffled by the id field and sent to different subtasks. In what order will they reach the sink? It’s nondeterministic. Flink only preserves the order of events processed by a particular subtask. There is no synchronization or shared state between subtasks, which can be problematic for correlated events, such as updates of the same row.

Changelogs in Flink SQL

Flink SQL has adopted the concept of a changelog, which introduces the following row types: 

  • Insert (+I)
  • UpdateBefore (-U)
  • UpdateAfter (+U)
  • Delete (-D)

This concept is well-known from relational databases (e.g. Change Data Capture) and helps to track subsequent changes in databases.

The changelog helps manage data consistency, but it can still be disrupted when events are processed asynchronously.

While +I, +U, and -D are intuitive, the role of -U in distributed processing is often underestimated. UpdateBefore acts as a “technical” event, informing a subtask that a value has been modified and that the old one has expired. This allows for state cleanup, prevents the release of invalid joined rows, and so on. Most sinks skip -U events, which makes -U a kind of gray eminence: it isn’t visible outside the job, but it plays a crucial role in data processing.
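
To see these row kinds in practice, the SQL client can print query results as a changelog. The snippet below is a minimal sketch: the inline VALUES data and the column names are made up for illustration, and it assumes the client runs in streaming mode.

```sql
-- Show results as a stream of changes, including the row kind of each row.
SET 'sql-client.execution.result-mode' = 'changelog';

-- A non-windowed aggregation produces an updating result:
-- the first row for a key arrives as +I, and each later change
-- is expressed as a -U (old value) / +U (new value) pair.
SELECT name, COUNT(*) AS cnt
FROM (VALUES ('shop-a'), ('shop-b'), ('shop-a')) AS t(name)
GROUP BY name;
```

Here the second 'shop-a' row should cause the previous count to be retracted with -U before the new count is emitted with +U.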

Let’s consider a regular one-to-one join between the STORE and ADDRESS tables, described by a simple query in Flink SQL:

INSERT INTO STORE_WITH_ADDRESS
SELECT
	s.id,
	s.name,
	s.a_id,
	a.city,
	a.street
FROM
	STORE s
LEFT JOIN
	ADDRESS a
ON
	s.a_id = a.a_id;

First, the job received some rows from the ADDRESS table, followed by the insert and update (with changed address id) from the STORE table.

[Figure: STORE table]

[Figure: STORE source events]

The order of events in the sink is nondeterministic. Possible scenarios include:

  1. +I, -U, +U (expected)
  2. +I, +U, -U
  3. +U, +I, -U

The sink terminates data processing in the Flink job. It can write to an external store or simply print the results, and it processes events in the order they arrive. For instance, the upsert-kafka connector stores only values (without the row kind) and skips UpdateBefore events. A Delete event is represented as a key with a null message body. An incorrect order of events, as in the third scenario, impacts output correctness: the delayed Insert overwrites the newer UpdateAfter. Note that the shop from the example is now located on Szewska Street, not Grodzka Street!
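
For reference, an upsert-kafka sink for the query above could be declared as follows. This is only a sketch: the topic name, broker address, formats and column types are assumptions, not taken from the original job.

```sql
CREATE TABLE STORE_WITH_ADDRESS (
    id     BIGINT,
    name   STRING,
    a_id   BIGINT,
    city   STRING,
    street STRING,
    -- The primary key becomes the Kafka message key;
    -- a Delete is written as this key with a null value.
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'store-with-address',                     -- hypothetical topic
    'properties.bootstrap.servers' = 'localhost:9092',  -- hypothetical broker
    'key.format' = 'json',
    'value.format' = 'json'
);
```

Consumers of such a topic treat the last message written for a key as its current value, which is why the arrival order at the sink matters.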

Sink Upsert Materializer

Having a changelog stream makes it possible to deduce the proper order of events and correct them if necessary. An Insert event should be at the beginning of the stream or after a Delete event. If not, it’s either delayed or has overtaken the Delete event. In the third scenario, it’s delayed, which is confirmed by its retract (-U).

Based on that deduction, an algorithm was built and implemented as a sink upsert materializer. You can read more details about this here. The Flink planner automatically adds it just before the sink when needed. You can expect this before the Kafka connector sink, but it will be omitted if you simply print the results. While it corrects the data order, it has some disadvantages and limitations:

  • It’s stateful and impacts performance and checkpointing.
  • Using it together with the TTL configuration can affect data integrity.
  • It requires a complete changelog, and the correctness of results may be compromised by missing or incomplete retracts.
  • Dynamic and nondeterministic columns like CURRENT_TIMESTAMP can lead to state explosion.

You can disable it by setting table.exec.sink.upsert-materialize to "NONE".
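
In the SQL client, the option can be set per session, and EXPLAIN can help check what the planner generated. The snippet below is a minimal sketch that reuses the join from the earlier example; how the materializer appears in the plan output is an assumption and may differ between Flink versions.

```sql
-- Print the optimized plan together with the changelog mode of each operator.
-- If the planner decided to add the materializer, the sink node is expected
-- to be marked accordingly (for example with upsertMaterialize=[true]).
EXPLAIN CHANGELOG_MODE
INSERT INTO STORE_WITH_ADDRESS
SELECT s.id, s.name, s.a_id, a.city, a.street
FROM STORE s
LEFT JOIN ADDRESS a ON s.a_id = a.a_id;

-- Disable the sink upsert materializer for this session.
-- The other supported values are AUTO (the default) and FORCE.
SET 'table.exec.sink.upsert-materialize' = 'NONE';
```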

Note that the sink materializer resolves race conditions only when the changelog is valid and no dynamic columns are involved. Otherwise, it only degrades job performance.

Moreover, there are scenarios when the sink materializer is the only way to handle hazards:

  • full outer join (many-to-many relationships),
  • lack of the row’s version.

For other situations, it should be possible to create custom versioning with ordering, but due to the additional complexity, I recommend using the sink materializer when possible.

Incomplete retracts

The Sink Materializer isn't a good companion for temporal and lookup joins. There are cases where it won’t work. Let’s analyze the changelog generated from a temporal join:

[Figure: changelog generated by the temporal join]

What happened? Why are the right-side values null? The temporal join operator keeps the latest version of the row. It doesn’t know what the previous value is, so the UpdateBefore event is incomplete. This may affect the Sink Materializer, as it may not be able to match the retract with the previous version.

This situation can worsen when joining these rows with a regular join, using a column from the temporal table. The SQL may look like the code snippet below.

SELECT
	*
FROM
	table_a a
LEFT JOIN
	temporal_table FOR SYSTEM_TIME AS OF a.PROC_TIME AS b
ON
	a.fk = b.id
LEFT JOIN
	table_c c
ON
	b.col_b = c.id;

The regular join at the end (with table_c) will maintain the state for the left side grouped by b.col_b. The retract (-U) with b.col_b = NULL won’t remove the event (id=7882, b.id=82, b.col_b=‘x’) because the key doesn’t match. Moreover, +U will add a new row (id=7882, b.id=82, b.col_b=‘y’). The regular join state now has two values:

  1. (id=7882, b.id=82, b.col_b=‘x’)
  2. (id=7882, b.id=82, b.col_b=‘y’)

The first should be retracted, but that won’t happen. Events from table_c with both id=‘x’ and id=‘y’ will be matched and passed downstream, while only the match with ‘y’ should take place.

The lookup join also doesn’t generate the correct changelog. It's a stateless operator and cannot keep track of previous values. The UpdateBefore event will have the same values as UpdateAfter. Similarly, it may lead to data corruption if you add a regular join operator based on the values from a lookup table.

[Figure: changelog generated by the lookup join]

Note that the value from -U should be X, not Y.

This issue can be worked around by regenerating the changelog. Flink SQL doesn’t allow adding ChangelogNormalize directly, so it may be necessary to store the intermediate results and read them back with, for example, the upsert-kafka connector.
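
One way to do this, sketched below, is to write the joined rows to an upsert-kafka table in one job and read the same table back in a second job. The intermediate table, its columns, the topic and the broker address are hypothetical. When an upsert-kafka table is used as a source, the planner adds ChangelogNormalize, which rebuilds complete -U/+U pairs from its own state.

```sql
CREATE TABLE a_with_b (
    id    BIGINT,
    fk    BIGINT,
    col_b STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'a-with-b',                               -- hypothetical topic
    'properties.bootstrap.servers' = 'localhost:9092',  -- hypothetical broker
    'key.format' = 'json',
    'value.format' = 'json'
);

-- Job 1: store the result of the temporal join.
INSERT INTO a_with_b
SELECT a.id, a.fk, b.col_b
FROM table_a a
LEFT JOIN temporal_table FOR SYSTEM_TIME AS OF a.PROC_TIME AS b
    ON a.fk = b.id;

-- Job 2: read the intermediate table back and apply the regular join.
-- The retract for col_b now carries the previous value instead of NULL.
SELECT ab.id, ab.fk, ab.col_b, c.id AS c_id
FROM a_with_b ab
LEFT JOIN table_c c
    ON ab.col_b = c.id;
```

The price of this approach is an extra topic, an extra job and the state kept by ChangelogNormalize.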

Time-to-live state configuration

Flink allows configuring TTL state expiration:

  • globally,
  • per operator (released in Flink 1.18),
  • per join using hint (Flink 1.19).

While this can help to limit state size and boost job performance, it may negatively affect changelog correctness. The lack of previous values in a regular join’s state will lead to generating incomplete retracts. This will affect the sink materializer and may corrupt output data. You should use the TTL configuration wisely, analyzing possible side effects.
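
For illustration, the global setting and the join hint could look like this. The TTL values are arbitrary, and the STATE_TTL hint requires a Flink version that supports it.

```sql
-- Global: idle state of every stateful operator in the job expires after one day.
SET 'table.exec.state.ttl' = '1 d';

-- Per join: different TTLs for the left (s) and right (a) input of a regular join.
SELECT /*+ STATE_TTL('s' = '1d', 'a' = '7d') */
    s.id, s.name, s.a_id, a.city, a.street
FROM STORE s
LEFT JOIN ADDRESS a
    ON s.a_id = a.a_id;
```

If the state of an ADDRESS row expires before the matching STORE row is updated, the join can no longer find the old address, so the retract it emits for the previous result is incomplete.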

Kafka, Debezium and metadata

The Kafka connector supports the Debezium format for processing Change Data Capture events. Each message contains fields before and after, which are split by Flink into separate rows with proper types. All required information is provided within the message, enabling Flink to create a changelog using simple transformations. This operation is fast and does not require ChangelogNormalize or any other stateful operator. The rules of the transformation are listed in the table below.

[Table: rules for transforming Debezium messages into Flink changelog rows]

Be cautious when using CDC with metadata columns, as it's easy to compromise changelog correctness. Let’s assume a new row has been inserted and updated. Debezium generated two messages, which were transformed by Flink:

  1. Row(kind=Insert, id=1, attr=‘a’)
  2. Row(kind=UpdateBefore, id=1, attr=‘a’)
  3. Row(kind=UpdateAfter, id=1, attr=‘b’)

What happens after adding metadata columns like partition and offset into Flink's temporal table? The events will be enriched by proper values and will look like this:

  1. Row(kind=Insert, id=1, attr=‘a’, partition=0, offset=0)
  2. Row(kind=UpdateBefore, id=1, attr=‘a’, partition=0, offset=1)
  3. Row(kind=UpdateAfter, id=1, attr=‘b’, partition=0, offset=1)

As shown above, the second event (-U) carries the offset of the current message. This is correct from the connector’s point of view, because UpdateBefore and UpdateAfter are both taken from the same message, with partition=0 and offset=1. From the changelog’s point of view, however, it is incorrect: UpdateBefore should contain the previous values, including offset=0.
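
A table definition that leads to this situation could look like the sketch below. The table name, topic, broker address, consumer group and column types are assumptions; partition and offset are metadata keys exposed by the Kafka connector.

```sql
CREATE TABLE store_cdc (
    id   BIGINT,
    attr STRING,
    -- Metadata describes the Kafka record, not the database row.
    -- Both rows derived from one update message (-U and +U)
    -- receive the same partition and offset.
    `partition` INT METADATA VIRTUAL,
    `offset`    BIGINT METADATA VIRTUAL
) WITH (
    'connector' = 'kafka',
    'topic' = 'dbserver.inventory.store',               -- hypothetical topic
    'properties.bootstrap.servers' = 'localhost:9092',  -- hypothetical broker
    'properties.group.id' = 'flink-cdc-reader',         -- hypothetical group
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json'
);
```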

Using metadata columns with CDC events is enough to break SinkMaterializer. The operator collects rows in its state and removes them when they are no longer needed. Rows are matched by comparing either upsert keys or whole rows. In the latter case, a match isn’t possible because SinkMaterializer expects a row with the following values:

Row(kind=UpdateBefore, id=1, attr=‘a’, partition=0, offset=0).

As a result, the operator won’t fix race hazards, and handling its growing state may become a bottleneck, consuming more and more time. Another symptom of this issue is the accumulation of logs like:

org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - The state is cleared because of state ttl. This will result in incorrect result. You can increase the state ttl to avoid this.

This log can be misleading because it suggests that the problem is due to state expiration. This only means that the expected row is not present in the state. This issue can be caused not only by TTL configuration but also by race conditions (e.g., a DELETE event being processed first in SinkMaterializer) or an incorrect changelog (e.g., fields mismatch between UpdateBefore and UpdateAfter events).

The conclusion is simple: don’t combine the Kafka Debezium table with metadata columns when using SinkMaterializer!

DIY - Rank versioning

The versioning of temporal or lookup joins can be easily implemented. Note that matches are only emitted on events from the left side of the join, which makes it suitable for one-to-one relations. Therefore, it is sufficient to add a version to the driving table and deduplicate by its primary key, ordered by the version column. This can be achieved using the Top-N (Rank) function with N=1 (the ROW_NUMBER() OVER(...) pattern).

If you use a Kafka topic as a source and messages are partitioned on Kafka by the primary key, then the offset column can be used for versioning. Using a timestamp metadata column (a timestamp of writing the event to the Kafka topic) may be insufficient because of the millisecond resolution.
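
The pattern could be sketched as follows. All table and column names are hypothetical: enriched_store stands for the result of the temporal or lookup join, and version is the Kafka offset of the driving table carried through the join.

```sql
-- Keep only the newest version of each row. A late event with a lower
-- version than the one already emitted does not change the result.
SELECT id, name, city, street
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY id        -- primary key of the driving table
            ORDER BY version DESC  -- highest version first
        ) AS row_num
    FROM enriched_store
)
WHERE row_num = 1;
```

Because the ordering column is not a time attribute, Flink should plan this as a Top-N (Rank) operator, which keeps state per key.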

The rank function can also be used for regular joins except for many-to-many relationships (FULL OUTER JOIN). Please note that the proper order of version columns has to be preserved, and it is not always obvious how to define versions for incoming events.

The Rank function is faster than SinkMaterializer. It can be used in conjunction with a TTL configuration. Starting from Flink 1.18, TTL parameters can be set per operator, which helps limit state size and improve performance. A limited TTL is acceptable here because hazards can only occur for the latest data.

Know your enemy

Flink SQL is indeed a great framework, but it does have its limitations. In my opinion, the problem lies not with the framework itself, but with its documentation. The insights I have shared with you are based on my experiences with Flink SQL. Issues such as race conditions, performance degradation in sink materializer due to state explosion, and broken or incomplete changelogs have been encountered. Often, these problems were only revealed relatively late, under specific conditions, or after processing a significant amount of data. I hope you don't encounter such issues and will fully appreciate the benefits of Flink SQL!

Remember:

  • The changelog in Flink SQL is vulnerable to race conditions.
  • Avoid using temporal or lookup joins with SinkMaterializer unless you're certain that the operator compares rows by upsert keys. Instead, use the rank function (TOP-1) to handle race conditions.
  • Avoid using non-deterministic columns with SinkMaterializer.
  • Don’t join using a regular join and values from the temporal table/view or lookup table, as these are not correctly retracted.
  • Metadata columns break the changelog created from the Kafka connector with the Debezium format. Don’t push them to SinkMaterializer.

Remarks

All observations were made on Flink 1.16.1.

Conclusion

Flink SQL is a powerful tool for stream processing, but understanding and managing race conditions is crucial for maintaining data integrity. By leveraging changelogs, implementing best practices, and being aware of potential pitfalls, you can effectively navigate the complexities of real-time data processing with Flink SQL. Schedule a consultation with our experts to stay informed and adopt these strategies to maximize the benefits of your streaming applications.

1 July 2024