Retep's

Streaming Systems and Flink


Part 1. What are Streaming Systems?

Streaming Essentials

Assume we have a streaming system, and the ingress data is as follows:

id  value  timestamp
1   5      12:00:02
2   9      12:01:30
3   7      12:02:20
4   8      12:03:00
5   3      12:03:50
6   4      12:04:20

We want to achieve the following task: Calculate the sum of event values within each 2-minute window.

Sum  Window Range
14   [12:00, 12:02)
18   [12:02, 12:04)
4    [12:04, 12:06)
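The windowed sum above can be reproduced with a short Python sketch. It assumes fixed two-minute windows aligned to the clock, matching the ranges in the result table. The names EVENTS, window_start, and windowed_sums are illustrative and not part of any library.

```python
from datetime import datetime, timedelta

# Ingress events from the table above: (id, value, event-time timestamp).
EVENTS = [
    (1, 5, "12:00:02"),
    (2, 9, "12:01:30"),
    (3, 7, "12:02:20"),
    (4, 8, "12:03:00"),
    (5, 3, "12:03:50"),
    (6, 4, "12:04:20"),
]

WINDOW = timedelta(minutes=2)  # assumed window size


def window_start(ts: datetime, size: timedelta = WINDOW) -> datetime:
    """Round an event time down to the start of its fixed window."""
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    whole_windows = (ts - midnight) // size  # whole windows since midnight
    return midnight + whole_windows * size


def windowed_sums(events):
    """Return {(start, end): sum}, with window bounds formatted as HH:MM."""
    sums = {}
    for _id, value, stamp in events:
        ts = datetime.strptime(stamp, "%H:%M:%S")
        start = window_start(ts)
        key = (start.strftime("%H:%M"), (start + WINDOW).strftime("%H:%M"))
        sums[key] = sums.get(key, 0) + value
    return sums
```

Calling windowed_sums(EVENTS) yields 14 for [12:00, 12:02), 18 for [12:02, 12:04), and 4 for [12:04, 12:06), matching the table.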

Event Time vs Processing Time

  • Event time: The time the event occurs, indicated by the timestamp in the table above.
  • Processing time: The time when the event is actually observed by the system.

Ideally, event time equals processing time, in which case events are processed in the order they occur. In practice, network delays and other inefficiencies introduce a varying lag between the two, so events can arrive late and out of order.

Triggers

In stream processing, data continuously enters windows. The role of triggers is to determine when to send the state (local state) of the window downstream while maintaining the state of the stream. Without triggers, the result would accumulate indefinitely, equivalent to a batch processing system.

Triggers can send results at any time or multiple times.

The main problem caused by out-of-order events is that we cannot know the state of each window at any given time (is it complete or not?), so how should we decide when to send the window results?

  • Strategy 1: Per-record triggering
    The simplest way is to update the window state and trigger sending to downstream every time an event is received. The downside is that communication overhead is high. For scenarios with large data volumes, a single window may contain hundreds or thousands of data points, resulting in low efficiency.
  • Strategy 2: Aligned delay triggering
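    This strategy improves on Strategy 1 by triggering at fixed processing-time intervals (e.g., once per second) instead of on every event, with the intervals aligned across all windows. The advantage is that even if many data points arrive within one interval, the trigger fires only once, reducing message volume. The downside is that all windows fire at the same moment, which produces bursts of load.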
  • Strategy 3: Unaligned delay triggering
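    This further improves on Strategy 2 by measuring the delay relative to when data arrives in each window (for example, a fixed delay after the first new event) rather than from a shared clock. Different windows therefore fire at different moments, which distributes triggering evenly and contributes to system stability.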
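The three strategies can be compared by computing when a window would fire for a given list of arrival times (in processing-time seconds). This is a minimal sketch, not how any particular engine implements triggers. The function names are hypothetical, and the unaligned variant assumes the delay is measured from the first event that arrives after the previous firing.

```python
def per_record(arrivals):
    """Strategy 1: fire once for every arriving event."""
    return sorted(arrivals)


def aligned_delay(arrivals, period):
    """Strategy 2: fire at the next multiple of `period` after an arrival."""
    fires = []
    for t in sorted(arrivals):
        boundary = (t // period + 1) * period  # shared clock boundary
        if not fires or fires[-1] != boundary:
            fires.append(boundary)
    return fires


def unaligned_delay(arrivals, delay):
    """Strategy 3: fire `delay` after the first event since the last firing."""
    fires = []
    for t in sorted(arrivals):
        # An event that arrives while a firing is pending is absorbed by it.
        if not fires or t >= fires[-1]:
            fires.append(t + delay)
    return fires
```

For arrivals [1, 2, 3, 14, 15] with a period or delay of 10, per-record triggering fires five times, aligned delay fires at 10 and 20, and unaligned delay fires at 11 and 24. A second window with arrivals [4, 6] would also fire at 10 under aligned delay but at 14 under unaligned delay, which is how the unaligned variant spreads out the load.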

Watermarks

Using triggers to periodically update window states and send results downstream can encounter two problems:

  1. Although periodic triggering helps approach the final state of a window, there is no explicit signal indicating the window is complete and the results are correct.
  2. Without such a signal, the window cannot be closed, and the system cannot reclaim local state, leading to increasing storage pressure.

The purpose of watermarks (a completeness trigger) is to provide a signal that a window is complete. A watermark can be seen as a function: F(p) → E

Where p is a point in processing time and E is a point in event time. The watermark asserts that, at processing time p, all events with event times earlier than E have been observed.

Having watermarks allows us to determine when a window can be closed. In many cases we cannot obtain perfect watermarks and instead estimate them heuristically from indicators such as network speed or file counts. A heuristic watermark can be wrong in two ways:

  • Watermark too slow: The watermark is produced too late, causing downstream processing to idle and significantly increasing system latency.
  • Watermark too fast: The watermark advances too early, causing windows to close prematurely and leading to inaccurate results. For example, the event with timestamp 12:01:30 may arrive after the watermark has already passed 12:02, so the window [12:00, 12:02) was closed without it.
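One common heuristic, used here purely as an illustration, is to set the watermark to the largest event time seen so far minus a fixed allowance for out-of-order data. The sketch below shows how that allowance trades latency for correctness. The class name, the max_delay parameter, and the arrival order are assumptions made for this example; event times are seconds after 12:00:00.

```python
class BoundedOutOfOrdernessWatermark:
    """Heuristic watermark: max event time seen so far minus max_delay."""

    def __init__(self, max_delay):
        self.max_delay = max_delay
        self.max_event_time = None

    def observe(self, event_time):
        """Record one event and return the updated watermark."""
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return self.current()

    def current(self):
        if self.max_event_time is None:
            return float("-inf")  # nothing observed yet
        return self.max_event_time - self.max_delay


def late_events(event_times, max_delay):
    """Return the events that arrive after the watermark has passed them."""
    watermark = BoundedOutOfOrdernessWatermark(max_delay)
    late = []
    for t in event_times:
        if t < watermark.current():
            late.append(t)
        watermark.observe(t)
    return late


# Assumed arrival order: the 12:01:30 event (90) shows up after 12:03:00 (180).
ARRIVAL_ORDER = [2, 140, 180, 90, 230, 260]
```

With max_delay=30 the watermark runs too fast and the event at 90 is flagged as late. With max_delay=120 nothing is late, but every window closes two minutes later than it otherwise could.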

To mitigate these issues, we can combine repeated update triggers with watermarks. Specifically, we trigger at three key points: before the window closes, when the window closes, and after the window closes to handle late events.

  • Early trigger: Before the window closes or before receiving the watermark, every record received triggers the result to be sent downstream after a short delay (e.g., 1 minute). The result may be inaccurate at this stage, but this approach prevents the entire pipeline from being blocked by a late watermark.
  • On-time trigger: When the watermark is received, the window is marked as closed, and the result is sent downstream.
  • Late trigger: To handle late records that arrive after the window is closed, we continue to process and send updates for some time (e.g., 10 minutes) before garbage collection occurs.
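The combination of early, on-time, and late triggers can be sketched for a single window as follows. This is a simplified model under stated assumptions: early panes fire on every record (the short delay is omitted), lateness is measured against the watermark rather than the wall clock, and run_window with its input format is hypothetical.

```python
def run_window(inputs, window_end, allowed_lateness):
    """Simulate one window and return the (label, running_sum) panes it emits.

    `inputs` is a list of ("event", value) and ("watermark", event_time)
    tuples in arrival order. All events are assumed to belong to this window.
    """
    total = 0
    watermark = float("-inf")
    panes = []
    for kind, payload in inputs:
        if kind == "watermark":
            was_open = watermark < window_end
            watermark = max(watermark, payload)
            if was_open and watermark >= window_end:
                panes.append(("on-time", total))  # window closes here
        elif watermark >= window_end + allowed_lateness:
            continue  # state was garbage collected, so the record is dropped
        else:
            total += payload
            label = "early" if watermark < window_end else "late"
            panes.append((label, total))
    return panes


# Window [12:00, 12:02) expressed in seconds, with 10 minutes of lateness.
panes = run_window(
    [("event", 5), ("watermark", 120), ("event", 9),
     ("watermark", 720), ("event", 1)],
    window_end=120,
    allowed_lateness=600,
)
```

Here the window emits an early pane with sum 5, an on-time pane with sum 5 when the watermark reaches 12:02, and a late pane with sum 14 when the delayed record arrives. The final record is dropped because it arrives after the lateness horizon.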

Exactly-once Semantics

Exactly-once semantics means that in stream processing, each record is processed exactly once. Achieving this in distributed systems is a significant challenge.

Typically, communication between workers uses RPC (Remote Procedure Call), which faces issues like network errors and delays. To ensure delivery, we can use Upstream Backup, which keeps retrying until the sender receives an acknowledgment (ACK). This guarantees at-least-once semantics.
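The retry loop behind at-least-once delivery can be sketched as below. FlakyReceiver is a hypothetical stand-in for a remote worker whose ACKs are sometimes lost, and send_with_retry is an illustrative name rather than an API of any real system.

```python
class FlakyReceiver:
    """Receives every message, but loses the first `acks_to_drop` ACKs."""

    def __init__(self, acks_to_drop):
        self.acks_to_drop = acks_to_drop
        self.deliveries = []

    def deliver(self, message):
        self.deliveries.append(message)  # the message itself always arrives
        if self.acks_to_drop > 0:
            self.acks_to_drop -= 1
            return False  # the ACK was lost on the way back
        return True


def send_with_retry(receiver, message, max_attempts=10):
    """Resend until an ACK arrives and return the number of attempts."""
    for attempt in range(1, max_attempts + 1):
        if receiver.deliver(message):
            return attempt
    raise RuntimeError("no ACK received")


receiver = FlakyReceiver(acks_to_drop=2)
attempts = send_with_retry(receiver, "m1")
```

The sender needs three attempts, and the receiver ends up with three copies of the same message. The message is never lost, but it is also not delivered exactly once.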

However, for non-idempotent operations, repeated retries could lead to race conditions or multiple executions. One solution is to assign a unique ID to each message. The receiver maintains a KV Store to track all received messages. Before executing, the receiver checks the KV Store, and if a duplicate is found, the message is ignored.

For large data volumes, maintaining a KV Store becomes resource-intensive. To optimize performance, systems can fuse adjacent nodes to reduce the number of messages and use Bloom Filters to minimize the need for expensive lookups in the KV Store.

  • Bloom Filter is a data structure that quickly checks if an element is in a set. It may return false positives but never false negatives.
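The deduplication scheme can be sketched with a small Bloom filter placed in front of the store of seen message IDs. This is an illustration under assumptions: a Python set stands in for the KV store, the filter sizes are arbitrary, and the class names are hypothetical.

```python
import hashlib


class BloomFilter:
    """Tiny Bloom filter: may report false positives, never false negatives."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)  # one byte per bit, for clarity

    def _positions(self, item):
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos] = 1

    def might_contain(self, item):
        return all(self.bits[pos] for pos in self._positions(item))


class DedupReceiver:
    """Applies each message at most once, keyed by its unique message ID."""

    def __init__(self):
        self.bloom = BloomFilter()
        self.seen = set()  # stands in for the KV store
        self.kv_lookups = 0
        self.total = 0

    def receive(self, message_id, value):
        # Consult the expensive store only if the filter says "maybe seen".
        if self.bloom.might_contain(message_id):
            self.kv_lookups += 1
            if message_id in self.seen:
                return False  # duplicate: ignore
        self.bloom.add(message_id)
        self.seen.add(message_id)
        self.total += value
        return True
```

A message whose ID the filter has definitely not seen skips the store lookup entirely, which is the common case when duplicates are rare.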

In some non-deterministic scenarios, message IDs are insufficient for ensuring exactly-once semantics. For example, when a record has a 50% chance of sending one message and a 50% chance of sending two, message IDs alone cannot guarantee exactly-once semantics. In these cases, advanced checkpointing mechanisms are needed.

Streams and Tables

Stream-Table Duality

Flink exposes stream and batch processing through the same API. This is possible because of stream-table duality, the relationship between streams and tables:

  • A table is the result of applying all logs (or updates) in a database.
  • A stream is the log or the change in the table over time.

From a mathematical perspective, a stream is the derivative of a table, and a table is the integral of a stream.
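The derivative and integral analogy can be made concrete with a small sketch. It assumes a table of numeric values and a changelog of (key, delta) records. The function names are illustrative, and a key whose value is 0 is treated the same as an absent key.

```python
def integrate(changelog):
    """Stream -> table: apply every (key, delta) update in order."""
    table = {}
    for key, delta in changelog:
        table[key] = table.get(key, 0) + delta
    return table


def differentiate(before, after):
    """Table -> stream: emit the (key, delta) changes between two snapshots."""
    changes = []
    for key in sorted(set(before) | set(after)):
        delta = after.get(key, 0) - before.get(key, 0)
        if delta != 0:
            changes.append((key, delta))
    return changes


changelog = [("a", 5), ("b", 9), ("a", 7)]
snapshot_1 = integrate(changelog[:2])   # table after two updates
snapshot_2 = integrate(changelog)       # table after all three updates
```

Differencing the two snapshots recovers the third update, ("a", 7), and integrating the changes from an empty table rebuilds the final table.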

MapReduce as Streams and Tables

Streams can be transformed into tables using grouping operators, such as join and accumulation. For instance, a map-reduce example:

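from typing import Iterable, Tuple

Stream = Iterable      # a stream is modeled as any iterable of records
Key, Value = str, int  # placeholder types, chosen for illustration

def map(input_seq: Stream[Tuple[str, int]], output_seq: Stream[Tuple[Key, Value]]):
    pass  # read input records, emit (key, value) pairs

def reduce(key: Key, shuffled_seq: Stream[Value], output_seq: Stream[Tuple[str, int]]):
    pass  # read all values shuffled to this key, emit results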

Here, the map function’s input is a stream, and the output is also a stream. The reduce function’s input and output are both streams. The intermediate table can be viewed as a large window grouped by keys: when all the map tasks have finished, the window can close, and then reduce can read (i.e., shuffle) the complete data and process it.

This example also demonstrates that batch processing and stream processing, as well as the representation of data (tables vs streams), are not inherently different. A map-reduce process is a typical batch processing example but still uses tables and streams.

Stream to Table

To understand how a stream can be converted into a table, we categorize operators into two types: grouping and non-grouping.

  • Grouping operators: Stream in, table out. For example, join and accumulation. The map-write operation is a grouping operator, which maintains a large window that keeps receiving the output of the map function until all map tasks complete. In other words, it groups the stream’s data via windows. The example in the Streaming Essentials section is also a grouping operator.

  • Non-grouping operators: Stream in, stream out. For example, filters and element-wise transformations. These operators can emit each record in the stream immediately after processing.

A grouping operator can convert a stream into a table through windows.
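The difference between the two operator types shows up clearly in code. In this sketch a generator plays the role of a non-grouping operator, and a function that returns a dictionary plays the role of a grouping operator. The names keep_large and sum_by_key are hypothetical.

```python
from collections import defaultdict


def keep_large(stream, threshold):
    """Non-grouping: stream in, stream out, one record at a time."""
    for key, value in stream:
        if value >= threshold:
            yield key, value  # emitted immediately, no state is kept


def sum_by_key(stream):
    """Grouping: stream in, table out. State accumulates per key."""
    table = defaultdict(int)
    for key, value in stream:
        table[key] += value
    return dict(table)


records = [("a", 5), ("b", 9), ("a", 7), ("b", 1)]
filtered = list(keep_large(records, threshold=5))
table = sum_by_key(keep_large(records, threshold=5))
```

The filter can forward each record as soon as it sees it, while the sum has to hold state until its input (or window) is complete.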

Table to Stream

If windows convert streams into tables, what converts tables into streams? The answer is triggers! Triggers send the data in a window downstream, forming a stream. The simplest case is the per-record trigger used by map-read and reduce-read, where each record in the table is emitted as one stream record.
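The round trip from stream to table and back can be sketched for the MapReduce case. The function names below mirror the phases mentioned in the text but are otherwise hypothetical, and the table is a plain in-memory dictionary.

```python
from collections import defaultdict


def map_write(mapped_stream):
    """Grouping step: stream in, table out (values grouped by key)."""
    table = defaultdict(list)
    for key, value in mapped_stream:
        table[key].append(value)
    return dict(table)


def reduce_read(table):
    """Per-record trigger: emit each row of the table as one stream record."""
    for key, values in table.items():
        yield key, values


def reduce_sum(grouped_stream):
    """Non-grouping step: turn each (key, values) record into (key, sum)."""
    for key, values in grouped_stream:
        yield key, sum(values)


mapped = [("a", 5), ("b", 9), ("a", 7)]
shuffled_table = map_write(mapped)
reduced = list(reduce_sum(reduce_read(shuffled_table)))
```

The dictionary is the table at rest, and iterating over it row by row is the trigger that sets the data in motion again.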

Architecture

The components of Flink are as follows:

  • Users program with the DataStream API. The program is compiled and optimized into a JobGraph through a graph optimizer. The JobGraph is submitted to the JobManager through the Client.
  • The JobManager converts the JobGraph into an ExecutionGraph, a series of tasks that can be run in parallel. The JobManager then requests task slots from TaskManagers to execute these tasks.
  • The TaskManagers execute the tasks assigned by the JobManager.

Data Transfer

Each TaskManager has a network buffer to buffer network requests. Here’s how data transfer works in Flink:

  • If the sender and receiver are on the same TaskManager, no RPC is required. The data can be written directly into the buffer for the corresponding Task Slot.
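  • The sender must receive permission from the receiver before sending data: the receiver announces how many buffers it has free (its credit), and the sender may send only that much. If the receiver’s buffer is full, the sender has to wait, similar to how a token rate limiter works.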
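The permission scheme described above is often called credit-based flow control. A minimal sketch follows, in which the receiver's free buffer slots act as the credit. The Receiver and Sender classes are hypothetical and ignore networking entirely.

```python
from collections import deque


class Receiver:
    def __init__(self, buffer_size):
        self.buffer = deque()
        self.buffer_size = buffer_size

    def available_credit(self):
        """Number of records the sender is currently allowed to send."""
        return self.buffer_size - len(self.buffer)

    def accept(self, record):
        assert self.available_credit() > 0, "sender exceeded its credit"
        self.buffer.append(record)

    def process_one(self):
        """Consume one buffered record, which frees one unit of credit."""
        return self.buffer.popleft() if self.buffer else None


class Sender:
    def __init__(self, records):
        self.pending = deque(records)

    def send(self, receiver):
        """Send as many pending records as the credit allows."""
        credit = receiver.available_credit()
        sent = 0
        while self.pending and sent < credit:
            receiver.accept(self.pending.popleft())
            sent += 1
        return sent
```

With a buffer of two slots and five pending records, the first send transfers two records, the next transfers none, and one more goes through only after the receiver has processed a record.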

Back Pressure

In message queues, if the consumer is slower than the producer, messages start to pile up. The same happens in Flink at the operator level: if an operator processes records more slowly than they arrive, the input data piles up in the operator’s buffer.

To ensure system stability and accurate data processing, Flink implements backpressure. Backpressure is managed through the network buffer: if the receiver’s buffer is full and cannot accept more messages, the backpressure propagates upstream, ultimately reducing the source’s data production rate.
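The effect of backpressure on the source can be seen in a small discrete-time simulation. It assumes one source feeding one slow operator through a bounded buffer. The simulate function and its parameters are invented for this example.

```python
def simulate(ticks, buffer_size, source_rate, operator_rate):
    """Return (records emitted by the source, records left in the buffer)."""
    buffered = 0
    emitted = 0
    for _ in range(ticks):
        # The source may emit only as much as the free buffer space allows.
        free = buffer_size - buffered
        produced = min(source_rate, free)
        buffered += produced
        emitted += produced
        # The slow operator drains at most operator_rate records per tick.
        buffered -= min(operator_rate, buffered)
    return emitted, buffered


emitted, buffered = simulate(ticks=10, buffer_size=5, source_rate=3, operator_rate=1)
```

Left unchecked, the source would emit 30 records in 10 ticks. With the bounded buffer it emits 14, and after the buffer fills it settles at one record per tick, the rate the operator can sustain.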

Watermarks

In Flink, watermarks are special records containing timestamps that flow through the data stream along with normal records.

  • Upstream tasks send watermarks to the current task.
  • The current task keeps the latest watermark received from each upstream input, takes the minimum across those inputs as its own event-time clock, and forwards that minimum downstream whenever it advances.
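Watermark propagation in a task with several inputs can be sketched as follows. This is a simplified model with a hypothetical WatermarkTracker class: it tracks one watermark per input channel and forwards the minimum only when that minimum moves forward.

```python
class WatermarkTracker:
    """Tracks per-input watermarks and emits the minimum when it advances."""

    def __init__(self, num_inputs):
        self.channel_watermarks = [float("-inf")] * num_inputs
        self.emitted = float("-inf")

    def on_watermark(self, channel, watermark):
        """Return the watermark to forward downstream, or None if unchanged."""
        # Watermarks never move backwards on a channel.
        self.channel_watermarks[channel] = max(
            self.channel_watermarks[channel], watermark
        )
        candidate = min(self.channel_watermarks)
        if candidate > self.emitted:
            self.emitted = candidate
            return candidate
        return None
```

With two inputs, a watermark of 10 on the first input produces nothing until the second input has also reported. After the second input reports 7, the task forwards 7, because the slowest input determines how far event time has progressed.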

State Management

Flink is a Stateful Stream Processing system.

  • Stateless systems only process element-wise operators, like filtering.
  • Stateful systems store local state for operations like grouping or aggregation.

Flink’s state is divided into two types:

  • Operator State: State scoped to a single operator instance. In the watermark example, each task keeps its own current watermark.
  • Keyed State: State partitioned by a key in the record, so each key has its own entry. It behaves like a KV store sharded across the parallel instances of an operator.
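The idea behind key-partitioned state can be sketched as below. This is not Flink's actual partitioning scheme. The route function, the KeyedSum class, and the use of CRC32 are assumptions chosen to keep the example deterministic.

```python
import zlib


class KeyedSum:
    """Hypothetical parallel operator instance with one running sum per key."""

    def __init__(self):
        self.state = {}  # key -> running sum, local to this instance

    def process(self, key, value):
        self.state[key] = self.state.get(key, 0) + value
        return self.state[key]


def route(key, parallelism):
    """Pick the instance that owns `key` (assumed hash partitioning)."""
    return zlib.crc32(key.encode()) % parallelism


def run_keyed(records, parallelism):
    """Route each record to the instance that owns its key."""
    instances = [KeyedSum() for _ in range(parallelism)]
    for key, value in records:
        instances[route(key, parallelism)].process(key, value)
    return instances


instances = run_keyed([("a", 5), ("b", 9), ("a", 7)], parallelism=2)
```

Because routing depends only on the key, every update for a given key lands on the same instance, so each instance can keep its share of the state locally while the whole behaves like one KV store.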

Checkpoint

Checkpointing is how Flink provides strong consistency without giving up performance.

A Checkpoint is essentially a consistent snapshot of Flink’s state at a specific point in the stream. If a failure occurs (such as an RPC error), Flink rolls every operator back to the most recent completed checkpoint and replays the input from that point, so each record affects the state exactly once.

Checkpointing is implemented as follows:

  • Like watermarks, a new type of record called checkpoint barrier is inserted into the data stream. Each checkpoint barrier has a unique checkpoint ID.

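  • When an operator encounters a checkpoint barrier, it backs up its local state to a state backend under the corresponding checkpoint ID and forwards the barrier downstream. An operator with several inputs waits until the barrier has arrived on every input before taking its snapshot.

  • Once all operators have backed up their state and the checkpoint barrier has flowed into the sink, the checkpoint is complete.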
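The barrier mechanism can be sketched for a simple linear pipeline. This is a toy model under strong assumptions: a single input per operator, synchronous snapshots, and a dictionary standing in for the state backend. CountingOperator, run, and restore are hypothetical names.

```python
class CountingOperator:
    """Stateful operator that counts the data records it has seen."""

    def __init__(self, name):
        self.name = name
        self.count = 0

    def process(self, record, backend):
        kind, payload = record
        if kind == "barrier":
            # Snapshot local state under the barrier's checkpoint ID.
            backend.setdefault(payload, {})[self.name] = self.count
        else:
            self.count += 1
        return record  # forward data and barriers downstream


def run(stream, operators):
    """Push records through the pipeline and return (backend, completed IDs)."""
    backend = {}
    completed = []
    for record in stream:
        for op in operators:
            record = op.process(record, backend)
        if record[0] == "barrier":
            completed.append(record[1])  # the barrier reached the sink
    return backend, completed


def restore(operators, backend, checkpoint_id):
    """Roll every operator back to the state saved for `checkpoint_id`."""
    for op in operators:
        op.count = backend[checkpoint_id][op.name]


ops = [CountingOperator("parse"), CountingOperator("aggregate")]
stream = [("data", "x"), ("data", "y"), ("barrier", 1),
          ("data", "z"), ("barrier", 2)]
backend, completed = run(stream, ops)
```

After the run, checkpoint 1 holds a count of 2 for both operators and checkpoint 2 holds 3. Calling restore(ops, backend, 1) rewinds both counters to 2, after which the records following barrier 1 would be replayed.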

Savepoint

A Savepoint is similar to a checkpoint, but it also stores metadata for restarting the Flink job (rather than rolling back). Given an application and a compatible savepoint, the application can be fully restarted from the savepoint, allowing for features like pausing the job.

Part 3. Streaming Systems Landscape

Since Google built MapReduce in 2003 (the paper followed in 2004), large-scale data processing systems have undergone many iterations and evolutions:

  • MapReduce’s biggest contribution was simplifying big data processing with two standard APIs, reducing the complexity of programming and scaling.
  • Flume is Google’s second-generation system, built on top of MapReduce. As Google scaled, some issues with MapReduce became evident: independently developed jobs were not compatible with one another, and the strictness of the API forced pipelines to include unnecessary phases.
  • Storm was the first widely adopted streaming engine in the industry. Its focus on low-latency processing led to the lambda architecture, where an inaccurate real-time stream system is paired with a batch process to ensure accuracy.
  • Spark initially gained fame as a high-speed batch processor. Spark Streaming uses micro-batch processing to simulate streaming. Because every micro-batch runs on Spark’s strongly consistent batch engine, the results are accurate without an additional batch process.
  • MillWheel, developed by Google, addressed the accuracy shortcomings seen in Storm, adding watermarks and exactly-once semantics.
  • Kafka is a transport layer rather than a stream processing engine but has significantly influenced streaming systems:
    • Kafka introduced stream data replayability, enhancing robustness and consistency.
    • Kafka also promoted the stream-table duality, laying the foundation for developments like Streaming SQL.
  • Flink is currently one of the strongest open-source streaming frameworks. It supports event-time processing natively and takes consistent snapshots with a variant of the Chandy-Lamport algorithm, which gives it strong consistency without pausing the whole pipeline.
  • Beam is an API and SDK that can run on any compatible engine, similar to SQL in the streaming world.

Reference

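Streaming Systems, Tyler Akidau, Slava Chernyak, and Reuven Lax (O'Reilly, 2018)

Stream Processing with Apache Flink, Fabian Hueske and Vasiliki Kalavri (O'Reilly, 2019)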