Feature Engineering & Feature StoresPoint-in-Time CorrectnessHard⏱️ ~3 min

Implementing Temporal As Of Joins for PIT Correctness

The As Of Join Algorithm

The as of join is the fundamental algorithm for Point in Time (PIT) correctness, solving the problem of matching each training label to the exact feature values available at label time. For a label row with entity ID and label time, the join selects the latest feature record where event timestamp is less than or equal to label time. This differs from standard joins which match on exact equality or use current values.

Naive vs Optimized Implementation

The naive approach of sorting all feature history and scanning for each label results in O(n * m) complexity for n labels and m feature records. Production systems optimize using range partitioning by entity ID, then merge joining sorted streams. This achieves O((n + m) * log(n + m)) complexity. For billion row datasets, this difference means hours versus days of compute time.

Spark SQL Implementation

Spark implements as of joins using window functions with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, partitioned by entity ID and ordered by timestamp. The query selects the most recent feature row for each label timestamp using LAST_VALUE or row numbering with rank = 1. Optimizations include broadcast joining small dimension tables and using bucketed storage for large feature tables.

Streaming Implementation

In Flink and Kafka Streams, temporal joins use versioned state tables keyed by entity ID. Each lookup retrieves the latest state as of the event timestamp. Watermarks define how long to wait for late events before finalizing joins. The challenge is balancing state retention (memory cost) against late event handling (correctness).

Validation

Always validate that no feature timestamp exceeds the label timestamp in joined output. Run anti join checks in CI pipelines to catch temporal violations before training begins.

💡 Key Takeaways
Partition by entity ID then sort by event time within partitions for last observation carried forward, avoiding full table scans at 100 million plus row scale
Pre bucket by day or hour to bound scan sizes, with caching of frequent entity ID and day lookups to reduce repeated temporal scans in production pipelines
Window semantics must be explicit with clear inclusive/exclusive boundaries like [t minus 7 days, t) to avoid off by one errors that inflate offline metrics but degrade online serving
Late arriving data requires watermarks (for example, p99.9 within 2 hours) and acceptance windows (24 to 72 hours), with events beyond window applied retroactively offline only
Offline PIT join throughput typically 50 to 200 million joined rows per hour per node for wide 50 to 200 feature datasets with proper partitioning
All timestamps stored in single standard like UTC with monotonicity validation per entity to prevent clock skew causing leakage or missing joins at boundaries
📌 Interview Tips
1Uber Palette point in time joiner service builds training sets by partitioning 1 billion row datasets by entity, sorting by event time, and caching frequent temporal lookups to achieve hour scale dataset generation
2Airbnb Zipline computes windowed aggregates like 7/30/90 day counts with explicit window end at event time, enforcing UTC timestamps and inclusive/exclusive semantics to prevent boundary bugs
3Real join query: SELECT label.*, features.clicks FROM labels LEFT JOIN LATERAL (SELECT clicks FROM feature_history WHERE entity_id = labels.entity_id AND event_time <= labels.label_time ORDER BY event_time DESC LIMIT 1) features
← Back to Point-in-Time Correctness Overview
Implementing Temporal As Of Joins for PIT Correctness | Point-in-Time Correctness - System Overflow