Apache Hudi for Incremental Processing
The Timeline and Incremental Queries
Understanding the Timeline: Every operation in Hudi (insert, upsert, delete, compaction) gets assigned an instant time that acts like a commit identifier. These instants form an immutable timeline stored as metadata in the table directory.
The timeline provides two critical capabilities: snapshot isolation for readers and the foundation for incremental processing.
Snapshot Isolation: Writers follow a two-phase protocol. First, they write data files and create a pending instant on the timeline. Then they atomically mark that instant as committed. Readers only see completed commits, so they always get a consistent snapshot even while concurrent writes are happening.
This is similar to Multi-Version Concurrency Control (MVCC) in databases, but implemented on top of object storage using instant markers in the timeline.
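To make the write side concrete, here is a minimal PySpark sketch of an upsert into a Hudi table. It assumes the Hudi Spark bundle is on the classpath; the table name, record key, precombine field, and storage path are all illustrative, not taken from the example above. Each save() produces one new instant on the timeline, and readers only observe the rows once that instant is committed.

```python
from pyspark.sql import SparkSession

# Assumes the Hudi Spark bundle jar is available to this session.
spark = (SparkSession.builder
    .appName("hudi-upsert-sketch")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate())

# Hypothetical menu rows to upsert; each write becomes one timeline instant.
updates = spark.createDataFrame(
    [("item-42", "Veggie Burger", 9.99, "20240115100000")],
    ["item_id", "name", "price", "updated_at"],
)

(updates.write.format("hudi")
    .option("hoodie.table.name", "menu_items")                        # assumed table name
    .option("hoodie.datasource.write.recordkey.field", "item_id")     # record key
    .option("hoodie.datasource.write.precombine.field", "updated_at") # ordering field for duplicates
    .option("hoodie.datasource.write.operation", "upsert")
    .mode("append")                                                   # append = new commit on the timeline
    .save("s3://example-bucket/warehouse/menu_items"))                # hypothetical path
```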
Incremental Queries Explained: This is where Hudi becomes transformative for downstream processing. Instead of reading the entire table, you query only the changes between two commit instants.
A downstream consumer stores its last processed commit instant (for example, timestamp 20240115100000). On the next run, it issues an incremental query: "Give me all rows that changed between 20240115100000 and now." Hudi scans the timeline, identifies which files were modified in commits after that instant, and returns only those changed rows.
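A sketch of what that incremental query looks like through the Hudi Spark datasource. The table path is a placeholder, `spark` is the session from the earlier sketch, and in practice the begin instant comes from the consumer's checkpoint rather than a hard-coded string.

```python
# Last processed instant, normally loaded from the consumer's checkpoint store.
last_processed = "20240115100000"

# Incremental read: only rows committed after `last_processed` are returned.
changes = (spark.read.format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", last_processed)
    .load("s3://example-bucket/warehouse/menu_items"))

# Hudi attaches metadata columns such as _hoodie_commit_time to every row,
# which tells you exactly which commit each change came from.
changes.select("_hoodie_commit_time", "item_id", "price").show()
```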
Production Impact: For the Uber Eats example, weekly aggregation jobs that previously scanned 100 percent of the multi-terabyte menu table now use incremental queries to process only the 5 percent that changed. Job runtime dropped from 4-5 hours to 45 minutes.
This pattern cascades through data pipelines. If Table A feeds Table B feeds Table C, each stage can use incremental processing. A change to 1000 rows in Table A propagates through the entire pipeline touching only those 1000 rows plus derived results, not billions of unchanged rows.
1. Consumer reads checkpoint: the last processed instant is 20240115100000.
2. Query timeline: fetch changes from 20240115100000 to the latest commit, 20240115103000.
3. Process deltas: read only the modified files, about 2 GB instead of the 500 GB full table.
4. Update checkpoint: atomically save 20240115103000 as the new last processed instant. (A code sketch of this loop follows the list.)
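Putting the four steps together, a minimal sketch of one pipeline pass. The helpers load_checkpoint, save_checkpoint, and process are hypothetical stand-ins for a durable checkpoint store and the downstream logic; the table path is illustrative.

```python
TABLE_PATH = "s3://example-bucket/warehouse/menu_items"   # illustrative

def run_incremental_pass(spark, load_checkpoint, save_checkpoint, process):
    # 1. Read the last processed instant from durable storage.
    begin = load_checkpoint()                             # e.g. "20240115100000"

    # 2. Ask the timeline for everything committed after that instant.
    deltas = (spark.read.format("hudi")
        .option("hoodie.datasource.query.type", "incremental")
        .option("hoodie.datasource.read.begin.instanttime", begin)
        .load(TABLE_PATH))

    if deltas.rdd.isEmpty():                              # no new commits since the checkpoint
        return

    # The newest commit in this batch becomes the next checkpoint.
    latest = deltas.agg({"_hoodie_commit_time": "max"}).collect()[0][0]

    # 3. Process only the changed rows (must be idempotent, see the pitfall below).
    process(deltas)

    # 4. Advance the checkpoint only after processing has succeeded.
    save_checkpoint(latest)
```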
⚠️ Common Pitfall: Incremental queries depend on checkpoint management. If a consumer loses its checkpoint or rewinds incorrectly, it can miss commits (data loss) or reprocess duplicates. Store checkpoints in durable storage like a metadata database and make downstream processing idempotent.
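One common way to get that idempotency, sketched under the assumption that the downstream target is itself a Hudi table: write derived rows with upsert semantics keyed on the same record key, so a replayed batch overwrites its earlier results instead of appending duplicates. The target table name, path, and updated_at precombine column are illustrative.

```python
def process(deltas):
    # Upserting keyed on item_id means a replayed batch simply rewrites the
    # same downstream rows rather than adding duplicates.
    (deltas.write.format("hudi")
        .option("hoodie.table.name", "menu_items_enriched")                # hypothetical target
        .option("hoodie.datasource.write.recordkey.field", "item_id")
        .option("hoodie.datasource.write.precombine.field", "updated_at")  # assumed to exist in the deltas
        .option("hoodie.datasource.write.operation", "upsert")
        .mode("append")
        .save("s3://example-bucket/warehouse/menu_items_enriched"))        # hypothetical path
```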
Query Modes Summary: Hudi exposes three ways to read data. Snapshot queries give the latest complete view, perfect for dashboards. Read-optimized queries read only compacted base files, tolerating slight staleness for maximum speed. Incremental queries return only changes, the key to efficient pipelines.
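For reference, a sketch of how the three modes map onto the Spark datasource query type option. The path and instant are illustrative, and the read-optimized mode is only meaningful for Merge-on-Read tables.

```python
path = "s3://example-bucket/warehouse/menu_items"          # illustrative

# Snapshot: latest complete view of every row.
snapshot = (spark.read.format("hudi")
    .option("hoodie.datasource.query.type", "snapshot")
    .load(path))

# Read-optimized: compacted base files only; fastest, but possibly slightly stale.
read_optimized = (spark.read.format("hudi")
    .option("hoodie.datasource.query.type", "read_optimized")
    .load(path))

# Incremental: only rows changed after the given instant.
incremental = (spark.read.format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", "20240115100000")
    .load(path))
```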
💡 Key Takeaways
✓ The timeline tracks all commits as immutable instants, providing snapshot isolation where readers only see completed commits
✓ Incremental queries return only rows changed between two commit instants, dramatically reducing the data scanned by downstream pipelines
✓ Uber Eats reduced weekly aggregation from 4-5 hours to 45 minutes by processing only the 5 percent of rows that changed instead of rescanning 100 percent
✓ Checkpoint management is critical: consumers store the last processed instant and must handle checkpoint loss to avoid missing data or duplicate processing
✓ Three query modes serve different needs: snapshot for the latest complete view, read-optimized for fast scans tolerating staleness, incremental for efficient delta processing
📌 Examples
1. A 500 GB table with 2 GB of daily changes: the incremental query reads 2 GB instead of the 500 GB full scan, reducing processing from 30 minutes to under 1 minute
2. A consumer stores checkpoint 20240115100000, queries changes up to 20240115103000, processes the deltas, and atomically updates its checkpoint to 20240115103000
3. If a consumer loses its checkpoint and rewinds to an old instant, it might reprocess 1 week of commits (7 GB) instead of 1 day (2 GB), causing duplicate downstream results