Commit fb0097c

jiang95-dev and Levi Jiang authored
[RTAS]: Fix bug - remove fs scheme from tableLocation in commit (cont) (#594)
## Summary

This is the follow-up PR to #542. After that fix, we were able to replace the table once but not multiple times.

Root cause analysis:

1. `OpenHouseInternalTableOperations` uses `metadata.location()` for the new HTS tableLocation, and that value comes from the `.withLocation(tableLocation)` that we pass, so we need to make sure it is schemeless.
2. We used `tableDto.getTableVersion()` to populate `.withLocation(tableLocation)`, but the value in tableDto always contains the scheme. (Why is tableLocation = tableVersion in tableDto? Because tableVersion comes from the client request, and the client request uses the tableLocation from the server response, so the two are always the same and both contain the scheme.)
3. The last PR fixed the tableLocation in the table properties, so the new tableVersion for the first replace is schemeless. But since the new tableLocation still has the scheme, the next replace fails.

Therefore, in this PR, we use the tableLocation from the table properties, which the last PR already strips of its scheme, for `.withLocation(tableLocation)`.

## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [x] Bug Fixes
- [ ] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

## Testing Done

<!--- Check any relevant boxes with "x" -->

- [x] Manually Tested on local docker setup. Please include commands ran, and their output.
- [ ] Added new tests for the changes made.
- [x] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in production. Please explain.

**Unit tests**

I removed `stripPathScheme` from the Spark E2E tests to guarantee that scheme issues are caught by the unit tests.

**Test 1: RTAS**

```
scala> spark.sql(s"CREATE TABLE $tableName TBLPROPERTIES ('prop1'='val1', 'prop2'='val2') AS SELECT * FROM $sourceName");
res3: org.apache.spark.sql.DataFrame = []

scala> spark.sql(s"INSERT INTO $tableName values (4, 'd')");
res4: org.apache.spark.sql.DataFrame = []

scala> spark.sql(s"REPLACE TABLE $tableName PARTITIONED BY (part) TBLPROPERTIES ('prop1'='newval1', 'prop3'='val3') AS SELECT id, data, CASE WHEN (id % 2) = 0 THEN 'even' ELSE 'odd' END AS part FROM $sourceName ORDER BY 3, 1");
res5: org.apache.spark.sql.DataFrame = []

scala> spark.sql(s"REPLACE TABLE $tableName PARTITIONED BY (part) AS SELECT 2 * id as id, data, CASE WHEN ((2 * id) % 2) = 0 THEN 'even' ELSE 'odd' END AS part FROM $sourceName ORDER BY 3, 1");
res12: org.apache.spark.sql.DataFrame = []

scala> spark.sql(s"SELECT * FROM $tableName").show(false)
+---+----+----+
|id |data|part|
+---+----+----+
|2  |a   |even|
|4  |b   |even|
|6  |c   |even|
+---+----+----+
```

**Test 2: CRTAS**

```
scala> spark.sql(s"CREATE OR REPLACE TABLE $tableName TBLPROPERTIES ('prop1'='val1', 'prop2'='val2') AS SELECT * FROM $sourceName");
res19: org.apache.spark.sql.DataFrame = []

scala> spark.sql(s"INSERT INTO $tableName values (4, 'd')");
res20: org.apache.spark.sql.DataFrame = []

scala> spark.sql(s"CREATE OR REPLACE TABLE $tableName PARTITIONED BY (part) AS SELECT id, data, CASE WHEN id % 2 = 0 THEN 'even' ELSE 'odd' END AS part FROM $sourceName ORDER BY 3, 1");
res21: org.apache.spark.sql.DataFrame = []

scala> spark.sql(s"CREATE OR REPLACE TABLE $tableName PARTITIONED BY (part) AS SELECT 2 * id as id, data, CASE WHEN ((2 * id) % 2) = 0 THEN 'even' ELSE 'odd' END AS part FROM $sourceName ORDER BY 3, 1");
res22: org.apache.spark.sql.DataFrame = []

scala> spark.sql(s"SELECT * FROM $tableName").show(false)
+---+----+----+
|id |data|part|
+---+----+----+
|2  |a   |even|
|4  |b   |even|
|6  |c   |even|
+---+----+----+
```

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the description.

For all the boxes checked, include additional details of the changes made in this pull request.

---------

Co-authored-by: Levi Jiang <lejiang@lejiang-mn2962.linkedin.biz>
1 parent 7a1eb61 commit fb0097c

4 files changed

Lines changed: 9 additions & 27 deletions

apps/spark/src/test/java/com/linkedin/openhouse/catalog/e2e/RTASJavaTest.java

Lines changed: 2 additions & 2 deletions
@@ -141,8 +141,8 @@ private void verifyReplacedTable(
     Table replacedTable = catalog.loadTable(TABLE_IDENT);
 
     assertEquals(
-        stripPathScheme(originalLocation),
-        stripPathScheme(replacedTable.location()),
+        originalLocation,
+        replacedTable.location(),
         "Table location should be preserved after replace");
     assertEquals(
         REPLACE_SCHEMA.asStruct(),

integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/RTASTest.java

Lines changed: 4 additions & 16 deletions
@@ -83,10 +83,7 @@ public void testRTAS() throws Exception {
       Table rtasTable = catalog.loadTable(tableIdent);
 
       // verify table location is unchanged
-      assertEquals(
-          stripPathScheme(expectedTableLocation),
-          stripPathScheme(rtasTable.location()),
-          "Should have same table location");
+      assertEquals(expectedTableLocation, rtasTable.location(), "Should have same table location");
       // verify schema and spec are changed
       assertEquals(
           expectedSchema.asStruct(),
@@ -151,10 +148,7 @@ public void testCreateRTAS() throws Exception {
       Table rtasTable = catalog.loadTable(tableIdent);
 
       // verify table location is unchanged
-      assertEquals(
-          stripPathScheme(expectedTableLocation),
-          stripPathScheme(rtasTable.location()),
-          "Should have same table location");
+      assertEquals(expectedTableLocation, rtasTable.location(), "Should have same table location");
       // verify schema and spec are changed
       assertEquals(
           expectedSchema.asStruct(),
@@ -201,10 +195,7 @@ public void testDataFrameV2Replace() throws Exception {
       Table rtasTable = catalog.loadTable(tableIdent);
 
       // verify table location is unchanged
-      assertEquals(
-          stripPathScheme(expectedTableLocation),
-          stripPathScheme(rtasTable.location()),
-          "Should have same table location");
+      assertEquals(expectedTableLocation, rtasTable.location(), "Should have same table location");
       // verify schema and spec are changed
       assertEquals(
           expectedSchema.asStruct(),
@@ -265,10 +256,7 @@ public void testDataFrameV2CreateOrReplace() throws Exception {
       Table rtasTable = catalog.loadTable(tableIdent);
 
       // verify table location is unchanged
-      assertEquals(
-          stripPathScheme(expectedTableLocation),
-          stripPathScheme(rtasTable.location()),
-          "Should have same table location");
+      assertEquals(expectedTableLocation, rtasTable.location(), "Should have same table location");
       // verify schema and spec are changed
       assertEquals(
           expectedSchema.asStruct(),

services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java

Lines changed: 3 additions & 2 deletions
@@ -2,6 +2,7 @@
 
 import static com.linkedin.openhouse.internal.catalog.CatalogConstants.*;
 import static com.linkedin.openhouse.internal.catalog.mapper.HouseTableSerdeUtils.*;
+import static com.linkedin.openhouse.internal.catalog.mapper.HouseTableSerdeUtils.getCanonicalFieldName;
 import static com.linkedin.openhouse.tables.repository.impl.InternalRepositoryUtils.*;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -158,8 +159,8 @@ public TableDto save(TableDto tableDto) {
       Map<String, String> tableProps = computePropsForTableCreation(tableDto);
       tablePolicyManager.managePoliciesOnCreateIfNeeded(tableDto);
       SortOrder sortOrder = getIcebergSortOrder(tableDto, writeSchema);
-      String tableLocation =
-          tableDto.getTableVersion().substring(0, tableDto.getTableVersion().lastIndexOf("/"));
+      String metadataLocation = tableProps.get(getCanonicalFieldName("tableLocation"));
+      String tableLocation = metadataLocation.substring(0, metadataLocation.lastIndexOf("/"));
       table =
           replaceTable(
               tableIdentifier, writeSchema, partitionSpec, tableLocation, tableProps, sortOrder);
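
The repository change takes the metadata file path from the table properties and keeps everything before the last `/` as the table directory. A minimal standalone sketch of that derivation follows, assuming the property value is a well-formed path or URI without spaces. The class `TableLocationSketch` and its methods are hypothetical names for illustration and are not part of OpenHouse. The null and missing-slash checks are additions that the diff itself does not contain.

```java
import java.net.URI;
import java.util.Objects;

/** Hypothetical helper for illustration only; not part of the OpenHouse code base. */
final class TableLocationSketch {

  private TableLocationSketch() {}

  /** Returns only the path part of a location, dropping a scheme such as "file:" if present. */
  static String stripScheme(String location) {
    URI uri = URI.create(location);
    return uri.getScheme() == null ? location : uri.getPath();
  }

  /** Returns the directory that contains the given metadata file (text before the last '/'). */
  static String parentOf(String metadataLocation) {
    Objects.requireNonNull(metadataLocation, "metadataLocation must not be null");
    int lastSlash = metadataLocation.lastIndexOf('/');
    if (lastSlash < 0) {
      // Assumption: a location without any '/' is treated as invalid input.
      throw new IllegalArgumentException("Not a path: " + metadataLocation);
    }
    return metadataLocation.substring(0, lastSlash);
  }
}
```

With a hypothetical value such as `file:/data/openhouse/db/t1/v1.metadata.json`, `parentOf(stripScheme(...))` would yield `/data/openhouse/db/t1`. Unlike the one-line `substring` in the diff, this sketch fails with a descriptive exception rather than a `NullPointerException` when the property is absent.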

tables-test-fixtures/tables-test-fixtures-iceberg-1.2/src/main/java/com/linkedin/openhouse/tablestest/OpenHouseSparkITest.java

Lines changed: 0 additions & 7 deletions
@@ -131,11 +131,4 @@ protected Catalog getOpenHouseCatalog(SparkSession spark) {
         catalogProperties,
         spark.sparkContext().hadoopConfiguration());
   }
-
-  /**
-   * Getting rid of "file:" part if needed for ease of comparison of tableLocation / tableVersion
-   */
-  protected String stripPathScheme(String path) {
-    return path.startsWith("file:") ? path.split("file:")[1] : path;
-  }
 }
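
The removed helper normalized both sides of each location assertion, so a test could not fail when one side carried a `file:` scheme and the other did not. The sketch below reproduces the helper body from the diff and contrasts the two comparison styles. The class name `SchemeMaskingSketch`, the two comparison methods, and the sample paths in the usage note are hypothetical and chosen only for illustration.

```java
/** Hypothetical illustration of why the tests now compare raw locations. */
final class SchemeMaskingSketch {

  private SchemeMaskingSketch() {}

  /** Same body as the helper deleted from OpenHouseSparkITest in this commit. */
  static String stripPathScheme(String path) {
    return path.startsWith("file:") ? path.split("file:")[1] : path;
  }

  /** Old style of comparison: both sides are normalized first, which hides scheme differences. */
  static boolean maskedEquals(String expected, String actual) {
    return stripPathScheme(expected).equals(stripPathScheme(actual));
  }

  /** New style of comparison: raw string equality, so a stray scheme makes the check fail. */
  static boolean strictEquals(String expected, String actual) {
    return expected.equals(actual);
  }
}
```

For a hypothetical pair such as `/tmp/openhouse/db/tbl` and `file:/tmp/openhouse/db/tbl`, `maskedEquals` reports a match while `strictEquals` does not, which is the kind of regression the updated assertions are meant to surface.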
