diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/IteratorResultSet.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/IteratorResultSet.java index c92fd9a6e..4609839ad 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/IteratorResultSet.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/IteratorResultSet.java @@ -60,6 +60,21 @@ public IteratorResultSet(String query, this.useInternalLimit = !query.contains("LIMIT"); } + public IteratorResultSet(String query, + ConnectionManager connectionManager, + long limit, + long startOffset, + Function parsingFn, + Function predicateValidator + ) { + this.query = query; + this.limit = limit; + this.parsingFn = parsingFn; + this.connectionManager = connectionManager; + this.predicateValidator = predicateValidator; + this.useInternalLimit = !query.contains("LIMIT"); + } + @Override public boolean hasNext() { if(!records.isEmpty()) { @@ -91,9 +106,10 @@ public T next() { private void makeRequest() { long countRes = 0; + String nextQuery=""; try (Connection connection = connectionManager.getConnection()) { try(Statement statement = connection.createStatement()) { - String nextQuery = getQuery(); + nextQuery = getQuery(); if(logger.isDebugEnabled()) { logger.debug(nextQuery); } @@ -112,7 +128,7 @@ private void makeRequest() { ended = true; } } catch (SQLException e) { - throw new RuntimeException(e); + logger.error("Cannot Execute the request {}",query); } } } diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/QueryBuilderBaseFeatureStore.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/QueryBuilderBaseFeatureStore.java index ee7e1c6b1..e50c06eca 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/QueryBuilderBaseFeatureStore.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/QueryBuilderBaseFeatureStore.java @@ -47,6 +47,14 @@ public String createTableQuery() { "data JSONB)"; } + public String createSpatialIndex() { + return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_features_geometry ON "+this.getStoreTableName()+" USING GIST ("+GEOMETRY+");"; + } + + public String createParentIdIndex() { + return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_features_parentId ON "+this.getStoreTableName()+" (parentId);"; + } + public String insertFeatureQuery() { return "INSERT INTO "+this.getStoreTableName()+" (parentId,"+GEOMETRY+", "+VALID_TIME+", data) VALUES (?,?,?,?)"; } @@ -123,6 +131,10 @@ public String createValidTimeIndexQuery() { return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_feature_valid_time_0_idx ON "+this.getStoreTableName()+ " using GIST (validTime)"; } + public String createLowerValidTimeIndexQuery() { + return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_feature_valid_time_lower_desc_idx ON "+this.getStoreTableName()+ " ((lower(validTime)) DESC, id DESC)"; + } + public String createIdIndexQuery() { return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_feature_id_idx ON "+this.getStoreTableName()+" (id)"; } @@ -132,11 +144,7 @@ public String createTrigramExtensionQuery() { } public String createTrigramDescriptionFullTextIndexQuery() { - return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_feature_desc_full_text_datastream_idx ON "+this.getStoreTableName()+" USING GIN ((data->'properties'->>'description') gin_trgm_ops)"; - } - - public String createTrigramUidFullTextIndexQuery() { - return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_feature_uid_full_text_datastream_idx ON "+this.getStoreTableName()+" USING GIN ((data->'properties'->>'uid') gin_trgm_ops)"; + return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_feature_desc_full_text_datastream_idx ON "+this.getStoreTableName()+" USING GIN ((data::text) gin_trgm_ops)"; } public abstract String createSelectEntriesQuery(F filter, Set fields); diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/QueryBuilderDataStreamStore.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/QueryBuilderDataStreamStore.java index a5b966b67..884032a91 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/QueryBuilderDataStreamStore.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/QueryBuilderDataStreamStore.java @@ -38,7 +38,7 @@ public String createTableQuery() { } public String createIndexQuery() { - return "CREATE INDEX IF NOT EXISTS " + this.getStoreTableName() + "_data_idx on " + this.getStoreTableName() + " USING GIN(data)"; + return "CREATE INDEX IF NOT EXISTS " + this.getStoreTableName() + "_data_idx on " + this.getStoreTableName() + " USING GIN(data jsonb_ops)"; } public String createUniqueIndexQuery() { @@ -46,20 +46,18 @@ public String createUniqueIndexQuery() { + " USING BTREE((data->'name'), (data->'system@id'), (data->'validTime'))"; } - public String createDateRangeIndexQuery() { - return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_date_range_idx ON " + this.getStoreTableName() + - " USING gist (int8range( " + - "(data->'validTime'->'begin')::bigint, " + - "(data->'validTime'->'end')::bigint" + - ") )"; + public String createImmutubleFunctionForValidTime() { + return "CREATE OR REPLACE FUNCTION parse_utc_timestamp(txt text) " + + "RETURNS timestamp " + + "IMMUTABLE " + + "LANGUAGE sql " + + "AS $$ " + + " SELECT (txt::timestamptz AT TIME ZONE 'UTC')::timestamp " + + "$$;"; } - - public String createValidTimeBeginIndexQuery() { - return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_date_range_begin_idx ON " + this.getStoreTableName() + " USING GIN((data->'validTime'->'begin'))"; - } - - public String createValidTimeEndIndexQuery() { - return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_date_range_end_idx ON " + this.getStoreTableName() + " USING GIN((data->'validTime'->'end'))"; + public String createValidTimeIndexQuery() { + return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_date_range_begin_idx ON " + this.getStoreTableName() +" "+ + "USING GIST (tsrange(parse_utc_timestamp(data->'validTime'->>'begin'),parse_utc_timestamp(data->'validTime'->>'end')))"; } public String createTrigramExtensionQuery() { diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/QueryBuilderFeatureStore.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/QueryBuilderFeatureStore.java index 2f857fb52..376d7ebe1 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/QueryBuilderFeatureStore.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/QueryBuilderFeatureStore.java @@ -43,6 +43,7 @@ public String createSelectEntriesQuery(FeatureFilter filter, Set dsIds, List foiIds) { @@ -128,16 +132,16 @@ public String getBinCountByPhenomenontime(long seconds, List dsIds, List .withFields(fields) .withObsFilter(filter) .withLimit(filter.getLimit()) + .withOffset(0) .build(); return selectEntriesObsQuery.toQuery(); } diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/QueryBuilderSystemDescStore.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/QueryBuilderSystemDescStore.java index 9a36e2d05..705b192a6 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/QueryBuilderSystemDescStore.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/QueryBuilderSystemDescStore.java @@ -56,19 +56,10 @@ public String selectLastVersionByUidQuery(String uid, String timestamp) { } @Override - public String createUidUniqueIndexQuery() { - return "CREATE UNIQUE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_feature_uid_idx ON "+this.getStoreTableName()+" " + - "((data->>'uniqueId'), "+VALID_TIME+")"; - } - - @Override - public String createTrigramDescriptionFullTextIndexQuery() { - return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_feature_desc_full_text_datastream_idx ON "+this.getStoreTableName()+" USING GIN ((data->>'description') gin_trgm_ops)"; - } - @Override - public String createTrigramUidFullTextIndexQuery() { - return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_feature_uid_full_text_datastream_idx ON "+this.getStoreTableName()+" USING GIN ((data->>'uniqueId') gin_trgm_ops)"; - } + public String createUidUniqueIndexQuery() { + return "CREATE UNIQUE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_feature_uid_idx ON "+this.getStoreTableName()+" " + + "((data->>'uniqueId'), "+VALID_TIME+")"; + } public String addOrUpdateByIdQuery() { return this.insertFeatureByIdQuery()+" ON CONFLICT ((data->>'uniqueId'), "+VALID_TIME +") DO "+ diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/FilterQuery.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/FilterQuery.java index 72a91efa3..433170a81 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/FilterQuery.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/FilterQuery.java @@ -54,6 +54,10 @@ protected void addCondition(String condition) { filterQueryGenerator.addCondition(condition); } + protected void orCondition(String condition) { + filterQueryGenerator.orCondition(condition); + } + public void setCommandStreamTableName(String commandStreamTableName) { this.commandStreamTableName = commandStreamTableName; } diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/datastream/DataStreamFilterQuery.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/datastream/DataStreamFilterQuery.java index 8d4185f4f..9d5855672 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/datastream/DataStreamFilterQuery.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/datastream/DataStreamFilterQuery.java @@ -14,6 +14,7 @@ package org.sensorhub.impl.datastore.postgis.builder.filter.datastream; +import org.sensorhub.api.common.BigId; import org.sensorhub.api.datastore.FullTextFilter; import org.sensorhub.api.datastore.TemporalFilter; import org.sensorhub.api.datastore.obs.DataStreamFilter; @@ -55,9 +56,18 @@ protected void handleId(long id) { protected void handleOutputNames(SortedSet names) { if (names != null && !names.isEmpty()) { - addCondition("("+tableName+".data->>'outputName') in (" + - names.stream().map(name -> "'" + name + "'").collect(Collectors.joining(",")) + - ")"); + StringBuilder query = new StringBuilder(); + boolean first = true; + String op = ""; + for(String name: names) { + if(first) { + first = false; + }else { + op = " OR "; + } + query.append(op).append(tableName).append(".data @> '{\"outputName\": \"").append(name).append("\"}'"); + } + addCondition(query.toString()); } } @@ -113,9 +123,13 @@ protected void handleSystemFilter(SystemFilter systemFilter) { if(uid.contains("*")) { operator = "ILIKE"; currentId = uid.replaceAll("\\*","%"); - } - sb.append("(").append(tableName).append(".data->'system@id'->>'uniqueID') "+operator+" '").append(currentId).append("'"); + // USE pgtrim index? + sb.append("(").append(tableName).append(".data->'system@id'->>'uniqueID') "+operator+" '").append(currentId).append("'"); + } else { + // USE GIN index + sb.append("(").append(tableName).append(".data @> '{\"system@id\": {\"uniqueID\": \"").append(currentId).append("\"").append("}}'").append(")"); + } if(++i < uniqueIds.size()) { sb.append(" OR "); } @@ -126,10 +140,18 @@ protected void handleSystemFilter(SystemFilter systemFilter) { // handle internal IDS if (systemFilter.getInternalIDs() != null && !systemFilter.getInternalIDs().isEmpty()) { - String sb = "(" + tableName + ".data->'system@id'->'internalID'->'id')::bigint in (" + - systemFilter.getInternalIDs().stream().map(bigId -> String.valueOf(bigId.getIdAsLong())).collect(Collectors.joining(",")) + - ")"; - addCondition(sb); + StringBuilder query = new StringBuilder(); + boolean first = true; + String op = ""; + for(BigId sysId: systemFilter.getInternalIDs()) { + if(first) { + first = false; + }else { + op = " OR "; + } + query.append(op).append(tableName).append(".data @> '{\"system@id\": {\"internalID\": {\"id\": ").append(sysId.getIdAsLong()).append("}}}'"); + } + addCondition(query.toString()); } } if (systemFilter.getParentFilter() != null || systemFilter.getProcedureFilter() != null @@ -150,7 +172,7 @@ protected void handleObsFilter(ObsFilter obsFilter) { protected void handleObservedPropertiesFilter(SortedSet properties) { if(properties != null) { StringBuilder sb = new StringBuilder(); - sb.append("jsonb_path_exists(").append(tableName).append(".data, '$.** ? ("); + sb.append(tableName).append(".data @? '$.** ? ("); boolean first=true; for(String property : properties) { if(!first) { @@ -160,7 +182,7 @@ protected void handleObservedPropertiesFilter(SortedSet properties) { sb.append("@ == \"").append(property).append("\""); first = false; } - sb.append(")')"); + sb.append(")'"); addCondition(sb.toString()); } } @@ -177,6 +199,5 @@ public static boolean hasOnlyInternalIds(DataStreamFilter dataStreamFilter) { dataStreamFilter.getOutputNames() == null && dataStreamFilter.getObservedProperties() == null && (dataStreamFilter.getInternalIDs() != null && !dataStreamFilter.getInternalIDs().isEmpty())); - } } diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/datastream/RemoveDataStreamFilterQuery.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/datastream/RemoveDataStreamFilterQuery.java index 1958ddee7..44cd6b600 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/datastream/RemoveDataStreamFilterQuery.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/datastream/RemoveDataStreamFilterQuery.java @@ -34,11 +34,11 @@ protected void handleValidTimeFilter(TemporalFilter temporalFilter) { String min = PostgisUtils.checkAndGetValidInstant(temporalFilter.getMin()); String max = PostgisUtils.checkAndGetValidInstant(temporalFilter.getMax()); - String sb = "tsrange((" + + String sb = "tsrange(parse_utc_timestamp(" + tableName + - ".data->'validTime'->>'begin')::timestamp,(" + + ".data->'validTime'->>'begin'),parse_utc_timestamp(" + tableName + - ".data->'validTime'->>'end')::timestamp)" + + ".data->'validTime'->>'end'))" + " "+PostgisUtils.getOperator(temporalFilter)+" " + "'[" + min + "," + max + "]'::tsrange"; addCondition(sb); diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/datastream/SelectDataStreamFilterQuery.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/datastream/SelectDataStreamFilterQuery.java index 0ca89566c..f404e58f6 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/datastream/SelectDataStreamFilterQuery.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/datastream/SelectDataStreamFilterQuery.java @@ -38,11 +38,11 @@ protected void handleValidTimeFilter(TemporalFilter temporalFilter) { String min = PostgisUtils.checkAndGetValidInstant(temporalFilter.getMin()); String max = PostgisUtils.checkAndGetValidInstant(temporalFilter.getMax()); - String sb = "tsrange((" + + String sb = "tsrange(parse_utc_timestamp(" + tableName + - ".data->'validTime'->>'begin')::timestamp,(" + + ".data->'validTime'->>'begin'),parse_utc_timestamp(" + tableName + - ".data->'validTime'->>'end')::timestamp)" + + ".data->'validTime'->>'end'))" + " "+PostgisUtils.getOperator(temporalFilter)+" " + "'[" + min + "," + max + "]'::tsrange"; addCondition(sb); diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/feature/BaseFeatureFilterQuery.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/feature/BaseFeatureFilterQuery.java index a594c9ae8..72c210431 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/feature/BaseFeatureFilterQuery.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/feature/BaseFeatureFilterQuery.java @@ -28,6 +28,8 @@ import org.sensorhub.impl.datastore.postgis.utils.PostgisUtils; import org.vast.ogc.gml.IFeature; +import java.util.ArrayList; +import java.util.List; import java.util.SortedSet; import java.util.stream.Collectors; @@ -65,13 +67,15 @@ public void handleValidTimeFilter(TemporalFilter temporalFilter, String rangeOpS protected void handleFullTextFilter(FullTextFilter fullTextFilter) { if (fullTextFilter != null) { - // can use directly ~* for fast lookup - // https://www.postgresql.org/docs/current/pgtrgm.html if(fullTextFilter.getKeywords() != null) { - String sb = "(" + tableName + ".data->'properties'->>'description') ~* '(" + - fullTextFilter.getKeywords().stream().collect(Collectors.joining("|")) + - ")'"; - addCondition(sb); + List sqlKeywords = fullTextFilter.getKeywords() + .stream() + .map(k -> { + String start = (k.startsWith("*") ? "" : "%"); + String end = (k.endsWith("*") ? "" : "%");; + return tableName + ".data::text ILIKE '" + start + k.replaceAll("\\*", "%") + end + "'"; + }).toList(); + addCondition(" ( "+sqlKeywords.stream().collect(Collectors.joining(" OR "))+" ) "); } } } diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/obs/SelectObsFilterQuery.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/obs/SelectObsFilterQuery.java index 94a28c7d8..831b237be 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/obs/SelectObsFilterQuery.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/filter/obs/SelectObsFilterQuery.java @@ -82,13 +82,15 @@ protected void handlePhenomenonTimeFilter(TemporalFilter temporalFilter) { filterQueryGenerator.addOrderBy(this.tableName + ".datastreamid"); filterQueryGenerator.addOrderBy(this.tableName + ".phenomenonTime DESC "); } else { + String min = PostgisUtils.checkAndGetValidInstant(temporalFilter.getMin()); + String max = PostgisUtils.checkAndGetValidInstant(temporalFilter.getMax()); String lowerBoundEquals = temporalFilter.getRange().lowerBoundType().equals(BoundType.OPEN) ? "":"="; String upperBoundEquals = temporalFilter.getRange().upperBoundType().equals(BoundType.OPEN) ? "":"="; addCondition( - this.tableName+".phenomenonTime >"+lowerBoundEquals+" '"+temporalFilter.getMin()+"'" + this.tableName+".phenomenonTime >"+lowerBoundEquals+" '"+min+"'" ); addCondition( - this.tableName+".phenomenonTime <"+upperBoundEquals+" '"+temporalFilter.getMax()+"'" + this.tableName+".phenomenonTime <"+upperBoundEquals+" '"+max+"'" ); } } @@ -101,18 +103,15 @@ protected void handleResultTimeFilter(TemporalFilter temporalFilter) { filterQueryGenerator.addOrderBy(this.tableName + ".datastreamid"); filterQueryGenerator.addOrderBy(this.tableName + ".phenomenonTime DESC "); } else { -// String min = PostgisUtils.checkAndGetValidInstant(temporalFilter.getMin()); -// String max = PostgisUtils.checkAndGetValidInstant(temporalFilter.getMax()); -// addCondition( -// "tsrange('"+min+"','"+max+"', '[]') @> "+this.tableName+".resultTime"); -// } + String min = PostgisUtils.checkAndGetValidInstant(temporalFilter.getMin()); + String max = PostgisUtils.checkAndGetValidInstant(temporalFilter.getMax()); String lowerBoundEquals = temporalFilter.getRange().lowerBoundType().equals(BoundType.OPEN) ? "":"="; String upperBoundEquals = temporalFilter.getRange().upperBoundType().equals(BoundType.OPEN) ? "":"="; addCondition( - this.tableName + ".resultTime >"+lowerBoundEquals+" '" + temporalFilter.getMin() + "'" + this.tableName + ".resultTime >"+lowerBoundEquals+" '" + min+ "'" ); addCondition( - this.tableName + ".resultTime <"+upperBoundEquals+" '" + temporalFilter.getMax() + "'" + this.tableName + ".resultTime <"+upperBoundEquals+" '" + max + "'" ); } } diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/generator/FilterQueryGenerator.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/generator/FilterQueryGenerator.java index ea485abb1..325d62ea4 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/generator/FilterQueryGenerator.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/generator/FilterQueryGenerator.java @@ -7,6 +7,7 @@ public abstract class FilterQueryGenerator { protected List addConditions; protected List orConditions; protected long limit = -1; + protected long offset = -1; protected String tableName; protected List selectFields; @@ -19,6 +20,10 @@ public void setLimit(long limit){ this.limit = limit; } + public void setOffset(long offset){ + this.offset = offset; + } + protected void checkAddConditions() { if (this.addConditions == null) { this.addConditions = new ArrayList<>(); diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/generator/SelectFilterQueryGenerator.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/generator/SelectFilterQueryGenerator.java index ba957cf30..71f8a9143 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/generator/SelectFilterQueryGenerator.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/generator/SelectFilterQueryGenerator.java @@ -56,8 +56,11 @@ public String toQuery() { if (this.orConditions != null && !this.orConditions.isEmpty()) { if (addConditions == null || addConditions.isEmpty()) { sb.append(" WHERE "); + } else { + sb.append(" AND "); } - sb.append(this.orConditions.stream().collect(Collectors.joining(" OR "))); + + sb.append(" ( ").append(this.orConditions.stream().collect(Collectors.joining(" OR "))).append(" ) "); } if (this.groupBy != null && !this.groupBy.isEmpty()) { sb.append(" GROUP BY "); @@ -68,6 +71,9 @@ public String toQuery() { sb.append(" ORDER BY "); sb.append(this.orderBy.stream().collect(Collectors.joining(", "))); } +// if (this.offset >= 0) { +// sb.append(" OFFSET ").append(this.offset); +// } if (this.limit >= 0) { sb.append(" LIMIT ").append(this.limit); } diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/query/SelectEntriesQuery.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/query/SelectEntriesQuery.java index ee2914689..4796ff8e3 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/query/SelectEntriesQuery.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/builder/query/SelectEntriesQuery.java @@ -32,6 +32,13 @@ public T withLimit(long limit) { return self(); } + public T withOffset(long offset) { + if(offset != 0) { + filterQueryGenerator.setOffset(offset); + } + return self(); + } + public SelectEntriesQuery build() { return new SelectEntriesQuery(this); } diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/PostgisStore.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/PostgisStore.java index fa87cf75b..da3084970 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/PostgisStore.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/PostgisStore.java @@ -72,6 +72,8 @@ protected void init(String url, String dbName, String login, String password, St } private void initStore(String[] initScripts) { + logger.info("Executing init queries.."); + logger.error(String.join(";", initScripts)); try (Connection connection = this.connectionManager.getConnection()) { // if (!PostgisUtils.checkTable(connection, queryBuilder.getStoreTableName())) { // create table @@ -82,7 +84,6 @@ private void initStore(String[] initScripts) { } catch (SQLException e) { throw new RuntimeException(e); } - try { this.initIdProvider(); } catch (DataStoreException e) { diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/command/PostgisCommandStatusStore.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/command/PostgisCommandStatusStore.java index 7391e94e2..b01c0e81f 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/command/PostgisCommandStatusStore.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/command/PostgisCommandStatusStore.java @@ -83,6 +83,7 @@ public Stream> selectEntries(CommandStatusFilter fi queryStr, connectionManager, filter.getLimit(), + 0, (resultSet) -> resultSetToEntry(resultSet, fields), (entry) -> (filter.getValuePredicate() == null || filter.getValuePredicate().test(entry.getValue()))); return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorResultSet, Spliterator.ORDERED), false); diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/command/PostgisCommandStoreImpl.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/command/PostgisCommandStoreImpl.java index 3a8b7d9d2..b136aadc5 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/command/PostgisCommandStoreImpl.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/command/PostgisCommandStoreImpl.java @@ -87,6 +87,7 @@ public Stream> selectEntries(CommandFilter filter, Se queryStr, connectionManager, filter.getLimit(), + 0, (resultSet) -> resultSetToEntry(resultSet, fields), (entry) -> (filter.getValuePredicate() == null || filter.getValuePredicate().test(entry.getValue()))); return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorResultSet, Spliterator.ORDERED), false); diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/feature/PostgisBaseFeatureStoreImpl.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/feature/PostgisBaseFeatureStoreImpl.java index 1fc5d3cd2..9cae0bc3b 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/feature/PostgisBaseFeatureStoreImpl.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/feature/PostgisBaseFeatureStoreImpl.java @@ -80,12 +80,14 @@ protected PostgisBaseFeatureStoreImpl(String url, String dbName, String login, S super(idScope, dsIdProviderType, queryBuilder, useBatch); this.init(url, dbName, login, password, new String[]{ queryBuilder.createTableQuery(), - queryBuilder.createUidUniqueIndexQuery(), + queryBuilder.createUidUniqueIndexQuery(), // needed for INSERT ..or..UPDATE queryBuilder.createValidTimeIndexQuery(), + queryBuilder.createLowerValidTimeIndexQuery(), queryBuilder.createIdIndexQuery(), queryBuilder.createTrigramExtensionQuery(), queryBuilder.createTrigramDescriptionFullTextIndexQuery(), - queryBuilder.createTrigramUidFullTextIndexQuery() + queryBuilder.createSpatialIndex(), + queryBuilder.createParentIdIndex() } ); } @@ -205,7 +207,7 @@ private String fillAddOrUpdateStatement(FeatureKey featureKey, BigId parentID, V } public Stream> selectEntries(F filter, Set fields) { - String queryStr = queryBuilder.createSelectEntriesQuery(filter, fields); + String queryStr = queryBuilder.createSelectEntriesQuery(filter, fields); // if(logger.isDebugEnabled()) { // logger.debug(queryStr); // } @@ -214,6 +216,7 @@ public Stream> selectEntries(F filter, Set fields) { queryStr, connectionManager, STREAM_FETCH_SIZE, + 0, (resultSet) -> resultSetToEntry(resultSet, fields), (entry) -> (filter.getValuePredicate() == null || filter.getValuePredicate().test(entry.getValue()))); return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorResultSet, Spliterator.ORDERED), false); diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/obs/PostgisBatchObsStoreImpl.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/obs/PostgisBatchObsStoreImpl.java index b6c8d8329..d2db5ce56 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/obs/PostgisBatchObsStoreImpl.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/obs/PostgisBatchObsStoreImpl.java @@ -10,7 +10,7 @@ public class PostgisBatchObsStoreImpl extends PostgisObsStoreImpl { private static final Logger logger = LoggerFactory.getLogger(PostgisBatchObsStoreImpl.class); - public static final int BATCH_SIZE = 10000; + public static final int BATCH_SIZE = 1000; public PostgisBatchObsStoreImpl(String url, String dbName, String login, String password, int idScope, IdProviderType dsIdProviderType) { this(url,dbName,login,password,DEFAULT_TABLE_NAME,idScope,dsIdProviderType); diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/obs/PostgisDataStreamStoreImpl.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/obs/PostgisDataStreamStoreImpl.java index 76fbd7755..492530e97 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/obs/PostgisDataStreamStoreImpl.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/obs/PostgisDataStreamStoreImpl.java @@ -72,8 +72,8 @@ protected void init(PostgisObsStoreImpl obsStore, String url, String dbName, Str queryBuilder.createTableQuery(), queryBuilder.createIndexQuery(), queryBuilder.createUniqueIndexQuery(), - queryBuilder.createValidTimeBeginIndexQuery(), - queryBuilder.createValidTimeEndIndexQuery(), + queryBuilder.createImmutubleFunctionForValidTime(), + queryBuilder.createValidTimeIndexQuery(), queryBuilder.createTrigramExtensionQuery(), queryBuilder.createTrigramDescriptionFullTextIndexQuery() }); @@ -151,11 +151,11 @@ public void setValidTime(TimeExtent validTime) { @Override public Stream> selectEntries(DataStreamFilter filter, Set fields) { + long st = System.currentTimeMillis(); // build request String queryStr = queryBuilder.createSelectEntriesQuery(filter, fields); - + logger.info(queryStr); Map dataStreamMap = new HashMap<>(); - try (Connection connection = this.connectionManager.getConnection()) { try (Statement statement = connection.createStatement()) { try (ResultSet resultSet = statement.executeQuery(queryStr)) { @@ -172,26 +172,22 @@ public Stream> selectEntries(DataStreamFil } catch (SQLException e) { throw new RuntimeException(e); } - - // Fetch phenomenon and result time ranges so that we don't fetch them when accessing IDataStreamInfo List dsIds = new ArrayList<>(dataStreamMap.keySet()); - Map phenomenonTimeRanges = obsStore.getDataStreamPhenomenonTimeRanges(dsIds); - Map resultTimeRanges = obsStore.getDataStreamResultTimeRanges(dsIds); - List> results = new ArrayList<>(); for (Map.Entry entry : dataStreamMap.entrySet()) { - long id = entry.getKey(); + long dsId = entry.getKey(); + TimeExtent phenomenonTimeRange = obsStore.getDataStreamPhenomenonTimeRange(dsId); + TimeExtent resultTimeRange = obsStore.getDataStreamResultTimeRange(dsId); IDataStreamInfo dsInfo = entry.getValue(); - DataStreamInfoWithTimeRanges wrapper = new DataStreamInfoWithTimeRanges(id, dsInfo); - wrapper.setPhenomenonTimeRange(phenomenonTimeRanges.get(id)); - wrapper.setResultTimeRange(resultTimeRanges.get(id)); + DataStreamInfoWithTimeRanges wrapper = new DataStreamInfoWithTimeRanges(dsId, dsInfo); + wrapper.setPhenomenonTimeRange(phenomenonTimeRange); + wrapper.setResultTimeRange(resultTimeRange); - results.add(Map.entry(new DataStreamKey(obsStore.idScope, id), wrapper)); + results.add(Map.entry(new DataStreamKey(obsStore.idScope, dsId), wrapper)); } - - logger.debug("{}, {}",queryStr, results.size()); + logger.debug("{}, {} in {}ms ",queryStr, results.size(), (System.currentTimeMillis()-st)); return results.stream(); } diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/obs/PostgisObsStoreImpl.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/obs/PostgisObsStoreImpl.java index fd254e6c2..97e544bec 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/obs/PostgisObsStoreImpl.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/store/obs/PostgisObsStoreImpl.java @@ -16,7 +16,6 @@ import com.google.common.collect.Range; import net.opengis.swe.v20.DataBlock; import org.apache.commons.text.StringSubstitutor; -import org.postgresql.util.PGobject; import org.sensorhub.api.common.BigId; import org.sensorhub.api.data.IDataStreamInfo; import org.sensorhub.api.data.IObsData; @@ -26,7 +25,6 @@ import org.sensorhub.api.datastore.system.ISystemDescStore; import org.sensorhub.impl.datastore.postgis.IdProviderType; import org.sensorhub.impl.datastore.postgis.builder.IteratorResultSet; -import org.sensorhub.impl.datastore.postgis.builder.ObsIteratorResultSet; import org.sensorhub.impl.datastore.postgis.builder.QueryBuilderObsStore; import org.sensorhub.impl.datastore.postgis.store.PostgisStore; import org.sensorhub.impl.datastore.postgis.utils.PostgisUtils; @@ -44,6 +42,7 @@ import java.time.temporal.ChronoField; import java.util.*; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -77,6 +76,7 @@ public PostgisObsStoreImpl(String url, String dbName, String login, String passw // queryBuilder.createDataIndexQuery(), queryBuilder.createDataStreamIndexQuery(), queryBuilder.createPhenomenonTimeIndexQuery(), + queryBuilder.createPhenomenonTimeSimpleIndexQuery(), queryBuilder.createResultTimeIndexQuery(), queryBuilder.createFoiIndexQuery(), queryBuilder.createUniqueConstraint() @@ -108,17 +108,16 @@ public Stream> selectEntries(ObsFilter filter, Set> iteratorResultSet = - new ObsIteratorResultSet<>( + IteratorResultSet> iteratorResultSet = + new IteratorResultSet<>( queryStr, - queryBuilder.getStoreTableName(), connectionManager, - STREAM_FETCH_SIZE, filter.getLimit(), - (resultSet) -> resultSetToEntry(resultSet, fields), - (entry) -> (filter.getValuePredicate() == null || filter.getValuePredicate().test(entry.getValue())), - filter); + 0, + (ResultSet resultSet) -> resultSetToEntry(resultSet, fields), + (entry) -> (filter.getValuePredicate() == null || filter.getValuePredicate().test(entry.getValue()))); return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorResultSet, Spliterator.ORDERED), false); + } private Entry resultSetToEntry(ResultSet resultSet, Set fields) { @@ -451,14 +450,19 @@ public Map getDataStreamPhenomenonTimeRanges(List dataSt if (dataStreamIds == null || dataStreamIds.isEmpty()) return result; - String sql = queryBuilder.getPhenomenonTimeRangeByDataStreamIdsQuery(); + String strIds = dataStreamIds.stream() + .map(id -> "'" + id + "'") + .collect(Collectors.joining(",")); + + String sql = queryBuilder.getPhenomenonTimeRangeByDataStreamIdsQuery(strIds); try (Connection conn = connectionManager.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { - Array sqlArray = conn.createArrayOf("bigint", dataStreamIds.toArray()); - ps.setArray(1, sqlArray); +// Array sqlArray = conn.createArrayOf("bigint", dataStreamIds.toArray()); +// ps.setArray(1, sqlArray); + System.out.println(ps.toString()); try (ResultSet rs = ps.executeQuery()) { while (rs.next()) { long id = rs.getLong("datastreamID"); @@ -518,13 +522,16 @@ public Map getDataStreamResultTimeRanges(List dataStream if (dataStreamIds == null || dataStreamIds.isEmpty()) return result; - String sql = queryBuilder.getResultTimeRangeByDataStreamIdsQuery(); + String strIds = dataStreamIds.stream() + .map(id -> "'" + id + "'") + .collect(Collectors.joining(",")); + String sql = queryBuilder.getResultTimeRangeByDataStreamIdsQuery(strIds); try (Connection conn = connectionManager.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { - Array sqlArray = conn.createArrayOf("bigint", dataStreamIds.toArray()); - ps.setArray(1, sqlArray); +// Array sqlArray = conn.createArrayOf("bigint", dataStreamIds.toArray()); +// ps.setArray(1, sqlArray); try (ResultSet rs = ps.executeQuery()) { while (rs.next()) { diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/utils/PostgisStreamer.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/utils/PostgisStreamer.java deleted file mode 100644 index 37971e1f2..000000000 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/utils/PostgisStreamer.java +++ /dev/null @@ -1,86 +0,0 @@ -package org.sensorhub.impl.datastore.postgis.utils; - -import java.sql.*; -import java.util.function.Function; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; -import java.util.Spliterator; -import java.util.Spliterators; - -public class PostgisStreamer { - - /** - * Executes a SQL query on a given Statement and returns a Stream. - * - * @param statement An already created Statement (must belong to a Connection with autoCommit=false) - * @param query The full SQL query (SELECT ...) - * @param fetchSize The number of rows to prefetch on the server side - * @param mapper A function that converts each row of the ResultSet into a T - * @param The type of the returned objects - * @return Stream - */ - public static Stream streamFromStatement( - Statement statement, - String query, - int fetchSize, - Function mapper - ) { - try { - statement.setFetchSize(fetchSize); - ResultSet rs = statement.executeQuery(query); - - Stream stream = StreamSupport.stream( - Spliterators.spliteratorUnknownSize(new IteratorFromResultSet<>(rs, mapper), Spliterator.ORDERED), - false - ); - - // close the resultSet and the statement when the Stream closes - return stream.onClose(() -> { - try { - rs.close(); - statement.close(); - } catch (SQLException e) { - e.printStackTrace(); - } - }); - } catch (SQLException e) { - throw new RuntimeException("Error executing query", e); - } - } - - private static class IteratorFromResultSet implements java.util.Iterator { - private final ResultSet rs; - private final Function mapper; - private boolean hasNextChecked = false; - private boolean hasNext = false; - - IteratorFromResultSet(ResultSet rs, Function mapper) { - this.rs = rs; - this.mapper = mapper; - } - - @Override - public boolean hasNext() { - if (hasNextChecked) return hasNext; - try { - hasNext = rs.next(); - hasNextChecked = true; - return hasNext; - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - @Override - public T next() { - if (!hasNextChecked && !hasNext()) throw new java.util.NoSuchElementException(); - hasNextChecked = false; - try { - return mapper.apply(rs); - } catch (Exception e) { - throw new RuntimeException("Error mapping row", e); - } - } - } -} - diff --git a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/utils/PostgisUtils.java b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/utils/PostgisUtils.java index 092203503..2c90fe27d 100644 --- a/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/utils/PostgisUtils.java +++ b/persistence/sensorhub-datastore-postgis/src/main/java/org/sensorhub/impl/datastore/postgis/utils/PostgisUtils.java @@ -218,7 +218,7 @@ public static String checkAndGetValidInstant(Instant instant) { } else if(instant.getEpochSecond() > PostgisUtils.MAX_INSTANT.getEpochSecond()) { return "infinity"; } else { - return instant.truncatedTo(ChronoUnit.SECONDS).toString(); + return instant.toString(); } }