Skip to content

Commit 6d6ff0d

Browse files
authored
Fail retention app when the columnPattern mismatch partition spec for… (#552)
1 parent 047b5e9 commit 6d6ff0d

2 files changed

Lines changed: 60 additions & 0 deletions

File tree

apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.iceberg.catalog.Catalog;
5151
import org.apache.iceberg.catalog.TableIdentifier;
5252
import org.apache.iceberg.expressions.Expression;
53+
import org.apache.iceberg.expressions.Expressions;
5354
import org.apache.iceberg.io.CloseableIterable;
5455
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
5556
import org.apache.iceberg.spark.actions.SparkActions;
@@ -339,6 +340,17 @@ private Map<String, List<String>> prepareBackupDataManifests(
339340
TableScan scan = table.newScan().filter(filter);
340341
try (CloseableIterable<FileScanTask> filesIterable = scan.planFiles()) {
341342
List<FileScanTask> filesList = Lists.newArrayList(filesIterable);
343+
filesList.stream()
344+
.filter(task -> !Expressions.alwaysTrue().isEquivalentTo(task.residual()))
345+
.findFirst()
346+
.ifPresent(
347+
task -> {
348+
throw new IllegalStateException(
349+
String.format(
350+
"Retention with backup enabled requires a metadata-only delete for table %s, "
351+
+ "but file %s has residual filter %s, which would require a row-level rewrite.",
352+
fqtn, task.file().path(), task.residual()));
353+
});
342354
return filesList.stream()
343355
.collect(
344356
Collectors.groupingBy(

apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,54 @@ public void testRetentionDataManifestWithTimestampPartitionedTable() throws Exce
309309
}
310310
}
311311

312+
@Test
313+
public void testRetentionWithBackupFailsWhenColumnPatternMismatchesPartition() throws Exception {
314+
final String tableName = "db.test_retention_backup_pattern_mismatch";
315+
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
316+
// The table is partitioned on `datepartition`, but retention will filter
317+
// on `time_col` using a pattern unrelated to the partitioning. For each
318+
// file's per-file min/max to actually straddle the cutoff (and produce
319+
// a non-trivial residual), both `time_col` values within a partition
320+
// must live in the same data file — so we force a single writer task
321+
// via the COALESCE(1) hint.
322+
ops.spark().sql(String.format("DROP TABLE IF EXISTS %s", tableName)).show();
323+
ops.spark()
324+
.sql(
325+
String.format(
326+
"CREATE TABLE %s (data string, datepartition string, time_col string) "
327+
+ "PARTITIONED BY (datepartition)",
328+
tableName))
329+
.show();
330+
ops.spark()
331+
.sql(
332+
"SELECT data, datepartition, time_col FROM VALUES "
333+
+ "('a', '2024-01', '2020-01-01-00'), "
334+
+ "('b', '2024-01', '2030-01-01-00'), "
335+
+ "('c', '2024-02', '2020-01-01-00'), "
336+
+ "('d', '2024-02', '2030-01-01-00') "
337+
+ "AS t(data, datepartition, time_col)")
338+
.coalesce(1)
339+
.writeTo(tableName)
340+
.append();
341+
342+
// Fix `now` so the cutoff (now - 1 day, formatted yyyy-MM-dd-HH) falls
343+
// strictly between each file's min ("2020-01-01-00") and max
344+
// ("2030-01-01-00") — forcing a non-trivial residual on every file.
345+
ZonedDateTime now = ZonedDateTime.of(2025, 6, 15, 10, 0, 0, 0, ZoneOffset.UTC);
346+
IllegalStateException ex =
347+
Assertions.assertThrows(
348+
IllegalStateException.class,
349+
() ->
350+
ops.runRetention(
351+
tableName, "time_col", "yyyy-MM-dd-HH", "day", 1, true, ".backup", now));
352+
Assertions.assertTrue(
353+
ex.getMessage().contains("metadata-only delete"),
354+
"Expected metadata-only delete error, got: " + ex.getMessage());
355+
// DELETE should not have executed: all 4 rows remain.
356+
verifyRowCount(ops, tableName, 4);
357+
}
358+
}
359+
312360
@Test
313361
public void testOrphanFilesDeletionJavaAPI() throws Exception {
314362
final String tableName = "db.test_ofd_java";

0 commit comments

Comments
 (0)