diff --git a/services/optimizer/build.gradle b/services/optimizer/build.gradle
index 2de8fd5c7..c05c7f9c3 100644
--- a/services/optimizer/build.gradle
+++ b/services/optimizer/build.gradle
@@ -4,7 +4,11 @@ plugins {
}
dependencies {
+ implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8'
+ implementation 'com.vladmihalcea:hibernate-types-55:2.21.1'
implementation 'org.springframework.boot:spring-boot-starter-web:2.7.8'
+ implementation 'mysql:mysql-connector-java:8.+'
+ testImplementation 'com.h2database:h2:2.2.224'
testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.8'
}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/CommitDeltaMetrics.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/CommitDeltaMetrics.java
new file mode 100644
index 000000000..5a30c9afd
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/CommitDeltaMetrics.java
@@ -0,0 +1,28 @@
+package com.linkedin.openhouse.optimizer.db;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/** Per-commit incremental counters. Serialized as JSON into the {@code delta} column. */
+@Data
+@Builder(toBuilder = true)
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CommitDeltaMetrics {
+
+ /** Number of data files this commit added to the table. */
+ private Long numFilesAdded;
+
+ /** Number of data files this commit removed from the table. */
+ private Long numFilesDeleted;
+
+ /** Total bytes added by this commit. */
+ private Long addedSizeBytes;
+
+ /** Total bytes removed by this commit. */
+ private Long deletedSizeBytes;
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/HistoryStatus.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/HistoryStatus.java
new file mode 100644
index 000000000..3680735f4
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/HistoryStatus.java
@@ -0,0 +1,15 @@
+package com.linkedin.openhouse.optimizer.db;
+
+/**
+ * DB-layer enum for the {@code status} column of {@code table_operations_history}.
+ *
+ *
Self-contained: no references to api/ or model/ types.
+ */
+public enum HistoryStatus {
+
+ /** The Spark job for this operation completed successfully. */
+ SUCCESS,
+
+ /** The Spark job for this operation failed. */
+ FAILED
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/OperationStatus.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/OperationStatus.java
new file mode 100644
index 000000000..0a2e07483
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/OperationStatus.java
@@ -0,0 +1,21 @@
+package com.linkedin.openhouse.optimizer.db;
+
+/**
+ * DB-layer enum for the {@code status} column of {@code table_operations}.
+ *
+ *
Self-contained: no references to api/ or model/ types.
+ */
+public enum OperationStatus {
+
+ /** Analyzer has written the row; not yet claimed by the scheduler. */
+ PENDING,
+
+ /** Scheduler has claimed the row and is launching a job; jobId not yet recorded. */
+ SCHEDULING,
+
+ /** Job has been submitted to the Jobs Service; the row carries a {@code jobId}. */
+ SCHEDULED,
+
+ /** Scheduler marked this row as a duplicate of another PENDING row; not claimable. */
+ CANCELED
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/OperationType.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/OperationType.java
new file mode 100644
index 000000000..e4caf549b
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/OperationType.java
@@ -0,0 +1,14 @@
+package com.linkedin.openhouse.optimizer.db;
+
+/**
+ * DB-layer enum for the operation types persisted in {@code table_operations.operation_type} and
+ * {@code table_operations_history.operation_type}.
+ *
+ *
Self-contained: no references to api/ or model/ types. JPA binds this via
+ * {@code @Enumerated(EnumType.STRING)}.
+ */
+public enum OperationType {
+
+ /** Removes orphaned data files no longer referenced by table metadata. */
+ ORPHAN_FILES_DELETION
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/SnapshotMetrics.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/SnapshotMetrics.java
new file mode 100644
index 000000000..452b35097
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/SnapshotMetrics.java
@@ -0,0 +1,28 @@
+package com.linkedin.openhouse.optimizer.db;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/** Point-in-time snapshot fields. Serialized as JSON into the {@code snapshot} column. */
+@Data
+@Builder(toBuilder = true)
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SnapshotMetrics {
+
+ /** Iceberg metadata version pointer for this snapshot. */
+ private String tableVersion;
+
+ /** Filesystem path (or URI) of the table's storage root. */
+ private String tableLocation;
+
+ /** Total on-disk size of the table at this snapshot, in bytes. */
+ private Long tableSizeBytes;
+
+ /** Total number of data files as of the latest snapshot — used for bin-packing. */
+ private Long numCurrentFiles;
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/TableOperationsHistoryRow.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/TableOperationsHistoryRow.java
new file mode 100644
index 000000000..5f4a598d9
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/TableOperationsHistoryRow.java
@@ -0,0 +1,75 @@
+package com.linkedin.openhouse.optimizer.db;
+
+import java.time.Instant;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
+import javax.persistence.Id;
+import javax.persistence.Index;
+import javax.persistence.Table;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+/**
+ * Append-only record of a completed maintenance operation.
+ *
+ *
Written when the operation-complete endpoint is called. The {@code id} is the same UUID as the
+ * originating live-operations row, tying each history entry back to the operation cycle that
+ * produced it. Multiple runs of the same operation on the same table produce multiple rows.
+ *
+ *
Self-contained DB-layer type: enums are {@link OperationType} / {@link HistoryStatus} from the
+ * same package, JPA-bound as strings.
+ */
+@Entity
+@Table(
+ name = "table_operations_history",
+ indexes = {
+ @Index(name = "idx_table_uuid_hist", columnList = "table_uuid"),
+ @Index(name = "idx_op_type_hist", columnList = "operation_type"),
+ @Index(name = "idx_completed_at", columnList = "completed_at"),
+ @Index(name = "idx_status_hist", columnList = "status"),
+ @Index(name = "idx_toph_db_table", columnList = "database_name, table_name")
+ })
+@Getter
+@EqualsAndHashCode
+@Builder(toBuilder = true)
+@NoArgsConstructor(access = AccessLevel.PROTECTED)
+@AllArgsConstructor(access = AccessLevel.PROTECTED)
+public class TableOperationsHistoryRow {
+
+ /** Same UUID as the originating live-operations row. Set by the caller; not generated. */
+ @Id
+ @Column(name = "id", nullable = false, length = 36)
+ private String id;
+
+ /** Stable table identity from the Tables Service. */
+ @Column(name = "table_uuid", nullable = false, length = 36)
+ private String tableUuid;
+
+ /** Denormalized database name. */
+ @Column(name = "database_name", nullable = false, length = 128)
+ private String databaseName;
+
+ /** Denormalized table name. */
+ @Column(name = "table_name", nullable = false, length = 128)
+ private String tableName;
+
+ /** The type of maintenance operation this history row records. */
+ @Enumerated(EnumType.STRING)
+ @Column(name = "operation_type", nullable = false, length = 50)
+ private OperationType operationType;
+
+ /** When the operation completed, as recorded by the complete endpoint. */
+ @Column(name = "completed_at", nullable = false)
+ private Instant completedAt;
+
+ /** Terminal outcome: {@link HistoryStatus#SUCCESS} or {@link HistoryStatus#FAILED}. */
+ @Enumerated(EnumType.STRING)
+ @Column(name = "status", nullable = false, length = 20)
+ private HistoryStatus status;
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/TableOperationsRow.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/TableOperationsRow.java
new file mode 100644
index 000000000..dfe40d402
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/TableOperationsRow.java
@@ -0,0 +1,85 @@
+package com.linkedin.openhouse.optimizer.db;
+
+import java.time.Instant;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
+import javax.persistence.Id;
+import javax.persistence.Index;
+import javax.persistence.Table;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+/**
+ * JPA entity representing an Analyzer recommendation for a table maintenance operation.
+ *
+ *
Each row is identified by a client-generated UUID ({@code id}). The Analyzer creates a new row
+ * when it first recommends an operation for a table, or when re-recommending after a prior terminal
+ * state. {@code table_uuid} is the stable identity for the table (survives renames; rotates on
+ * drop+recreate). The application enforces one active (PENDING / SCHEDULING / SCHEDULED) row per
+ * {@code (table_uuid, operation_type)} at a time.
+ *
+ *
Self-contained DB-layer type: enums are {@link OperationType} / {@link OperationStatus} from
+ * the same package, JPA-bound as strings.
+ */
+@Entity
+@Table(
+ name = "table_operations",
+ indexes = {
+ @Index(name = "idx_table_uuid", columnList = "table_uuid"),
+ @Index(name = "idx_op_type", columnList = "operation_type"),
+ @Index(name = "idx_status", columnList = "status"),
+ @Index(name = "idx_created_at", columnList = "created_at"),
+ @Index(name = "idx_scheduled_at", columnList = "scheduled_at")
+ })
+@Getter
+@EqualsAndHashCode
+@Builder(toBuilder = true)
+@NoArgsConstructor(access = AccessLevel.PROTECTED)
+@AllArgsConstructor(access = AccessLevel.PROTECTED)
+public class TableOperationsRow {
+
+ /** Client-generated UUID identifying this specific operation recommendation. */
+ @Id
+ @Column(name = "id", nullable = false, length = 36)
+ private String id;
+
+ /** Stable table identity from the Tables Service. Survives renames; rotates on drop+recreate. */
+ @Column(name = "table_uuid", nullable = false, length = 36)
+ private String tableUuid;
+
+ /** Denormalized database name. */
+ @Column(name = "database_name", nullable = false, length = 128)
+ private String databaseName;
+
+ /** Denormalized table name. */
+ @Column(name = "table_name", nullable = false, length = 128)
+ private String tableName;
+
+ /** The type of maintenance operation this row recommends. */
+ @Enumerated(EnumType.STRING)
+ @Column(name = "operation_type", nullable = false, length = 50)
+ private OperationType operationType;
+
+ /** Lifecycle state — drives the scheduler's CAS claim and the analyzer's eligibility check. */
+ @Enumerated(EnumType.STRING)
+ @Column(name = "status", nullable = false, length = 20)
+ private OperationStatus status;
+
+ /** When the analyzer first created this row. Set on insert; never updated. */
+ @Column(name = "created_at", nullable = false)
+ private Instant createdAt;
+
+ /** When the scheduler last submitted a job for this row. {@code null} while {@code PENDING}. */
+ @Column(name = "scheduled_at")
+ private Instant scheduledAt;
+
+ /** Spark job ID written by the scheduler at claim time. Internal-only; never exposed on wire. */
+ @Column(name = "job_id", length = 255)
+ private String jobId;
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/TableStatsHistoryRow.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/TableStatsHistoryRow.java
new file mode 100644
index 000000000..4eaee2a6f
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/TableStatsHistoryRow.java
@@ -0,0 +1,74 @@
+package com.linkedin.openhouse.optimizer.db;
+
+import com.vladmihalcea.hibernate.type.json.JsonStringType;
+import java.time.Instant;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Index;
+import javax.persistence.Table;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import org.hibernate.annotations.Type;
+import org.hibernate.annotations.TypeDef;
+
+/**
+ * Append-only record of per-commit stats reported by the Tables Service.
+ *
+ *
Each Iceberg commit produces one row. Consumers can query this table to reconstruct change
+ * rates over arbitrary time windows.
+ *
+ *
Self-contained DB-layer type. The stats payload is split across two JSON columns — {@link
+ * SnapshotMetrics} (point-in-time fields at commit time) and {@link CommitDeltaMetrics} (per-commit
+ * counters).
+ */
+@TypeDef(name = "json", typeClass = JsonStringType.class)
+@Entity
+@Table(
+ name = "table_stats_history",
+ indexes = {
+ @Index(name = "idx_tsh_table_uuid", columnList = "table_uuid"),
+ @Index(name = "idx_tsh_recorded_at", columnList = "recorded_at")
+ })
+@Getter
+@EqualsAndHashCode
+@Builder(toBuilder = true)
+@NoArgsConstructor(access = AccessLevel.PROTECTED)
+@AllArgsConstructor(access = AccessLevel.PROTECTED)
+public class TableStatsHistoryRow {
+
+ /** UUID primary key — set by the caller, not generated server-side. */
+ @Id
+ @Column(name = "id", nullable = false, length = 36)
+ private String id;
+
+ /** Stable Iceberg table UUID. */
+ @Column(name = "table_uuid", nullable = false, length = 36)
+ private String tableUuid;
+
+ /** Denormalized database name. */
+ @Column(name = "database_name", nullable = false, length = 128)
+ private String databaseName;
+
+ /** Denormalized table name. */
+ @Column(name = "table_name", nullable = false, length = 128)
+ private String tableName;
+
+ /** Snapshot fields at commit time. Stored as a JSON blob in the {@code snapshot} column. */
+ @Type(type = "json")
+ @Column(name = "snapshot", columnDefinition = "TEXT")
+ private SnapshotMetrics snapshot;
+
+ /** Per-commit delta counters. Stored as a JSON blob in the {@code delta} column. */
+ @Type(type = "json")
+ @Column(name = "delta", columnDefinition = "TEXT")
+ private CommitDeltaMetrics delta;
+
+ /** When this history row was recorded (commit time). */
+ @Column(name = "recorded_at", nullable = false)
+ private Instant recordedAt;
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/TableStatsRow.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/TableStatsRow.java
new file mode 100644
index 000000000..165247b6a
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/db/TableStatsRow.java
@@ -0,0 +1,64 @@
+package com.linkedin.openhouse.optimizer.db;
+
+import com.vladmihalcea.hibernate.type.json.JsonStringType;
+import java.time.Instant;
+import java.util.Map;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import org.hibernate.annotations.Type;
+import org.hibernate.annotations.TypeDef;
+
+/**
+ * JPA entity representing a per-table stats snapshot in the optimizer DB.
+ *
+ *
Written by the Tables Service on every Iceberg commit. Read by the Analyzer directly via JPA
+ * to enumerate tables and check scheduling eligibility.
+ *
+ *
Self-contained DB-layer type. Holds only the point-in-time {@link SnapshotMetrics} —
+ * per-commit deltas live exclusively on {@link TableStatsHistoryRow} and are not aggregated here.
+ */
+@TypeDef(name = "json", typeClass = JsonStringType.class)
+@Entity
+@Table(name = "table_stats")
+@Getter
+@EqualsAndHashCode
+@Builder(toBuilder = true)
+@NoArgsConstructor(access = AccessLevel.PROTECTED)
+@AllArgsConstructor(access = AccessLevel.PROTECTED)
+public class TableStatsRow {
+
+ /** Stable Iceberg table UUID. Primary key. */
+ @Id
+ @Column(name = "table_uuid", nullable = false, length = 36)
+ private String tableUuid;
+
+ /** Denormalized database name. */
+ @Column(name = "database_name", nullable = false, length = 128)
+ private String databaseName;
+
+ /** Denormalized table name. */
+ @Column(name = "table_name", nullable = false, length = 128)
+ private String tableName;
+
+ /** Latest snapshot fields. Stored as a JSON blob in the {@code snapshot} column. */
+ @Type(type = "json")
+ @Column(name = "snapshot", columnDefinition = "TEXT")
+ private SnapshotMetrics snapshot;
+
+ /** Current table-property map (e.g. maintenance opt-in flags). Stored as JSON. */
+ @Type(type = "json")
+ @Column(name = "table_properties", columnDefinition = "TEXT")
+ private Map tableProperties;
+
+ /** Set on every upsert. Used for stats pipeline staleness monitoring. */
+ @Column(name = "updated_at", nullable = false)
+ private Instant updatedAt;
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/HistoryStatusDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/HistoryStatusDto.java
index 463c62605..af622d3ce 100644
--- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/HistoryStatusDto.java
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/HistoryStatusDto.java
@@ -13,5 +13,15 @@ public enum HistoryStatusDto {
SUCCESS,
/** The operation failed. */
- FAILED
+ FAILED;
+
+ /** Convert to the DB-layer counterpart. */
+ public com.linkedin.openhouse.optimizer.db.HistoryStatus toDb() {
+ return com.linkedin.openhouse.optimizer.db.HistoryStatus.valueOf(name());
+ }
+
+ /** Build the internal-model enum from the DB-layer counterpart. */
+ public static HistoryStatusDto fromDb(com.linkedin.openhouse.optimizer.db.HistoryStatus v) {
+ return v == null ? null : HistoryStatusDto.valueOf(v.name());
+ }
}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/OperationStatusDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/OperationStatusDto.java
index b766f7dbe..2963f120f 100644
--- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/OperationStatusDto.java
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/OperationStatusDto.java
@@ -19,5 +19,15 @@ public enum OperationStatusDto {
SCHEDULED,
/** Scheduler marked this row as a duplicate of another PENDING row; not claimable. */
- CANCELED
+ CANCELED;
+
+ /** Convert to the DB-layer counterpart. */
+ public com.linkedin.openhouse.optimizer.db.OperationStatus toDb() {
+ return com.linkedin.openhouse.optimizer.db.OperationStatus.valueOf(name());
+ }
+
+ /** Build the internal-model enum from the DB-layer counterpart. */
+ public static OperationStatusDto fromDb(com.linkedin.openhouse.optimizer.db.OperationStatus v) {
+ return v == null ? null : OperationStatusDto.valueOf(v.name());
+ }
}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/OperationTypeDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/OperationTypeDto.java
index 39b299806..e2eb1158b 100644
--- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/OperationTypeDto.java
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/OperationTypeDto.java
@@ -8,5 +8,15 @@
public enum OperationTypeDto {
/** Removes orphaned data files no longer referenced by table metadata. */
- ORPHAN_FILES_DELETION
+ ORPHAN_FILES_DELETION;
+
+ /** Convert to the DB-layer counterpart. */
+ public com.linkedin.openhouse.optimizer.db.OperationType toDb() {
+ return com.linkedin.openhouse.optimizer.db.OperationType.valueOf(name());
+ }
+
+ /** Build the internal-model enum from the DB-layer counterpart. */
+ public static OperationTypeDto fromDb(com.linkedin.openhouse.optimizer.db.OperationType v) {
+ return v == null ? null : OperationTypeDto.valueOf(v.name());
+ }
}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableDto.java
index 408bc4fc7..db68fb3c1 100644
--- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableDto.java
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableDto.java
@@ -1,5 +1,6 @@
package com.linkedin.openhouse.optimizer.model;
+import com.linkedin.openhouse.optimizer.db.TableStatsRow;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
@@ -13,8 +14,8 @@
* by the analyzer (decides whether to produce a {@link TableOperationDto}) and the scheduler (reads
* stats for bin-packing).
*
- * Pure internal-model type — no references to wire-API or DB types. Construct via {@link
- * com.linkedin.openhouse.optimizer.model.mapper.ModelDbMapper#toTable} at the DB boundary.
+ *
Conversion methods cross into the DB layer one-way; the inverse lives on the api side. db/
+ * types know nothing about model/ or api/.
*/
@Data
@Builder(toBuilder = true)
@@ -39,4 +40,36 @@ public class TableDto {
/** When the current snapshot was last written. Stamped server-side on every upsert. */
private Instant updatedAt;
+
+ /**
+ * Project to the current-state DB row. {@code table_stats} carries the snapshot only — per-commit
+ * deltas live on {@code table_stats_history} (see {@link TableStatsHistoryDto#toRow()}).
+ */
+ public TableStatsRow toRow() {
+ return TableStatsRow.builder()
+ .tableUuid(tableUuid)
+ .databaseName(databaseName)
+ .tableName(tableId)
+ .snapshot(stats == null ? null : stats.toSnapshotRow())
+ .tableProperties(tableProperties)
+ .updatedAt(updatedAt)
+ .build();
+ }
+
+ /** Build a {@link TableDto} from a current-state DB row. */
+ public static TableDto fromRow(TableStatsRow row) {
+ if (row == null) {
+ return null;
+ }
+ return TableDto.builder()
+ .tableUuid(row.getTableUuid())
+ .databaseName(row.getDatabaseName())
+ .tableId(row.getTableName())
+ .tableProperties(
+ row.getTableProperties() != null ? row.getTableProperties() : Collections.emptyMap())
+ // table_stats holds only the snapshot — deltas live on the history table.
+ .stats(TableStatsDto.fromRows(row.getSnapshot(), null))
+ .updatedAt(row.getUpdatedAt())
+ .build();
+ }
}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationDto.java
index 4cac14187..18d57ce66 100644
--- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationDto.java
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationDto.java
@@ -1,5 +1,6 @@
package com.linkedin.openhouse.optimizer.model;
+import com.linkedin.openhouse.optimizer.db.TableOperationsRow;
import java.time.Instant;
import java.util.Comparator;
import java.util.UUID;
@@ -12,9 +13,8 @@
* An operation the analyzer has decided to schedule for a table, and that the scheduler later picks
* up and submits.
*
- *
Pure internal-model type — no references to wire-API or DB types. Cross-layer construction
- * happens via {@link com.linkedin.openhouse.optimizer.model.mapper.ModelDbMapper} (DB boundary) or
- * {@link com.linkedin.openhouse.optimizer.model.mapper.ApiModelMapper} (API boundary).
+ *
Conversion methods cross into the DB layer one-way; the inverse lives on the api side. db/
+ * types know nothing about model/ or api/.
*/
@Data
@Builder
@@ -68,4 +68,37 @@ public static TableOperationDto mostRecent(TableOperationDto a, TableOperationDt
Comparator.comparing(r -> r.getCreatedAt() != null ? r.getCreatedAt() : Instant.EPOCH);
return byCreatedAt.compare(a, b) >= 0 ? a : b;
}
+
+ /** Convert to the corresponding DB row. */
+ public TableOperationsRow toRow() {
+ return TableOperationsRow.builder()
+ .id(id)
+ .tableUuid(tableUuid)
+ .databaseName(databaseName)
+ .tableName(tableName)
+ .operationType(operationType == null ? null : operationType.toDb())
+ .status(status == null ? null : status.toDb())
+ .createdAt(createdAt)
+ .scheduledAt(scheduledAt)
+ .jobId(jobId)
+ .build();
+ }
+
+ /** Build a {@link TableOperationDto} from a DB row. */
+ public static TableOperationDto fromRow(TableOperationsRow row) {
+ if (row == null) {
+ return null;
+ }
+ return TableOperationDto.builder()
+ .id(row.getId())
+ .tableUuid(row.getTableUuid())
+ .databaseName(row.getDatabaseName())
+ .tableName(row.getTableName())
+ .operationType(OperationTypeDto.fromDb(row.getOperationType()))
+ .status(OperationStatusDto.fromDb(row.getStatus()))
+ .createdAt(row.getCreatedAt())
+ .scheduledAt(row.getScheduledAt())
+ .jobId(row.getJobId())
+ .build();
+ }
}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationsHistoryDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationsHistoryDto.java
index e05bb641e..f063406b1 100644
--- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationsHistoryDto.java
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationsHistoryDto.java
@@ -1,5 +1,6 @@
package com.linkedin.openhouse.optimizer.model;
+import com.linkedin.openhouse.optimizer.db.TableOperationsHistoryRow;
import java.time.Instant;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -38,4 +39,33 @@ public class TableOperationsHistoryDto {
/** Terminal outcome: {@link HistoryStatusDto#SUCCESS} or {@link HistoryStatusDto#FAILED}. */
private HistoryStatusDto status;
+
+ /** Convert to the corresponding DB row. */
+ public TableOperationsHistoryRow toRow() {
+ return TableOperationsHistoryRow.builder()
+ .id(id)
+ .tableUuid(tableUuid)
+ .databaseName(databaseName)
+ .tableName(tableName)
+ .operationType(operationType == null ? null : operationType.toDb())
+ .completedAt(completedAt)
+ .status(status == null ? null : status.toDb())
+ .build();
+ }
+
+ /** Build a {@link TableOperationsHistoryDto} from a DB row. */
+ public static TableOperationsHistoryDto fromRow(TableOperationsHistoryRow row) {
+ if (row == null) {
+ return null;
+ }
+ return TableOperationsHistoryDto.builder()
+ .id(row.getId())
+ .tableUuid(row.getTableUuid())
+ .databaseName(row.getDatabaseName())
+ .tableName(row.getTableName())
+ .operationType(OperationTypeDto.fromDb(row.getOperationType()))
+ .completedAt(row.getCompletedAt())
+ .status(HistoryStatusDto.fromDb(row.getStatus()))
+ .build();
+ }
}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableStatsDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableStatsDto.java
index d142dcc8b..6dc52492c 100644
--- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableStatsDto.java
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableStatsDto.java
@@ -47,6 +47,63 @@ public class TableStatsDto {
/** When the current snapshot was last written. Stamped server-side on every upsert. */
private Instant updatedAt;
+ /**
+ * Project to the current-state {@code table_stats} row. Snapshot only; deltas live on history.
+ */
+ public com.linkedin.openhouse.optimizer.db.TableStatsRow toRow() {
+ return com.linkedin.openhouse.optimizer.db.TableStatsRow.builder()
+ .tableUuid(tableUuid)
+ .databaseName(databaseName)
+ .tableName(tableName)
+ .snapshot(snapshot == null ? null : snapshot.toDb())
+ .tableProperties(tableProperties != null ? tableProperties : Collections.emptyMap())
+ .updatedAt(updatedAt)
+ .build();
+ }
+
+ /**
+ * Build a {@link TableStatsDto} from a current-state DB row. {@link #delta} is left {@code null}.
+ */
+ public static TableStatsDto fromRow(com.linkedin.openhouse.optimizer.db.TableStatsRow row) {
+ if (row == null) {
+ return null;
+ }
+ return TableStatsDto.builder()
+ .tableUuid(row.getTableUuid())
+ .databaseName(row.getDatabaseName())
+ .tableName(row.getTableName())
+ .tableProperties(
+ row.getTableProperties() != null ? row.getTableProperties() : Collections.emptyMap())
+ .snapshot(SnapshotMetrics.fromDb(row.getSnapshot()))
+ .updatedAt(row.getUpdatedAt())
+ .build();
+ }
+
+ /** Project to the DB-layer {@link com.linkedin.openhouse.optimizer.db.SnapshotMetrics} object. */
+ public com.linkedin.openhouse.optimizer.db.SnapshotMetrics toSnapshotRow() {
+ return snapshot == null ? null : snapshot.toDb();
+ }
+
+ /**
+ * Project to the DB-layer {@link com.linkedin.openhouse.optimizer.db.CommitDeltaMetrics} object.
+ */
+ public com.linkedin.openhouse.optimizer.db.CommitDeltaMetrics toDeltaRow() {
+ return delta == null ? null : delta.toDb();
+ }
+
+ /** Join the two DB-side columns back into a single internal-model {@link TableStatsDto}. */
+ public static TableStatsDto fromRows(
+ com.linkedin.openhouse.optimizer.db.SnapshotMetrics dbSnapshot,
+ com.linkedin.openhouse.optimizer.db.CommitDeltaMetrics dbDelta) {
+ if (dbSnapshot == null && dbDelta == null) {
+ return null;
+ }
+ return TableStatsDto.builder()
+ .snapshot(SnapshotMetrics.fromDb(dbSnapshot))
+ .delta(CommitDelta.fromDb(dbDelta))
+ .build();
+ }
+
/** Point-in-time metadata read from Iceberg at scan time. */
@Data
@Builder(toBuilder = true)
@@ -66,6 +123,29 @@ public static class SnapshotMetrics {
/** Total number of data files as of the latest snapshot — used for bin-packing. */
private Long numCurrentFiles;
+
+ /** Convert to the DB-layer counterpart. */
+ public com.linkedin.openhouse.optimizer.db.SnapshotMetrics toDb() {
+ return com.linkedin.openhouse.optimizer.db.SnapshotMetrics.builder()
+ .tableVersion(tableVersion)
+ .tableLocation(tableLocation)
+ .tableSizeBytes(tableSizeBytes)
+ .numCurrentFiles(numCurrentFiles)
+ .build();
+ }
+
+ /** Build the internal-model inner object from the DB-layer counterpart. */
+ public static SnapshotMetrics fromDb(com.linkedin.openhouse.optimizer.db.SnapshotMetrics v) {
+ if (v == null) {
+ return null;
+ }
+ return SnapshotMetrics.builder()
+ .tableVersion(v.getTableVersion())
+ .tableLocation(v.getTableLocation())
+ .tableSizeBytes(v.getTableSizeBytes())
+ .numCurrentFiles(v.getNumCurrentFiles())
+ .build();
+ }
}
/** Per-commit incremental counters; accumulated across all recorded commit events. */
@@ -87,5 +167,28 @@ public static class CommitDelta {
/** Total bytes removed by this commit. */
private Long deletedSizeBytes;
+
+ /** Convert to the DB-layer counterpart. */
+ public com.linkedin.openhouse.optimizer.db.CommitDeltaMetrics toDb() {
+ return com.linkedin.openhouse.optimizer.db.CommitDeltaMetrics.builder()
+ .numFilesAdded(numFilesAdded)
+ .numFilesDeleted(numFilesDeleted)
+ .addedSizeBytes(addedSizeBytes)
+ .deletedSizeBytes(deletedSizeBytes)
+ .build();
+ }
+
+ /** Build the internal-model inner object from the DB-layer counterpart. */
+ public static CommitDelta fromDb(com.linkedin.openhouse.optimizer.db.CommitDeltaMetrics v) {
+ if (v == null) {
+ return null;
+ }
+ return CommitDelta.builder()
+ .numFilesAdded(v.getNumFilesAdded())
+ .numFilesDeleted(v.getNumFilesDeleted())
+ .addedSizeBytes(v.getAddedSizeBytes())
+ .deletedSizeBytes(v.getDeletedSizeBytes())
+ .build();
+ }
}
}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableStatsHistoryDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableStatsHistoryDto.java
index 5579c95ed..069944e59 100644
--- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableStatsHistoryDto.java
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableStatsHistoryDto.java
@@ -1,5 +1,6 @@
package com.linkedin.openhouse.optimizer.model;
+import com.linkedin.openhouse.optimizer.db.TableStatsHistoryRow;
import java.time.Instant;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -11,8 +12,6 @@
*
*
One per Iceberg commit. {@link #stats} carries both the snapshot at commit time and the commit
* delta — consumers can reconstruct change rates over arbitrary time windows.
- *
- *
Pure internal-model type — no references to wire-API or DB types.
*/
@Data
@Builder
@@ -37,4 +36,32 @@ public class TableStatsHistoryDto {
/** When this history row was recorded. */
private Instant recordedAt;
+
+ /** Convert to the corresponding DB row. */
+ public TableStatsHistoryRow toRow() {
+ return TableStatsHistoryRow.builder()
+ .id(id)
+ .tableUuid(tableUuid)
+ .databaseName(databaseName)
+ .tableName(tableName)
+ .snapshot(stats == null ? null : stats.toSnapshotRow())
+ .delta(stats == null ? null : stats.toDeltaRow())
+ .recordedAt(recordedAt)
+ .build();
+ }
+
+ /** Build a {@link TableStatsHistoryDto} from a DB row. */
+ public static TableStatsHistoryDto fromRow(TableStatsHistoryRow row) {
+ if (row == null) {
+ return null;
+ }
+ return TableStatsHistoryDto.builder()
+ .id(row.getId())
+ .tableUuid(row.getTableUuid())
+ .databaseName(row.getDatabaseName())
+ .tableName(row.getTableName())
+ .stats(TableStatsDto.fromRows(row.getSnapshot(), row.getDelta()))
+ .recordedAt(row.getRecordedAt())
+ .build();
+ }
}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableOperationsHistoryRepository.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableOperationsHistoryRepository.java
new file mode 100644
index 000000000..6c08f844a
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableOperationsHistoryRepository.java
@@ -0,0 +1,45 @@
+package com.linkedin.openhouse.optimizer.repository;
+
+import com.linkedin.openhouse.optimizer.db.OperationType;
+import com.linkedin.openhouse.optimizer.db.TableOperationsHistoryRow;
+import java.util.List;
+import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.data.jpa.repository.Query;
+import org.springframework.data.repository.query.Param;
+
+/** Repository for reading {@code table_operations_history}. */
+public interface TableOperationsHistoryRepository
+ extends JpaRepository {
+
+ /**
+ * Return history rows for a single {@code tableUuid}, newest first. {@code pageable} is required;
+ * callers pick the row cap (default limit lives in {@code optimizer.repo.default-limit}).
+ */
+ @Query(
+ "SELECT r FROM TableOperationsHistoryRow r "
+ + "WHERE r.tableUuid = :tableUuid "
+ + "ORDER BY r.completedAt DESC")
+ List find(@Param("tableUuid") String tableUuid, Pageable pageable);
+
+ /**
+ * Return the most-recent history row per {@code (table_uuid, operation_type)}, filtered to a
+ * single operation type. Used by the analyzer to evaluate cadence without materializing every
+ * historical row.
+ *
+ * The correlated subquery is portable across MySQL and H2 (MySQL mode). Backed by index {@code
+ * idx_toph_optype_uuid_completed (operation_type, table_uuid, completed_at)} on {@code
+ * table_operations_history}, the subquery becomes an index-only lookup per outer row.
+ *
+ *
Ties on {@code completed_at} for the same {@code (table_uuid, operation_type)} return all
+ * tied rows; callers should dedupe in memory.
+ */
+ @Query(
+ "SELECT r FROM TableOperationsHistoryRow r "
+ + "WHERE r.operationType = :operationType "
+ + "AND r.completedAt = ("
+ + " SELECT MAX(r2.completedAt) FROM TableOperationsHistoryRow r2 "
+ + " WHERE r2.tableUuid = r.tableUuid AND r2.operationType = r.operationType)")
+ List findLatest(
+ @Param("operationType") OperationType operationType, Pageable pageable);
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableOperationsRepository.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableOperationsRepository.java
new file mode 100644
index 000000000..e0df2cd21
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableOperationsRepository.java
@@ -0,0 +1,146 @@
+package com.linkedin.openhouse.optimizer.repository;
+
+import com.linkedin.openhouse.optimizer.db.OperationStatus;
+import com.linkedin.openhouse.optimizer.db.OperationType;
+import com.linkedin.openhouse.optimizer.db.TableOperationsRow;
+import java.time.Instant;
+import java.util.List;
+import java.util.Optional;
+import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.data.jpa.repository.Modifying;
+import org.springframework.data.jpa.repository.Query;
+import org.springframework.data.repository.query.Param;
+
+/** Spring Data JPA repository for {@code table_operations} rows in the optimizer DB. */
+public interface TableOperationsRepository extends JpaRepository {
+
+ /**
+ * Find operation rows matching the given filters. Every filter is optional ({@link
+ * Optional#empty()} to skip). {@code pageable} is required; callers pick the row cap (default
+ * limit lives in {@code optimizer.repo.default-limit}).
+ */
+ default List find(
+ Optional operationType,
+ Optional status,
+ Optional tableUuid,
+ Optional databaseName,
+ Optional tableName,
+ Optional scheduledAt,
+ Optional> ids,
+ Pageable pageable) {
+ // List parameters can't share an :ids IS NULL pattern with the IN clause —
+ // Hibernate expands the list inline and the IS NULL check turns ungrammatical.
+ // Two internal queries; dispatch by presence.
+ if (ids.isPresent()) {
+ return findInternalWithIds(
+ operationType.orElse(null),
+ status.orElse(null),
+ tableUuid.orElse(null),
+ databaseName.orElse(null),
+ tableName.orElse(null),
+ scheduledAt.orElse(null),
+ ids.get(),
+ pageable);
+ }
+ return findInternal(
+ operationType.orElse(null),
+ status.orElse(null),
+ tableUuid.orElse(null),
+ databaseName.orElse(null),
+ tableName.orElse(null),
+ scheduledAt.orElse(null),
+ pageable);
+ }
+
+ /**
+ * Batch CAS: transition rows from {@code fromStatus} to {@code toStatus} for every id in {@code
+ * ids} that is still in {@code fromStatus}. Rows in a different status are skipped silently.
+ * Returns the number of rows transitioned.
+ *
+ * Side-effect columns use COALESCE — {@link Optional#empty()} means "leave unchanged". The
+ * underlying transitions are:
+ *
+ *
+ * - PENDING → SCHEDULING: pass {@code scheduledAt = Optional.of(claimedAt)}; the watermark
+ * lets {@link #find} resolve the precise set of rows this caller claimed.
+ *
- SCHEDULING → SCHEDULED: pass {@code jobId = Optional.of(...)}.
+ *
- SCHEDULING → PENDING: pass both empty; {@code scheduledAt} stays at the prior claim's
+ * watermark (overwritten on the next claim) and {@code jobId} stays null.
+ *
+ */
+ default int updateBatch(
+ List ids,
+ OperationStatus fromStatus,
+ OperationStatus toStatus,
+ Optional scheduledAt,
+ Optional jobId) {
+ return updateBatchInternal(
+ ids, fromStatus, toStatus, scheduledAt.orElse(null), jobId.orElse(null));
+ }
+
+ /**
+ * Delete the specified rows, but only if they are still {@code PENDING}. The status gate is
+ * defensive — never drop a row another instance has claimed. Returns the number of rows actually
+ * removed.
+ */
+ @Modifying(flushAutomatically = true, clearAutomatically = true)
+ @Query(
+ "DELETE FROM TableOperationsRow r "
+ + "WHERE r.id IN :ids "
+ + "AND r.status = com.linkedin.openhouse.optimizer.db.OperationStatus.PENDING")
+ int cancel(@Param("ids") List ids);
+
+ // ---- Internals. Use the Optional-typed default methods above. ----
+
+ @Query(
+ "SELECT r FROM TableOperationsRow r "
+ + "WHERE (:operationType IS NULL OR r.operationType = :operationType) "
+ + "AND (:status IS NULL OR r.status = :status) "
+ + "AND (:tableUuid IS NULL OR r.tableUuid = :tableUuid) "
+ + "AND (:databaseName IS NULL OR r.databaseName = :databaseName) "
+ + "AND (:tableName IS NULL OR r.tableName = :tableName) "
+ + "AND (:scheduledAt IS NULL OR r.scheduledAt = :scheduledAt)")
+ List findInternal(
+ @Param("operationType") OperationType operationType,
+ @Param("status") OperationStatus status,
+ @Param("tableUuid") String tableUuid,
+ @Param("databaseName") String databaseName,
+ @Param("tableName") String tableName,
+ @Param("scheduledAt") Instant scheduledAt,
+ Pageable pageable);
+
+ @Query(
+ "SELECT r FROM TableOperationsRow r "
+ + "WHERE (:operationType IS NULL OR r.operationType = :operationType) "
+ + "AND (:status IS NULL OR r.status = :status) "
+ + "AND (:tableUuid IS NULL OR r.tableUuid = :tableUuid) "
+ + "AND (:databaseName IS NULL OR r.databaseName = :databaseName) "
+ + "AND (:tableName IS NULL OR r.tableName = :tableName) "
+ + "AND (:scheduledAt IS NULL OR r.scheduledAt = :scheduledAt) "
+ + "AND r.id IN :ids")
+ List findInternalWithIds(
+ @Param("operationType") OperationType operationType,
+ @Param("status") OperationStatus status,
+ @Param("tableUuid") String tableUuid,
+ @Param("databaseName") String databaseName,
+ @Param("tableName") String tableName,
+ @Param("scheduledAt") Instant scheduledAt,
+ @Param("ids") List ids,
+ Pageable pageable);
+
+ @Modifying(flushAutomatically = true, clearAutomatically = true)
+ @Query(
+ "UPDATE TableOperationsRow r "
+ + "SET r.status = :toStatus, "
+ + " r.scheduledAt = COALESCE(:scheduledAt, r.scheduledAt), "
+ + " r.jobId = COALESCE(:jobId, r.jobId) "
+ + "WHERE r.id IN :ids "
+ + "AND r.status = :fromStatus")
+ int updateBatchInternal(
+ @Param("ids") List ids,
+ @Param("fromStatus") OperationStatus fromStatus,
+ @Param("toStatus") OperationStatus toStatus,
+ @Param("scheduledAt") Instant scheduledAt,
+ @Param("jobId") String jobId);
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableStatsHistoryRepository.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableStatsHistoryRepository.java
new file mode 100644
index 000000000..9b603f265
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableStatsHistoryRepository.java
@@ -0,0 +1,34 @@
+package com.linkedin.openhouse.optimizer.repository;
+
+import com.linkedin.openhouse.optimizer.db.TableStatsHistoryRow;
+import java.time.Instant;
+import java.util.List;
+import java.util.Optional;
+import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.data.jpa.repository.Query;
+import org.springframework.data.repository.query.Param;
+
+/** Append-only repository for per-commit stats history rows. */
+public interface TableStatsHistoryRepository extends JpaRepository {
+
+ /**
+ * Return history rows for a table, newest first. {@code since} is optional ({@link
+ * Optional#empty()} to skip the time filter). {@code pageable} is required; callers pick the row
+ * cap (default limit lives in {@code optimizer.repo.default-limit}).
+ */
+ default List find(
+ String tableUuid, Optional since, Pageable pageable) {
+ return findInternal(tableUuid, since.orElse(null), pageable);
+ }
+
+ // ---- Internals. Use the Optional-typed default method above. ----
+
+ @Query(
+ "SELECT r FROM TableStatsHistoryRow r "
+ + "WHERE r.tableUuid = :tableUuid "
+ + "AND (:since IS NULL OR r.recordedAt >= :since) "
+ + "ORDER BY r.recordedAt DESC")
+ List findInternal(
+ @Param("tableUuid") String tableUuid, @Param("since") Instant since, Pageable pageable);
+}
diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableStatsRepository.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableStatsRepository.java
new file mode 100644
index 000000000..1123c0e7a
--- /dev/null
+++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/repository/TableStatsRepository.java
@@ -0,0 +1,48 @@
+package com.linkedin.openhouse.optimizer.repository;
+
+import com.linkedin.openhouse.optimizer.db.TableStatsRow;
+import java.util.List;
+import java.util.Optional;
+import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.data.jpa.repository.Query;
+import org.springframework.data.repository.query.Param;
+
+/** Spring Data JPA repository for {@code table_stats} rows in the optimizer DB. */
+public interface TableStatsRepository extends JpaRepository {
+
+ /**
+ * Return stats rows matching the given filters. Every filter is optional ({@link
+ * Optional#empty()} to skip). {@code pageable} is required; callers pick the row cap (default
+ * limit lives in {@code optimizer.repo.default-limit}).
+ */
+ default List find(
+ Optional databaseName,
+ Optional tableName,
+ Optional tableUuid,
+ Pageable pageable) {
+ return findInternal(
+ databaseName.orElse(null), tableName.orElse(null), tableUuid.orElse(null), pageable);
+ }
+
+ /**
+ * Return the distinct {@code database_name} values present in {@code table_stats}. Used by the
+ * Analyzer to enumerate databases when iterating per-db; the result set size is bounded by the
+ * number of databases (small even at million-table scale).
+ */
+ @Query("SELECT DISTINCT r.databaseName FROM TableStatsRow r")
+ List findDistinctDatabaseNames();
+
+ // ---- Internals. Use the Optional-typed default methods above. ----
+
+ @Query(
+ "SELECT r FROM TableStatsRow r "
+ + "WHERE (:databaseName IS NULL OR r.databaseName = :databaseName) "
+ + "AND (:tableName IS NULL OR r.tableName = :tableName) "
+ + "AND (:tableUuid IS NULL OR r.tableUuid = :tableUuid)")
+ List findInternal(
+ @Param("databaseName") String databaseName,
+ @Param("tableName") String tableName,
+ @Param("tableUuid") String tableUuid,
+ Pageable pageable);
+}
diff --git a/services/optimizer/src/main/resources/db/optimizer-schema.sql b/services/optimizer/src/main/resources/db/optimizer-schema.sql
new file mode 100644
index 000000000..892c1c55f
--- /dev/null
+++ b/services/optimizer/src/main/resources/db/optimizer-schema.sql
@@ -0,0 +1,54 @@
+-- Optimizer Service Schema
+-- Compatible with MySQL (production) and H2 in MySQL mode (tests).
+CREATE TABLE IF NOT EXISTS table_operations (
+ id VARCHAR(36) NOT NULL,
+ table_uuid VARCHAR(36) NOT NULL,
+ database_name VARCHAR(128) NOT NULL,
+ table_name VARCHAR(128) NOT NULL,
+ operation_type VARCHAR(50) NOT NULL,
+ status VARCHAR(20) NOT NULL,
+ created_at TIMESTAMP(6) NOT NULL,
+ scheduled_at TIMESTAMP(6),
+ job_id VARCHAR(255),
+ -- TODO: per-operation metric columns will be added as operations are onboarded.
+ PRIMARY KEY (id)
+);
+
+CREATE TABLE IF NOT EXISTS table_stats (
+ table_uuid VARCHAR(36) NOT NULL,
+ database_name VARCHAR(128) NOT NULL,
+ table_name VARCHAR(128) NOT NULL,
+ snapshot TEXT,
+ table_properties TEXT,
+ updated_at TIMESTAMP(6) NOT NULL,
+ PRIMARY KEY (table_uuid)
+);
+
+CREATE TABLE IF NOT EXISTS table_stats_history (
+ id VARCHAR(36) NOT NULL,
+ table_uuid VARCHAR(36) NOT NULL,
+ database_name VARCHAR(128) NOT NULL,
+ table_name VARCHAR(128) NOT NULL,
+ snapshot TEXT,
+ delta TEXT,
+ recorded_at TIMESTAMP(6) NOT NULL,
+ PRIMARY KEY (id),
+ INDEX idx_tsh_table_uuid (table_uuid),
+ INDEX idx_tsh_recorded_at (recorded_at)
+);
+
+CREATE TABLE IF NOT EXISTS table_operations_history (
+ id VARCHAR(36) NOT NULL,
+ table_uuid VARCHAR(36) NOT NULL,
+ database_name VARCHAR(128) NOT NULL,
+ table_name VARCHAR(128) NOT NULL,
+ operation_type VARCHAR(50) NOT NULL,
+ completed_at TIMESTAMP(6) NOT NULL,
+ status VARCHAR(20) NOT NULL,
+ PRIMARY KEY (id),
+ INDEX idx_toph_db_table (database_name, table_name),
+ -- Drives TableOperationHistoryRepository.findLatestPerTable: the correlated
+ -- MAX(completed_at) subquery becomes an index-only lookup per (operation_type,
+ -- table_uuid) instead of an O(N²) scan.
+ INDEX idx_toph_optype_uuid_completed (operation_type, table_uuid, completed_at)
+);
diff --git a/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/OptimizerServiceContextTest.java b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/OptimizerServiceContextTest.java
new file mode 100644
index 000000000..fa373c57d
--- /dev/null
+++ b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/OptimizerServiceContextTest.java
@@ -0,0 +1,25 @@
+package com.linkedin.openhouse.optimizer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.ApplicationContext;
+import org.springframework.test.context.ActiveProfiles;
+
+/**
+ * Validates that the Spring application context loads successfully against the H2 schema. This test
+ * exercises schema-SQL-init, JPA entity scanning, and repository wiring.
+ */
+@SpringBootTest
+@ActiveProfiles("test")
+class OptimizerServiceContextTest {
+
+ @Autowired ApplicationContext context;
+
+ @Test
+ void contextLoads() {
+ assertThat(context).isNotNull();
+ }
+}
diff --git a/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/repository/TableOperationsHistoryRepositoryTest.java b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/repository/TableOperationsHistoryRepositoryTest.java
new file mode 100644
index 000000000..9f1de0c0c
--- /dev/null
+++ b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/repository/TableOperationsHistoryRepositoryTest.java
@@ -0,0 +1,130 @@
+package com.linkedin.openhouse.optimizer.repository;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.linkedin.openhouse.optimizer.db.HistoryStatus;
+import com.linkedin.openhouse.optimizer.db.OperationType;
+import com.linkedin.openhouse.optimizer.db.TableOperationsHistoryRow;
+import java.time.Instant;
+import java.util.List;
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.data.domain.PageRequest;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.transaction.annotation.Transactional;
+
+@SpringBootTest
+@ActiveProfiles("test")
+@Transactional
+class TableOperationsHistoryRepositoryTest {
+
+ @Autowired TableOperationsHistoryRepository repository;
+
+ @Test
+ void findByTableUuid_returnsRowsNewestFirst() {
+ Instant t1 = Instant.parse("2024-01-01T10:00:00Z");
+ Instant t2 = Instant.parse("2024-01-02T10:00:00Z");
+ String tableUuid = UUID.randomUUID().toString();
+ String idOlder = UUID.randomUUID().toString();
+ String idNewer = UUID.randomUUID().toString();
+
+ repository.save(
+ TableOperationsHistoryRow.builder()
+ .id(idOlder)
+ .tableUuid(tableUuid)
+ .databaseName("db1")
+ .tableName("tbl1")
+ .operationType(OperationType.ORPHAN_FILES_DELETION)
+ .completedAt(t1)
+ .status(HistoryStatus.SUCCESS)
+ .build());
+
+ repository.save(
+ TableOperationsHistoryRow.builder()
+ .id(idNewer)
+ .tableUuid(tableUuid)
+ .databaseName("db1")
+ .tableName("tbl1")
+ .operationType(OperationType.ORPHAN_FILES_DELETION)
+ .completedAt(t2)
+ .status(HistoryStatus.FAILED)
+ .build());
+
+ List rows = repository.find(tableUuid, PageRequest.of(0, 10));
+
+ assertThat(rows).hasSize(2);
+ assertThat(rows.get(0).getId()).isEqualTo(idNewer);
+ assertThat(rows.get(1).getId()).isEqualTo(idOlder);
+ }
+
+ @Test
+ void findByTableUuid_respectsLimit() {
+ Instant now = Instant.now();
+ String tableUuid = UUID.randomUUID().toString();
+ for (int i = 0; i < 5; i++) {
+ repository.save(
+ TableOperationsHistoryRow.builder()
+ .id(UUID.randomUUID().toString())
+ .tableUuid(tableUuid)
+ .databaseName("db1")
+ .tableName("tbl3")
+ .operationType(OperationType.ORPHAN_FILES_DELETION)
+ .completedAt(now.plusSeconds(i))
+ .status(HistoryStatus.SUCCESS)
+ .build());
+ }
+
+ List rows = repository.find(tableUuid, PageRequest.of(0, 3));
+ assertThat(rows).hasSize(3);
+ }
+
+ @Test
+ void findLatestPerTable_returnsOneRowPerTableUuid() {
+ Instant t1 = Instant.parse("2024-01-01T10:00:00Z");
+ Instant t2 = Instant.parse("2024-02-01T10:00:00Z");
+ String tableUuid = UUID.randomUUID().toString();
+ String otherUuid = UUID.randomUUID().toString();
+
+ repository.save(
+ TableOperationsHistoryRow.builder()
+ .id(UUID.randomUUID().toString())
+ .tableUuid(tableUuid)
+ .databaseName("db1")
+ .tableName("tbl1")
+ .operationType(OperationType.ORPHAN_FILES_DELETION)
+ .completedAt(t1)
+ .status(HistoryStatus.SUCCESS)
+ .build());
+ repository.save(
+ TableOperationsHistoryRow.builder()
+ .id(UUID.randomUUID().toString())
+ .tableUuid(tableUuid)
+ .databaseName("db1")
+ .tableName("tbl1")
+ .operationType(OperationType.ORPHAN_FILES_DELETION)
+ .completedAt(t2)
+ .status(HistoryStatus.FAILED)
+ .build());
+ repository.save(
+ TableOperationsHistoryRow.builder()
+ .id(UUID.randomUUID().toString())
+ .tableUuid(otherUuid)
+ .databaseName("db1")
+ .tableName("tbl2")
+ .operationType(OperationType.ORPHAN_FILES_DELETION)
+ .completedAt(t1)
+ .status(HistoryStatus.SUCCESS)
+ .build());
+
+ List latest =
+ repository.findLatest(OperationType.ORPHAN_FILES_DELETION, PageRequest.of(0, 10_000));
+
+ assertThat(latest).hasSize(2);
+ TableOperationsHistoryRow forTarget =
+ latest.stream().filter(r -> r.getTableUuid().equals(tableUuid)).findFirst().orElseThrow();
+ assertThat(forTarget.getCompletedAt()).isEqualTo(t2);
+ assertThat(forTarget.getStatus()).isEqualTo(HistoryStatus.FAILED);
+ }
+}
diff --git a/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/repository/TableOperationsRepositoryTest.java b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/repository/TableOperationsRepositoryTest.java
new file mode 100644
index 000000000..072be5fd9
--- /dev/null
+++ b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/repository/TableOperationsRepositoryTest.java
@@ -0,0 +1,312 @@
+package com.linkedin.openhouse.optimizer.repository;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.linkedin.openhouse.optimizer.db.OperationStatus;
+import com.linkedin.openhouse.optimizer.db.OperationType;
+import com.linkedin.openhouse.optimizer.db.TableOperationsRow;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.data.domain.PageRequest;
+import org.springframework.data.domain.Pageable;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.transaction.annotation.Transactional;
+
+@SpringBootTest
+@ActiveProfiles("test")
+@Transactional
+class TableOperationsRepositoryTest {
+
+ private static final Pageable PAGE = PageRequest.of(0, 10_000);
+
+ @Autowired TableOperationsRepository repository;
+
+ @Test
+ void saveAndFindById() {
+ String id = UUID.randomUUID().toString();
+
+ repository.save(pendingRow(id, "tbl1"));
+
+ Optional found = repository.findById(id);
+ assertThat(found).isPresent();
+ assertThat(found.get().getStatus()).isEqualTo(OperationStatus.PENDING);
+ }
+
+ @Test
+ void find_noFilters_returnsAll() {
+ repository.save(pendingRow(UUID.randomUUID().toString(), "tbl1"));
+ repository.save(scheduledRow(UUID.randomUUID().toString(), "tbl2"));
+
+ List rows =
+ repository.find(
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ PAGE);
+ assertThat(rows).hasSize(2);
+ }
+
+ @Test
+ void find_byStatus() {
+ repository.save(pendingRow(UUID.randomUUID().toString(), "tbl1"));
+ repository.save(scheduledRow(UUID.randomUUID().toString(), "tbl2"));
+
+ List pending =
+ repository.find(
+ Optional.empty(),
+ Optional.of(OperationStatus.PENDING),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ PAGE);
+ assertThat(pending).hasSize(1);
+ assertThat(pending.get(0).getStatus()).isEqualTo(OperationStatus.PENDING);
+
+ List scheduled =
+ repository.find(
+ Optional.empty(),
+ Optional.of(OperationStatus.SCHEDULED),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ PAGE);
+ assertThat(scheduled).hasSize(1);
+ assertThat(scheduled.get(0).getStatus()).isEqualTo(OperationStatus.SCHEDULED);
+ }
+
+ @Test
+ void find_byDatabaseAndTable() {
+ repository.save(pendingRow(UUID.randomUUID().toString(), "tbl1", "db1"));
+ repository.save(pendingRow(UUID.randomUUID().toString(), "tbl2", "db2"));
+
+ assertThat(
+ repository.find(
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.of("db1"),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ PAGE))
+ .hasSize(1);
+ assertThat(
+ repository.find(
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.of("db2"),
+ Optional.of("tbl2"),
+ Optional.empty(),
+ Optional.empty(),
+ PAGE))
+ .hasSize(1);
+ assertThat(
+ repository.find(
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.of("db1"),
+ Optional.of("tbl2"),
+ Optional.empty(),
+ Optional.empty(),
+ PAGE))
+ .isEmpty();
+ }
+
+ @Test
+ void find_byScheduledAtAndIds_resolvesClaimedSubset() {
+ String idA = UUID.randomUUID().toString();
+ String idB = UUID.randomUUID().toString();
+ String idC = UUID.randomUUID().toString();
+ repository.save(pendingRow(idA, "tbl_a"));
+ repository.save(pendingRow(idB, "tbl_b"));
+ // idC is already SCHEDULING with an older watermark — must NOT appear.
+ repository.save(
+ TableOperationsRow.builder()
+ .id(idC)
+ .tableUuid(UUID.randomUUID().toString())
+ .databaseName("db1")
+ .tableName("tbl_c")
+ .operationType(OperationType.ORPHAN_FILES_DELETION)
+ .status(OperationStatus.SCHEDULING)
+ .createdAt(Instant.now())
+ .scheduledAt(Instant.now().minusSeconds(60))
+ .build());
+
+ // Truncate to microseconds — MySQL TIMESTAMP(6) (and H2 in MySQL mode) stores microseconds,
+ // so a nano-precision now() round-trips lossily. On Linux CI Instant.now() carries nanos;
+ // truncating here keeps the watermark comparison exact across platforms.
+ Instant now = Instant.now().truncatedTo(ChronoUnit.MICROS);
+ int transitioned =
+ repository.updateBatch(
+ List.of(idA, idB, idC),
+ OperationStatus.PENDING,
+ OperationStatus.SCHEDULING,
+ Optional.of(now),
+ Optional.empty());
+ assertThat(transitioned).isEqualTo(2);
+
+ List claimedIds =
+ repository
+ .find(
+ Optional.empty(),
+ Optional.of(OperationStatus.SCHEDULING),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.of(now),
+ Optional.of(List.of(idA, idB, idC)),
+ PAGE)
+ .stream()
+ .map(TableOperationsRow::getId)
+ .collect(Collectors.toList());
+ assertThat(claimedIds).containsExactlyInAnyOrder(idA, idB);
+ }
+
+ @Test
+ void updateBatch_schedulingToScheduled_setsJobIdAndPreservesScheduledAt() {
+ String id = UUID.randomUUID().toString();
+ Instant claimedAt = Instant.parse("2026-05-20T16:42:43Z");
+ repository.save(
+ TableOperationsRow.builder()
+ .id(id)
+ .tableUuid(UUID.randomUUID().toString())
+ .databaseName("db1")
+ .tableName("tbl1")
+ .operationType(OperationType.ORPHAN_FILES_DELETION)
+ .status(OperationStatus.SCHEDULING)
+ .createdAt(Instant.now())
+ .scheduledAt(claimedAt)
+ .build());
+
+ int updated =
+ repository.updateBatch(
+ List.of(id),
+ OperationStatus.SCHEDULING,
+ OperationStatus.SCHEDULED,
+ Optional.empty(),
+ Optional.of("job-123"));
+ assertThat(updated).isEqualTo(1);
+
+ TableOperationsRow row = repository.findById(id).orElseThrow();
+ assertThat(row.getStatus()).isEqualTo(OperationStatus.SCHEDULED);
+ assertThat(row.getJobId()).isEqualTo("job-123");
+ assertThat(row.getScheduledAt()).isEqualTo(claimedAt);
+ }
+
+ @Test
+ void updateBatch_schedulingToPending_leavesScheduledAtUntouched() {
+ // scheduledAt is intentionally NOT cleared on revert. Status is the source of truth; the
+ // stale watermark gets overwritten on the next PENDING → SCHEDULING transition.
+ String id = UUID.randomUUID().toString();
+ Instant claimedAt = Instant.parse("2026-05-20T16:42:43Z");
+ repository.save(
+ TableOperationsRow.builder()
+ .id(id)
+ .tableUuid(UUID.randomUUID().toString())
+ .databaseName("db1")
+ .tableName("tbl1")
+ .operationType(OperationType.ORPHAN_FILES_DELETION)
+ .status(OperationStatus.SCHEDULING)
+ .createdAt(Instant.now())
+ .scheduledAt(claimedAt)
+ .build());
+
+ int reverted =
+ repository.updateBatch(
+ List.of(id),
+ OperationStatus.SCHEDULING,
+ OperationStatus.PENDING,
+ Optional.empty(),
+ Optional.empty());
+ assertThat(reverted).isEqualTo(1);
+
+ TableOperationsRow row = repository.findById(id).orElseThrow();
+ assertThat(row.getStatus()).isEqualTo(OperationStatus.PENDING);
+ assertThat(row.getScheduledAt()).isEqualTo(claimedAt);
+ }
+
+ @Test
+ void updateBatch_skipsRowsNotInFromStatus() {
+ String pendingId = UUID.randomUUID().toString();
+ String scheduledId = UUID.randomUUID().toString();
+ repository.save(pendingRow(pendingId, "tbl_a"));
+ repository.save(scheduledRow(scheduledId, "tbl_b"));
+
+ int transitioned =
+ repository.updateBatch(
+ List.of(pendingId, scheduledId),
+ OperationStatus.PENDING,
+ OperationStatus.SCHEDULING,
+ Optional.of(Instant.now()),
+ Optional.empty());
+ assertThat(transitioned).isEqualTo(1);
+
+ assertThat(repository.findById(pendingId).orElseThrow().getStatus())
+ .isEqualTo(OperationStatus.SCHEDULING);
+ assertThat(repository.findById(scheduledId).orElseThrow().getStatus())
+ .isEqualTo(OperationStatus.SCHEDULED);
+ }
+
+ @Test
+ void cancel_deletesOnlyPendingRows() {
+ String pendingId = UUID.randomUUID().toString();
+ String scheduledId = UUID.randomUUID().toString();
+ repository.save(pendingRow(pendingId, "tbl_p"));
+ repository.save(scheduledRow(scheduledId, "tbl_s"));
+
+ int deleted = repository.cancel(List.of(pendingId, scheduledId));
+ assertThat(deleted).isEqualTo(1);
+
+ assertThat(repository.findById(pendingId)).isEmpty();
+ assertThat(repository.findById(scheduledId)).isPresent();
+ }
+
+ // --- helpers ---
+
+ private TableOperationsRow pendingRow(String id, String tableName) {
+ return pendingRow(id, tableName, "db1");
+ }
+
+ private TableOperationsRow pendingRow(String id, String tableName, String databaseName) {
+ return TableOperationsRow.builder()
+ .id(id)
+ .tableUuid(UUID.randomUUID().toString())
+ .databaseName(databaseName)
+ .tableName(tableName)
+ .operationType(OperationType.ORPHAN_FILES_DELETION)
+ .status(OperationStatus.PENDING)
+ .createdAt(Instant.now())
+ .build();
+ }
+
+ private TableOperationsRow scheduledRow(String id, String tableName) {
+ return TableOperationsRow.builder()
+ .id(id)
+ .tableUuid(UUID.randomUUID().toString())
+ .databaseName("db1")
+ .tableName(tableName)
+ .operationType(OperationType.ORPHAN_FILES_DELETION)
+ .status(OperationStatus.SCHEDULED)
+ .createdAt(Instant.now())
+ .scheduledAt(Instant.now())
+ .jobId("job-" + id)
+ .build();
+ }
+}
diff --git a/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/repository/TableStatsHistoryRepositoryTest.java b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/repository/TableStatsHistoryRepositoryTest.java
new file mode 100644
index 000000000..cddec50c9
--- /dev/null
+++ b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/repository/TableStatsHistoryRepositoryTest.java
@@ -0,0 +1,148 @@
+package com.linkedin.openhouse.optimizer.repository;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.linkedin.openhouse.optimizer.db.CommitDeltaMetrics;
+import com.linkedin.openhouse.optimizer.db.SnapshotMetrics;
+import com.linkedin.openhouse.optimizer.db.TableStatsHistoryRow;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.data.domain.PageRequest;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.transaction.annotation.Transactional;
+
+@SpringBootTest
+@ActiveProfiles("test")
+@Transactional
+class TableStatsHistoryRepositoryTest {
+
+ @Autowired TableStatsHistoryRepository repository;
+
+ @Test
+ void saveAndFind() {
+ String tableUuid = UUID.randomUUID().toString();
+ Instant now = Instant.now();
+
+ repository.save(buildRow(tableUuid, "db1", "tbl1", 10L, 2L, now.minus(2, ChronoUnit.HOURS)));
+ repository.save(buildRow(tableUuid, "db1", "tbl1", 5L, 1L, now.minus(1, ChronoUnit.HOURS)));
+ repository.save(buildRow(tableUuid, "db1", "tbl1", 3L, 0L, now));
+
+ List rows =
+ repository.find(tableUuid, Optional.empty(), PageRequest.of(0, 100));
+
+ assertThat(rows).hasSize(3);
+ // newest first
+ assertThat(rows.get(0).getDelta().getNumFilesAdded()).isEqualTo(3L);
+ assertThat(rows.get(2).getDelta().getNumFilesAdded()).isEqualTo(10L);
+ }
+
+ @Test
+ void find_respectsLimit() {
+ String tableUuid = UUID.randomUUID().toString();
+ Instant now = Instant.now();
+
+ for (int i = 0; i < 5; i++) {
+ repository.save(buildRow(tableUuid, "db1", "tbl1", i, 0L, now.minus(i, ChronoUnit.HOURS)));
+ }
+
+ List rows =
+ repository.find(tableUuid, Optional.empty(), PageRequest.of(0, 3));
+
+ assertThat(rows).hasSize(3);
+ }
+
+ @Test
+ void find_withSince_filtersOlderRows() {
+ String tableUuid = UUID.randomUUID().toString();
+ Instant now = Instant.now();
+ Instant cutoff = now.minus(90, ChronoUnit.MINUTES);
+
+ repository.save(buildRow(tableUuid, "db1", "tbl1", 10L, 2L, now.minus(2, ChronoUnit.HOURS)));
+ repository.save(buildRow(tableUuid, "db1", "tbl1", 5L, 1L, now.minus(1, ChronoUnit.HOURS)));
+ repository.save(buildRow(tableUuid, "db1", "tbl1", 3L, 0L, now));
+
+ List rows =
+ repository.find(tableUuid, Optional.of(cutoff), PageRequest.of(0, 100));
+
+ // only the 2 rows within the last 90 minutes
+ assertThat(rows).hasSize(2);
+ assertThat(rows.get(0).getDelta().getNumFilesAdded()).isEqualTo(3L);
+ }
+
+ @Test
+ void find_isolatesByTableUuid() {
+ String uuid1 = UUID.randomUUID().toString();
+ String uuid2 = UUID.randomUUID().toString();
+ Instant now = Instant.now();
+
+ repository.save(buildRow(uuid1, "db1", "tbl1", 10L, 0L, now));
+ repository.save(buildRow(uuid2, "db2", "tbl2", 20L, 0L, now));
+
+ assertThat(repository.find(uuid1, Optional.empty(), PageRequest.of(0, 100))).hasSize(1);
+ assertThat(repository.find(uuid2, Optional.empty(), PageRequest.of(0, 100))).hasSize(1);
+ }
+
+ @Test
+ void callerSetIdIsPreserved() {
+ String tableUuid = UUID.randomUUID().toString();
+ String id1 = UUID.randomUUID().toString();
+ String id2 = UUID.randomUUID().toString();
+ Instant now = Instant.now();
+
+ TableStatsHistoryRow row1 =
+ repository.save(buildRow(id1, tableUuid, "db1", "tbl1", 1L, 0L, now));
+ TableStatsHistoryRow row2 =
+ repository.save(buildRow(id2, tableUuid, "db1", "tbl1", 2L, 0L, now));
+
+ assertThat(row1.getId()).isEqualTo(id1);
+ assertThat(row2.getId()).isEqualTo(id2);
+ assertThat(repository.findById(id1)).isPresent();
+ assertThat(repository.findById(id2)).isPresent();
+ }
+
+ private static TableStatsHistoryRow buildRow(
+ String tableUuid,
+ String databaseName,
+ String tableName,
+ long numFilesAdded,
+ long numFilesDeleted,
+ Instant recordedAt) {
+ return buildRow(
+ UUID.randomUUID().toString(),
+ tableUuid,
+ databaseName,
+ tableName,
+ numFilesAdded,
+ numFilesDeleted,
+ recordedAt);
+ }
+
+ private static TableStatsHistoryRow buildRow(
+ String id,
+ String tableUuid,
+ String databaseName,
+ String tableName,
+ long numFilesAdded,
+ long numFilesDeleted,
+ Instant recordedAt) {
+ return TableStatsHistoryRow.builder()
+ .id(id)
+ .tableUuid(tableUuid)
+ .databaseName(databaseName)
+ .tableName(tableName)
+ .snapshot(SnapshotMetrics.builder().tableSizeBytes(1024L).build())
+ .delta(
+ CommitDeltaMetrics.builder()
+ .numFilesAdded(numFilesAdded)
+ .numFilesDeleted(numFilesDeleted)
+ .build())
+ .recordedAt(recordedAt)
+ .build();
+ }
+}
diff --git a/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/repository/TableStatsRepositoryTest.java b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/repository/TableStatsRepositoryTest.java
new file mode 100644
index 000000000..e73ac0cb4
--- /dev/null
+++ b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/repository/TableStatsRepositoryTest.java
@@ -0,0 +1,129 @@
+package com.linkedin.openhouse.optimizer.repository;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.linkedin.openhouse.optimizer.db.SnapshotMetrics;
+import com.linkedin.openhouse.optimizer.db.TableStatsRow;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.data.domain.PageRequest;
+import org.springframework.data.domain.Pageable;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.transaction.annotation.Transactional;
+
+@SpringBootTest
+@ActiveProfiles("test")
+@Transactional
+class TableStatsRepositoryTest {
+
+ private static final Pageable PAGE = PageRequest.of(0, 10_000);
+
+ @Autowired TableStatsRepository repository;
+
+ @Test
+ void saveAndFindById() {
+ String tableUuid = UUID.randomUUID().toString();
+ SnapshotMetrics snapshot = SnapshotMetrics.builder().tableSizeBytes(1024L).build();
+
+ repository.save(
+ TableStatsRow.builder()
+ .tableUuid(tableUuid)
+ .databaseName("db1")
+ .tableName("tbl1")
+ .snapshot(snapshot)
+ .tableProperties(Map.of("maintenance.optimizer.ofd.enabled", "true"))
+ .updatedAt(Instant.now())
+ .build());
+
+ Optional found = repository.findById(tableUuid);
+ assertThat(found).isPresent();
+ assertThat(found.get().getDatabaseName()).isEqualTo("db1");
+ assertThat(found.get().getSnapshot().getTableSizeBytes()).isEqualTo(1024L);
+ assertThat(found.get().getTableProperties())
+ .containsEntry("maintenance.optimizer.ofd.enabled", "true");
+ }
+
+ @Test
+ void upsert_overwritesPreviousStats() {
+ String tableUuid = UUID.randomUUID().toString();
+
+ repository.save(
+ TableStatsRow.builder()
+ .tableUuid(tableUuid)
+ .databaseName("db1")
+ .tableName("tbl1")
+ .snapshot(SnapshotMetrics.builder().tableSizeBytes(100L).build())
+ .updatedAt(Instant.now())
+ .build());
+
+ repository.save(
+ TableStatsRow.builder()
+ .tableUuid(tableUuid)
+ .databaseName("db1")
+ .tableName("tbl1")
+ .snapshot(SnapshotMetrics.builder().tableSizeBytes(200L).build())
+ .updatedAt(Instant.now())
+ .build());
+
+ assertThat(repository.findAll()).hasSize(1);
+ assertThat(repository.findById(tableUuid).get().getSnapshot().getTableSizeBytes())
+ .isEqualTo(200L);
+ }
+
+ @Test
+ void find_noParams_returnsAll() {
+ repository.save(
+ TableStatsRow.builder()
+ .tableUuid(UUID.randomUUID().toString())
+ .databaseName("db1")
+ .tableName("tbl1")
+ .snapshot(SnapshotMetrics.builder().tableSizeBytes(100L).build())
+ .updatedAt(Instant.now())
+ .build());
+ repository.save(
+ TableStatsRow.builder()
+ .tableUuid(UUID.randomUUID().toString())
+ .databaseName("db2")
+ .tableName("tbl2")
+ .snapshot(SnapshotMetrics.builder().tableSizeBytes(200L).build())
+ .updatedAt(Instant.now())
+ .build());
+
+ assertThat(repository.find(Optional.empty(), Optional.empty(), Optional.empty(), PAGE))
+ .hasSize(2);
+ }
+
+ @Test
+ void find_byDatabase() {
+ repository.save(
+ TableStatsRow.builder()
+ .tableUuid(UUID.randomUUID().toString())
+ .databaseName("db1")
+ .tableName("tbl1")
+ .snapshot(SnapshotMetrics.builder().tableSizeBytes(100L).build())
+ .updatedAt(Instant.now())
+ .build());
+ repository.save(
+ TableStatsRow.builder()
+ .tableUuid(UUID.randomUUID().toString())
+ .databaseName("db2")
+ .tableName("tbl2")
+ .snapshot(SnapshotMetrics.builder().tableSizeBytes(200L).build())
+ .updatedAt(Instant.now())
+ .build());
+
+ assertThat(repository.find(Optional.of("db1"), Optional.empty(), Optional.empty(), PAGE))
+ .hasSize(1);
+ assertThat(
+ repository
+ .find(Optional.of("db1"), Optional.empty(), Optional.empty(), PAGE)
+ .get(0)
+ .getDatabaseName())
+ .isEqualTo("db1");
+ }
+}
diff --git a/services/optimizer/src/test/resources/application-test.properties b/services/optimizer/src/test/resources/application-test.properties
new file mode 100644
index 000000000..97b7841dc
--- /dev/null
+++ b/services/optimizer/src/test/resources/application-test.properties
@@ -0,0 +1,12 @@
+spring.datasource.url=jdbc:h2:mem:optimizer_test;MODE=MySQL;DATABASE_TO_LOWER=TRUE;DB_CLOSE_DELAY=-1
+spring.datasource.driver-class-name=org.h2.Driver
+spring.datasource.username=sa
+spring.datasource.password=
+
+spring.jpa.hibernate.ddl-auto=none
+spring.sql.init.mode=always
+spring.jpa.defer-datasource-initialization=true
+spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.H2Dialect
+spring.jpa.properties.hibernate.physical_naming_strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
+
+spring.sql.init.schema-locations=classpath:db/optimizer-schema.sql