Implementing Temporal As Of Joins for PIT Correctness
The As Of Join Algorithm
The as of join is the fundamental algorithm for Point in Time (PIT) correctness. It solves the problem of matching each training label to the feature values that were actually available at label time. For a label row with an entity ID and a label time, the join selects, among feature records with the same entity ID, the latest one whose event timestamp is less than or equal to the label time. This differs from a standard equi-join, which matches only on exact key equality, and from a lookup against current feature values, which can leak information recorded after the label time.
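The selection rule for a single entity can be sketched with a binary search over a timestamp-sorted history. This is a minimal illustration, not a production implementation; the function name as_of_lookup and the sample data are hypothetical.

```python
from bisect import bisect_right


def as_of_lookup(history, label_time):
    """Return the latest (event_time, value) with event_time <= label_time.

    `history` must be sorted by event_time ascending. Returns None when no
    feature record existed yet at label_time.
    """
    timestamps = [event_time for event_time, _ in history]
    # bisect_right counts the records with event_time <= label_time.
    idx = bisect_right(timestamps, label_time)
    return history[idx - 1] if idx > 0 else None


# Hypothetical feature history for one entity: (event_time, value)
history = [(10, 0.2), (20, 0.5), (30, 0.9)]

print(as_of_lookup(history, 25))  # (20, 0.5): latest record before the label
print(as_of_lookup(history, 20))  # (20, 0.5): equality counts as available
print(as_of_lookup(history, 5))   # None: no feature existed yet
```

Note that the record at time 30 is never returned for a label at time 25, even though it is the current value. That is the property that prevents leakage.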
Naive vs Optimized Implementation
The naive approach scans the full feature history for every label, which costs O(n * m) for n labels and m feature records. Production systems optimize by range partitioning both inputs by entity ID, sorting each partition by timestamp, and then merge joining the sorted streams. Sorting dominates the cost at O((n + m) * log(n + m)); the merge itself is a single linear pass. For billion row datasets, this difference can mean hours rather than days of compute time.
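A single-machine sketch of the merge join over sorted streams is shown here. It assumes both inputs fit in memory and uses hypothetical tuple layouts; a distributed system would apply the same pass within each entity partition.

```python
def merge_asof_join(labels, features):
    """As of join via sort + one forward pass.

    labels:   iterable of (entity_id, label_time)
    features: iterable of (entity_id, event_time, value)
    Returns a list of (entity_id, label_time, event_time, value); the last
    two fields are None when no feature existed at label_time.
    """
    labels = sorted(labels)
    features = sorted(features, key=lambda f: (f[0], f[1]))
    out = []
    i = 0                  # cursor into features; it only moves forward
    current_entity = None
    last = None            # latest eligible feature for the current entity
    for entity, label_time in labels:
        if entity != current_entity:
            current_entity, last = entity, None
            # Skip leftover features that belong to earlier entities.
            while i < len(features) and features[i][0] < entity:
                i += 1
        # Consume features for this entity up to and including label_time.
        while (i < len(features) and features[i][0] == entity
               and features[i][1] <= label_time):
            last = features[i]
            i += 1
        if last is None:
            out.append((entity, label_time, None, None))
        else:
            out.append((entity, label_time, last[1], last[2]))
    return out


# Hypothetical sample data
labels = [("u1", 25), ("u2", 5), ("u1", 10), ("u2", 40)]
features = [("u1", 10, 0.2), ("u1", 20, 0.5), ("u1", 30, 0.9), ("u2", 15, 1.0)]
result = merge_asof_join(labels, features)
for row in result:
    print(row)
```

Because labels within an entity are visited in ascending time order, the feature cursor never rewinds, so the pass after sorting touches each label and each feature record at most once.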
Spark SQL Implementation
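In Spark SQL, as of joins are typically expressed with window functions. One pattern unions label rows with feature rows and carries the latest feature value forward using LAST_VALUE (ignoring nulls) over a window partitioned by entity ID, ordered by timestamp, and framed as ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. Another pattern joins each label to the feature rows whose event timestamp is less than or equal to the label time, numbers the candidates per label by event timestamp descending, and keeps the row with rank = 1. Optimizations include broadcast joining small dimension tables and using storage bucketed by entity ID for large feature tables.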
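The row numbering variant can be sketched as a SQL query. To keep the example runnable without a cluster, it is executed here against SQLite from Python's standard library (window functions require SQLite 3.25 or later); the query shape is expected to carry over to Spark SQL, but that is an assumption and has not been verified here. Table and column names are hypothetical, and the partitioning assumes that (entity_id, label_time) identifies a label row uniquely.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE labels (entity_id TEXT, label_time INTEGER, label INTEGER);
CREATE TABLE features (entity_id TEXT, event_time INTEGER, value REAL);
INSERT INTO labels VALUES ('u1', 25, 1), ('u1', 10, 0), ('u2', 5, 1);
INSERT INTO features VALUES
    ('u1', 10, 0.2), ('u1', 20, 0.5), ('u1', 30, 0.9), ('u2', 15, 1.0);
""")

AS_OF_SQL = """
SELECT entity_id, label_time, label, event_time, value
FROM (
    SELECT l.entity_id, l.label_time, l.label, f.event_time, f.value,
           ROW_NUMBER() OVER (
               PARTITION BY l.entity_id, l.label_time
               ORDER BY f.event_time DESC
           ) AS rn
    FROM labels l
    -- LEFT JOIN keeps labels that have no eligible feature row yet.
    LEFT JOIN features f
      ON f.entity_id = l.entity_id
     AND f.event_time <= l.label_time
) AS ranked
WHERE rn = 1
ORDER BY entity_id, label_time
"""

rows = conn.execute(AS_OF_SQL).fetchall()
for row in rows:
    print(row)
```

The inequality in the join condition produces every eligible feature row per label before ranking, so on large tables this variant depends heavily on partitioning or bucketing by entity ID to keep the intermediate result manageable.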
Streaming Implementation
In Flink and Kafka Streams, temporal joins use versioned state tables keyed by entity ID. Each lookup retrieves the latest state as of the event timestamp. Watermarks bound how long the join waits for late or out-of-order events before its results are finalized. The challenge is balancing state retention (memory cost) against late event handling (correctness): retaining more versions lets later events still join correctly, but the state grows with every version kept.
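The retention trade-off can be illustrated with a simplified in-memory model of a versioned state table. This is not the Flink or Kafka Streams API; the class VersionedState, its retention parameter, and its methods are hypothetical names chosen for the sketch.

```python
from bisect import bisect_right, insort
from collections import defaultdict


class VersionedState:
    """Per-entity version history that is pruned as the watermark advances."""

    def __init__(self, retention):
        self.retention = retention          # how far behind the watermark to serve
        self.versions = defaultdict(list)   # entity_id -> sorted [(event_time, value)]
        self.watermark = float("-inf")

    def upsert(self, entity_id, event_time, value):
        # insort keeps each history sorted even when updates arrive out of order.
        insort(self.versions[entity_id], (event_time, value))

    def advance_watermark(self, watermark):
        self.watermark = max(self.watermark, watermark)
        cutoff = self.watermark - self.retention
        for history in self.versions.values():
            # Keep the newest version at or before the cutoff so that a
            # lookup exactly at the cutoff still resolves; drop older ones.
            idx = bisect_right(history, (cutoff, float("inf"))) - 1
            if idx > 0:
                del history[:idx]

    def accepts(self, event_time):
        """False when the event is older than the retained window."""
        return event_time >= self.watermark - self.retention

    def lookup(self, entity_id, event_time):
        """Latest value as of event_time, or None if no version existed."""
        history = self.versions.get(entity_id, [])
        idx = bisect_right(history, (event_time, float("inf")))
        return history[idx - 1][1] if idx else None


state = VersionedState(retention=10)
for ts, value in [(10, 0.2), (20, 0.5), (30, 0.9)]:
    state.upsert("u1", ts, value)

before = state.lookup("u1", 25)   # latest version at or before t=25
state.advance_watermark(35)       # cutoff becomes 25; the t=10 version is dropped
print(before, state.accepts(22), state.accepts(27), state.lookup("u1", 27))
```

A larger retention value keeps more versions per entity in memory but lets later-arriving events still join against the correct historical value; a smaller one bounds memory and forces events older than the window to be rejected or handled separately.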
Validation
Always validate that no feature timestamp exceeds the label timestamp in joined output. Run anti join checks in CI pipelines to catch temporal violations before training begins.
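A check of this kind might look like the following sketch, which filters the joined output for rows where the feature timestamp is later than the label timestamp. The field names and the function find_temporal_violations are hypothetical; on a SQL engine the equivalent would be a query that is expected to return zero rows.

```python
def find_temporal_violations(joined_rows):
    """Return rows whose feature timestamp is later than the label timestamp.

    Rows with no matched feature (feature_time is None) cannot leak and
    are skipped.
    """
    return [
        row for row in joined_rows
        if row["feature_time"] is not None
        and row["feature_time"] > row["label_time"]
    ]


# Hypothetical joined output containing one deliberate leak
joined = [
    {"entity_id": "u1", "label_time": 25, "feature_time": 20},
    {"entity_id": "u1", "label_time": 10, "feature_time": 30},   # leak
    {"entity_id": "u2", "label_time": 5, "feature_time": None},
]

violations = find_temporal_violations(joined)
print(len(violations), "violation(s) found")
```

In a CI job, the pipeline would fail when the returned list is non-empty, for example with an assertion that reports the offending rows.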