Skip to content
Merged
Show file tree
Hide file tree
Changes from 139 commits
Commits
Show all changes
154 commits
Select commit Hold shift + click to select a range
cb6f22f
Reduce remote storage by checking local retention
jiafu1115 Nov 18, 2025
4f96275
correct the configure name to keep same style with existed configures…
jiafu1115 Nov 19, 2025
c410ae3
fix the unit test for null point protect
jiafu1115 Nov 19, 2025
b765f33
Merge branch 'apache:trunk' into storage
jiafu1115 Jan 5, 2026
c4a77b9
correct the desc according to kip's review
jiafu1115 Jan 7, 2026
1b0705e
Clarify remote log latest enable documentation
jiafu1115 Jan 7, 2026
2cb87c7
change the naming according to KIP review
jiafu1115 Jan 29, 2026
aefdac1
add broker level configure according to KIP review
jiafu1115 Jan 29, 2026
3c5131b
fix unit test
jiafu1115 Jan 29, 2026
44d74ac
improve the code
jiafu1115 Jan 29, 2026
791881e
remove duplicated code for check
jiafu1115 Jan 30, 2026
31ecadb
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
f2b2464
Merge remote-tracking branch 'origin/storage' into storage
jiafu1115 Feb 5, 2026
5b059e1
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
4c108d3
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
05392d5
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
60060e2
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
997ecbd
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
3398d49
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
2ae53dd
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
7ef0a9e
KAFKA-19893: refactor the name and fix unit test
jiafu1115 Feb 6, 2026
b87f0af
KAFKA-19893: correct the java doc
jiafu1115 Feb 6, 2026
f54c033
KAFKA-19893: add protected for the configures
jiafu1115 Feb 6, 2026
ef1ee28
KAFKA-19893: correct the java doc
jiafu1115 Feb 6, 2026
1658a70
KAFKA-19893: correct the java doc
jiafu1115 Feb 6, 2026
d99d7de
fix compile issue
jiafu1115 Feb 6, 2026
a5b6733
simple the code
jiafu1115 Feb 6, 2026
8b35773
Revert "simple the code"
jiafu1115 Feb 6, 2026
9ea9ac2
Don't change other codes
jiafu1115 Feb 7, 2026
7eefc4c
Keep the same style
jiafu1115 Feb 7, 2026
f18b95a
Merge branch 'apache:trunk' into storage
jiafu1115 Feb 7, 2026
f0adf67
code refactor
jiafu1115 Feb 7, 2026
3d14856
Refactor remote copy lag validation methods
jiafu1115 Feb 7, 2026
b6c53b6
define the constants value
jiafu1115 Feb 8, 2026
e19efef
Update comments for MAX_REMOTE_COPY_LAG constants
jiafu1115 Feb 8, 2026
dea59a3
refactor
jiafu1115 Feb 8, 2026
b2d2079
Code format
jiafu1115 Feb 8, 2026
cf80f85
correct the total size calculate
jiafu1115 Feb 8, 2026
a747a1a
refactor
jiafu1115 Feb 8, 2026
a392516
Merge branch 'apache:trunk' into storage
jiafu1115 Feb 10, 2026
71cdaf9
Clarify segment upload eligibility in documentation
jiafu1115 Feb 10, 2026
b5ad798
Update documentation for REMOTE_COPY_LAG_MS_CONFIG
jiafu1115 Feb 10, 2026
eaa2e40
Update documentation for remote copy lag configurations
jiafu1115 Feb 10, 2026
4ffe31f
Improve the log
jiafu1115 Feb 13, 2026
7ee6b12
Update retention checks to allow zero values
jiafu1115 Feb 13, 2026
b3b1d13
Fix comparison for remote copy lag checks
jiafu1115 Feb 13, 2026
a6bc552
Fix conditions for copy lag checks in RemoteLogManager
jiafu1115 Feb 13, 2026
1761b0a
refactor document
jiafu1115 Feb 13, 2026
31478c5
refactor document
jiafu1115 Feb 13, 2026
d493ba0
refactor document
jiafu1115 Feb 13, 2026
f994e15
correct the document
jiafu1115 Feb 13, 2026
21200a0
correct the document
jiafu1115 Feb 13, 2026
7aa8e44
correct the document again
jiafu1115 Feb 13, 2026
32ca138
Fix documentation for remote copy lag configs
jiafu1115 Feb 13, 2026
ee9e6ca
Update comments for MAX_REMOTE_COPY_LAG constants
jiafu1115 Feb 13, 2026
95039d4
Fix typos in TopicConfig documentation
jiafu1115 Feb 13, 2026
fc71a83
Fix documentation for remote log copy lag properties
jiafu1115 Feb 13, 2026
017deec
improve the document and implement
jiafu1115 Feb 13, 2026
5b223b1
Update LogConfig.java
jiafu1115 Feb 13, 2026
f458edd
Update comments for MAX_REMOTE_COPY_LAG constants
jiafu1115 Feb 13, 2026
2087e06
remove useless null protected judgement
jiafu1115 Feb 13, 2026
e550341
Merge branch 'trunk' into storage
jiafu1115 Feb 23, 2026
7574235
correct the judgement for upload
jiafu1115 Feb 23, 2026
f2ffe86
correct the format
jiafu1115 Feb 23, 2026
95d096b
correct the format
jiafu1115 Feb 23, 2026
b5aa4a5
correct the format
jiafu1115 Feb 23, 2026
0c4c7c3
code refactor
jiafu1115 Feb 23, 2026
f9afae2
code refactor
jiafu1115 Feb 23, 2026
1c251f0
correct the code
jiafu1115 Feb 23, 2026
7148ac5
improve the document
jiafu1115 Feb 23, 2026
47a14ab
Merge branch 'apache:trunk' into storage
jiafu1115 Mar 25, 2026
c89f5ed
Merge branch 'apache:trunk' into storage
jiafu1115 Mar 28, 2026
cf52590
refactor the code using new definition for delay configure
jiafu1115 Mar 31, 2026
7bbed2e
Correct multiple log-related imports to RemoteLogManager
jiafu1115 Mar 31, 2026
7a4ef68
Update copy lag condition checks in RemoteLogManager
jiafu1115 Mar 31, 2026
fa34452
Refactor copy lag checks in RemoteLogManager
jiafu1115 Mar 31, 2026
8f2c427
Fix the check style issue
jiafu1115 Mar 31, 2026
ab81e29
correct the configure item's default value for delay upload
jiafu1115 Apr 1, 2026
2b5fede
refactor the method position
jiafu1115 Apr 1, 2026
4050a5b
Refine documentation for retention configuration options
jiafu1115 Apr 1, 2026
a1adb06
Update RemoteLogManagerConfig.java
jiafu1115 Apr 1, 2026
f2a1949
add more validation
jiafu1115 Apr 1, 2026
4dfa05b
Merge remote-tracking branch 'origin/storage' into storage
jiafu1115 Apr 1, 2026
4dbd1de
Fix return value in copy lag condition
jiafu1115 Apr 1, 2026
5573462
Fix documentation typos in RemoteLogManagerConfig
jiafu1115 Apr 1, 2026
a5dbf91
Fix documentation for remote copy lag configuration
jiafu1115 Apr 1, 2026
3ac973d
Fix documentation for LOG_REMOTE_COPY_LAG_MS_PROP
jiafu1115 Apr 1, 2026
075ea73
add more description
jiafu1115 Apr 1, 2026
6aa8c91
improve the documents
jiafu1115 Apr 1, 2026
7fe96a4
fix code issue which cause unit test failed
jiafu1115 Apr 2, 2026
571280a
Merge branch 'apache:trunk' into storage
jiafu1115 Apr 2, 2026
bfc7530
remove example line
jiafu1115 Apr 2, 2026
a06de2b
add more debug log
jiafu1115 Apr 3, 2026
9115afd
refactor the basic log
jiafu1115 Apr 3, 2026
a23d2ea
Refactor copy lag checks in RemoteLogManager
jiafu1115 Apr 3, 2026
3d966a1
Add log configure unit test
jiafu1115 Apr 3, 2026
fba4cea
Add log configure unit test
jiafu1115 Apr 3, 2026
0a7038b
Add log configure unit test
jiafu1115 Apr 3, 2026
f377844
Fix one bug and add more tests
jiafu1115 Apr 3, 2026
02819b2
Using real constants instead of magic number
jiafu1115 Apr 4, 2026
3d7c5c9
Add more unit tests
jiafu1115 Apr 4, 2026
79d9587
improve the unit test
jiafu1115 Apr 4, 2026
d85551e
Fix typos in TopicConfig documentation
jiafu1115 May 5, 2026
bc5b395
Fix typos in RemoteLogManagerConfig documentation
jiafu1115 May 5, 2026
eec8220
Merge branch 'trunk' into storage
jiafu1115 May 5, 2026
1122220
judge logger.isDebugEnabled() before print debug
jiafu1115 May 5, 2026
2fa65e7
change debug to trace
jiafu1115 May 5, 2026
e1a9c99
improve the code for config according to code review
jiafu1115 May 5, 2026
6d481c5
add more validation configure
jiafu1115 May 7, 2026
b302dac
add document for Constraint
jiafu1115 May 7, 2026
c32d37b
Merge branch 'apache:trunk' into storage
jiafu1115 May 7, 2026
31c686a
update document
jiafu1115 May 7, 2026
8036c81
remove the logic for -1 and -2
jiafu1115 May 7, 2026
abd0563
change to 0
jiafu1115 May 7, 2026
33944d4
remove useless test
jiafu1115 May 7, 2026
ec8ff4f
correct the unit test
jiafu1115 May 7, 2026
06a93df
correct the unit test
jiafu1115 May 7, 2026
a20e4d2
correct the unit test
jiafu1115 May 7, 2026
3cd8b26
correct the unit test
jiafu1115 May 7, 2026
8a0a031
correct the implement
jiafu1115 May 7, 2026
f26f152
remove the logic for -1 and -2
jiafu1115 May 7, 2026
5a68341
Merge remote-tracking branch 'origin/storageV2' into storageV2
jiafu1115 May 7, 2026
3cb22a1
Merge pull request #3 from jiafu1115/storageV2
jiafu1115 May 7, 2026
d8b0c67
Fix condition order for copy lag checks
jiafu1115 May 7, 2026
ed9fc99
fix compile issue
jiafu1115 May 7, 2026
078db33
Fix the unit test
jiafu1115 May 7, 2026
2fa0c23
Add more test
jiafu1115 May 7, 2026
1cf1c3b
add more document for the code
jiafu1115 May 7, 2026
9b5e899
improve the document
jiafu1115 May 7, 2026
f25f002
improve the document
jiafu1115 May 7, 2026
6b626f6
improve the unit test
jiafu1115 May 7, 2026
88dbcc7
rollback part of the code of -1
jiafu1115 May 8, 2026
cf7e873
rollback part of the code of -1
May 8, 2026
d494963
improve the code according the code review
jiafu1115 May 17, 2026
4311f2c
Merge branch 'apache:trunk' into storage
jiafu1115 May 17, 2026
1855a55
refactor the code
jiafu1115 May 17, 2026
eef02ef
Merge remote-tracking branch 'origin/storage' into storage
jiafu1115 May 17, 2026
41a47dd
refactor the code
jiafu1115 May 17, 2026
69c9c9b
remove the useless unit test
jiafu1115 May 18, 2026
5880721
refactor the unit test: split to dedicated test class
jiafu1115 May 18, 2026
8f23a7c
remove the unrelated code change.
jiafu1115 May 18, 2026
6d218ff
simpler the code
May 18, 2026
d478387
refactor the code
jiafu1115 May 18, 2026
134337d
add one unit test for both lag and retention is -1
jiafu1115 May 18, 2026
499cb59
reduce the code complex
jiafu1115 May 18, 2026
cbf725c
continue to reduce the code complex
jiafu1115 May 18, 2026
40d91a7
continue to reduce the code complex
jiafu1115 May 18, 2026
20ca059
continue to reduce the code complex
jiafu1115 May 18, 2026
8f4cdd7
correct the unit test
jiafu1115 May 19, 2026
3f0764f
remove the document for -1
jiafu1115 May 19, 2026
a8ad430
simple the code according to the code review suggestion
jiafu1115 May 21, 2026
367390a
handle the special case for segment age.
jiafu1115 May 21, 2026
dbaa4d6
add more unit test
jiafu1115 May 21, 2026
9cfbc5e
Merge branch 'apache:trunk' into storage
jiafu1115 May 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,24 @@ public class TopicConfig {
"(i.e. local.retention.ms/bytes) becomes irrelevant, and all data expiration follows the topic-wide retention configuration" +
"(i.e. retention.ms/bytes).";

public static final String REMOTE_COPY_LAG_MS_CONFIG = "remote.copy.lag.ms";
public static final String REMOTE_COPY_LAG_MS_DOC = "Controls how long to delay uploading segments to remote storage. " +
"When set to 0, immediate upload without any delay check. " +
"When set to a positive value (ms), a segment can't become eligible for upload until the time since the latest record in the segment reaches the value. " +
"The value should not exceed the real local retention ms except the latter is retained indefinitely (-1). " +
"When set to -1, resolves to the real local retention ms as maximum delay. " +
"If the real local retention ms is configured as infinite, -1 is treated as no delay check on this time configure. " +
"For how the real local retention time is computed, see <code>local.retention.ms</code>.";

public static final String REMOTE_COPY_LAG_BYTES_CONFIG = "remote.copy.lag.bytes";
public static final String REMOTE_COPY_LAG_BYTES_DOC = "Controls size-based delay for uploading segments to remote storage. " +
"When set to 0, immediate upload without any delay check. " +
"When set to a positive value (bytes), a segment can't become eligible for upload until the total bytes of log data after the segment reach the value. " +
"The value should not exceed the real local retention bytes except the latter is retained indefinitely (-1). " +
"When set to -1, resolves to the real local retention bytes as maximum delay. " +
"If the real local retention bytes is configured as infinite, -1 is treated as no delay check on this size configure. " +
"For how the real local retention size is computed, see <code>local.retention.bytes</code>.";

public static final String REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = "remote.log.delete.on.disable";
public static final String REMOTE_LOG_DELETE_ON_DISABLE_DOC = "Determines whether tiered data for a topic should be " +
"deleted after tiered storage is disabled on a topic. This configuration should be enabled when trying to " +
Expand Down
24 changes: 24 additions & 0 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,28 @@ class DynamicLogConfig(logManager: LogManager, directoryEventHandler: DirectoryE
}
}

def validateLogRemoteCopyLagMs(): Unit = {
val logRetentionMs: Long = newConfig.logRetentionTimeMillis
val logLocalRetentionMs = newConfig.remoteLogManagerConfig.logLocalRetentionMs
val effectiveLocalRetentionMs = if (logLocalRetentionMs == -2L) logRetentionMs else logLocalRetentionMs
val logRemoteCopyLagMs = newConfig.remoteLogManagerConfig.logRemoteCopyLagMs
if (logRemoteCopyLagMs > 0L && effectiveLocalRetentionMs >= 0L && logRemoteCopyLagMs > effectiveLocalRetentionMs) {
throw new ConfigException(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP, logRemoteCopyLagMs,
s"Value must not exceed ${RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP} (effective value: $effectiveLocalRetentionMs)")
}
}

def validateLogRemoteCopyLagBytes(): Unit = {
val logRetentionBytes: Long = newConfig.logRetentionBytes
val logLocalRetentionBytes = newConfig.remoteLogManagerConfig.logLocalRetentionBytes
val effectiveLocalRetentionBytes = if (logLocalRetentionBytes == -2L) logRetentionBytes else logLocalRetentionBytes
val logRemoteCopyLagBytes = newConfig.remoteLogManagerConfig.logRemoteCopyLagBytes
if (logRemoteCopyLagBytes > 0L && effectiveLocalRetentionBytes >= 0L && logRemoteCopyLagBytes > effectiveLocalRetentionBytes) {
throw new ConfigException(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP, logRemoteCopyLagBytes,
s"Value must not exceed ${RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP} (effective value: $effectiveLocalRetentionBytes)")
}
}

def validateCordonedLogDirs(): Unit = {
val logDirs = newConfig.logDirs()
val cordonedLogDirs = newConfig.cordonedLogDirs()
Expand All @@ -592,6 +614,8 @@ class DynamicLogConfig(logManager: LogManager, directoryEventHandler: DirectoryE

validateLogLocalRetentionMs()
validateLogLocalRetentionBytes()
validateLogRemoteCopyLagMs()
validateLogRemoteCopyLagBytes()
validateCordonedLogDirs()
}

Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, logMessageTimestampAfterMaxMs: java.lang.Long)
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, remoteLogManagerConfig.logLocalRetentionMs: java.lang.Long)
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, remoteLogManagerConfig.logLocalRetentionBytes: java.lang.Long)
logProps.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, remoteLogManagerConfig.logRemoteCopyLagMs: java.lang.Long)
logProps.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, remoteLogManagerConfig.logRemoteCopyLagBytes: java.lang.Long)
logProps
}
}
61 changes: 61 additions & 0 deletions core/src/test/scala/unit/kafka/log/LogConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class LogConfigTest {
case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-0.1")
case TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0")
case TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0")
case TopicConfig.REMOTE_COPY_LAG_MS_CONFIG => assertPropertyInvalid(name, "not_a_number", "-2")
case TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG => assertPropertyInvalid(name, "not_a_number", "-2")
case TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG => assertPropertyInvalid(name, "not_a_boolean")
case LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG => // no op

Expand Down Expand Up @@ -258,6 +260,65 @@ class LogConfigTest {
doTestInvalidLocalLogRetentionProps(2000L, -1, 100, 1000L)
}

@Test
def testInvalidRemoteCopyLagMsWhenGreaterThanEffectiveLocalRetentionMs(): Unit = {
val props = new util.HashMap[String, String]()
props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
props.put(TopicConfig.RETENTION_MS_CONFIG, "1000")
props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2")
props.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, "1001")

val exception = assertThrows(classOf[ConfigException], () => validateTopicLogConfig(props))
assertTrue(exception.getMessage.contains(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG))
}

@Test
def testInvalidRemoteCopyLagBytesWhenGreaterThanEffectiveLocalRetentionBytes(): Unit = {
val props = new util.HashMap[String, String]()
props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
props.put(TopicConfig.RETENTION_BYTES_CONFIG, "1000")
props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2")
props.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, "1001")

val exception = assertThrows(classOf[ConfigException], () => validateTopicLogConfig(props))
assertTrue(exception.getMessage.contains(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG))
}

@Test
def testValidRemoteCopyLagWhenBothLagChecksAreDisabled(): Unit = {
val props = new util.HashMap[String, String]()
props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
props.put(TopicConfig.RETENTION_MS_CONFIG, "1000")
props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2")
props.put(TopicConfig.RETENTION_BYTES_CONFIG, "1000")
props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2")
props.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, "0")
props.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, "0")

validateTopicLogConfig(props)
}

@Test
def testValidRemoteCopyLagMinusOneResolvesToLocalRetention(): Unit = {
val props = new util.HashMap[String, String]()
props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
props.put(TopicConfig.RETENTION_MS_CONFIG, "1000")
props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "900")
props.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, "-1")
props.put(TopicConfig.RETENTION_BYTES_CONFIG, "2000")
props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1800")
props.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, "-1")

validateTopicLogConfig(props)
}

private def validateTopicLogConfig(props: util.Map[String, String]): Unit = {
val kafkaProps = TestUtils.createDummyBrokerConfig()
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
LogConfig.validate(util.Map.of, props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
}

private def doTestInvalidLocalLogRetentionProps(localRetentionMs: Long,
localRetentionBytes: Int,
retentionBytes: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,55 @@ class DynamicBrokerConfigTest {
verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100)
}

@Test
def testDynamicRemoteCopyLagThrowsOnIncorrectConfig(): Unit = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you also add one test for valid dynamic broker config change? Thanks!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, move the testDynamicRemoteCopyLagThrowsOnIncorrectConfig test to DynamicBrokerConfigTest.java instead of DynamicBrokerConfigTest.scala.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be taken up later, we already have one dynamic broker config change test in KafkaConfigTest.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. thanks @kamalcph
Thank you very much for your patient and thorough review.

// remote copy lag ms cannot exceed effective local retention ms
verifyIncorrectRemoteCopyLagProps(
retentionMs = 1000L,
logLocalRetentionMs = -2L,
remoteCopyLagMs = 1001L,
retentionBytes = 1000L,
logLocalRetentionBytes = -2L,
remoteCopyLagBytes = 100L
)

// remote copy lag bytes cannot exceed effective local retention bytes
verifyIncorrectRemoteCopyLagProps(
retentionMs = 1000L,
logLocalRetentionMs = -2L,
remoteCopyLagMs = 100L,
retentionBytes = 1000L,
logLocalRetentionBytes = -2L,
remoteCopyLagBytes = 1001L
)

}

def verifyIncorrectRemoteCopyLagProps(retentionMs: Long,
logLocalRetentionMs: Long,
remoteCopyLagMs: Long,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rename remoteCopyLagMs -> logRemoteCopyLagMs and remoteCopyLagBytes -> logRemoteCopyLagBytes

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kamalcph thanks for review. #22349 created.

retentionBytes: Long,
logLocalRetentionBytes: Long,
remoteCopyLagBytes: Long): Unit = {
val props = TestUtils.createBrokerConfig(0, port = 8181)
props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, retentionMs.toString)
props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, retentionBytes.toString)
val config = KafkaConfig(props)
val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[DirectoryEventHandler]))
config.dynamicConfig.initialize(None)
config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)

val newProps = new Properties()
newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs.toString)
newProps.put(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP, remoteCopyLagMs.toString)
newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, logLocalRetentionBytes.toString)
newProps.put(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP, remoteCopyLagBytes.toString)
// validate default config
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = false))
// validate per broker config
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true))
}

@Test
def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = {
val props = TestUtils.createBrokerConfig(0, port = 8181)
Expand Down
6 changes: 6 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,8 @@ class KafkaConfigTest {
case RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", -3)
case RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", -3)
case RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", "-2")
case RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", "-2")

/** New group coordinator configs */
case GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
Expand Down Expand Up @@ -1200,6 +1202,10 @@ class KafkaConfigTest {
assertDynamic(kafkaConfigProp, 10015L, () => config.remoteLogManagerConfig.logLocalRetentionMs)
case TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG =>
assertDynamic(kafkaConfigProp, 10016L, () => config.remoteLogManagerConfig.logLocalRetentionBytes)
case TopicConfig.REMOTE_COPY_LAG_MS_CONFIG =>
assertDynamic(kafkaConfigProp, 10017L, () => config.remoteLogManagerConfig.logRemoteCopyLagMs)
case TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG =>
assertDynamic(kafkaConfigProp, 10018L, () => config.remoteLogManagerConfig.logRemoteCopyLagBytes)
// not dynamically updatable
case QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG =>
// topic only config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ public final class ServerTopicConfigSynonyms {
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG),
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG),
sameNameWithLogPrefix(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG),
sameNameWithLogPrefix(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)
sameNameWithLogPrefix(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG),
sameNameWithLogPrefix(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG),
sameNameWithLogPrefix(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG)
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.kafka.storage.internals.log.AsyncOffsetReader;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogSegment;
import org.apache.kafka.storage.internals.log.OffsetIndex;
Expand Down Expand Up @@ -916,6 +917,7 @@ private void maybeUpdateCopiedOffset(UnifiedLog log) throws RemoteStorageExcepti
* 1) Segment is not the active segment and
* 2) Segment end-offset is less than the last-stable-offset as remote storage should contain only
* committed/acked messages
* 3) Segment has exceeded copy lag by time or size when configured (remote.copy.lag.ms, remote.copy.lag.bytes)
* @param log The log from which the segments are to be copied
* @param fromOffset The offset from which the segments are to be copied
* @param lastStableOffset The last stable offset of the log
Expand All @@ -925,10 +927,17 @@ List<EnrichedLogSegment> candidateLogSegments(UnifiedLog log, Long fromOffset, L
List<EnrichedLogSegment> candidateLogSegments = new ArrayList<>();
List<LogSegment> segments = log.logSegments(fromOffset, Long.MAX_VALUE);
if (!segments.isEmpty()) {
long currentTimeMs = time.milliseconds();
long totalLogSize = UnifiedLog.sizeInBytes(segments);
long cumulativeSize = 0;
for (int idx = 1; idx < segments.size(); idx++) {
LogSegment previousSeg = segments.get(idx - 1);
LogSegment currentSeg = segments.get(idx);
if (currentSeg.baseOffset() <= lastStableOffset) {
cumulativeSize += previousSeg.size();
if (delayCopy(log.config(), previousSeg, currentTimeMs, totalLogSize, cumulativeSize)) {
break;
}
candidateLogSegments.add(new EnrichedLogSegment(previousSeg, currentSeg.baseOffset()));
}
}
Expand All @@ -937,6 +946,68 @@ List<EnrichedLogSegment> candidateLogSegments(UnifiedLog log, Long fromOffset, L
return candidateLogSegments;
}

private boolean delayCopy(LogConfig logConfig, LogSegment previousSeg, long currentTimeMs, long totalLogSize, long cumulativeSize) {
if (logConfig == null) {
Comment thread
kamalcph marked this conversation as resolved.
Outdated
return false;
}

long copyLagMs = logConfig.remoteCopyLagMs();
long copyLagBytes = logConfig.remoteCopyLagBytes();
if (logger.isTraceEnabled()) {
logger.trace("delayCopy check for segment {}: copyLagMs={}, copyLagBytes={}, currentTimeMs={}, totalLogSize={}, cumulativeSize={}, sizeLagBytes={}",
previousSeg, copyLagMs, copyLagBytes, currentTimeMs, totalLogSize, cumulativeSize, totalLogSize - cumulativeSize);
}

if (copyLagMs == 0 || copyLagBytes == 0) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we log a warning or info message when a user configures one lag property but leaves the other at its default value of 0?

return false;
}

boolean needCheckCopyLagMs = copyLagMs > 0;
boolean needCheckCopyLagBytes = copyLagBytes > 0;

// When no lag delay is enabled, upload immediately.
if (!needCheckCopyLagMs && !needCheckCopyLagBytes) {
Comment thread
jiafu1115 marked this conversation as resolved.
Outdated
return false;
}

// When both lag delays are enabled, delay upload only if both delay checks decide to delay.
if (needCheckCopyLagMs && needCheckCopyLagBytes) {
return notExceededCopyLagTime(previousSeg, currentTimeMs, copyLagMs) && notExceededCopyLagSize(previousSeg, totalLogSize, cumulativeSize, copyLagBytes);
}

// If only one lag delay is enabled, use that check as the final result.
if (needCheckCopyLagMs) {
return notExceededCopyLagTime(previousSeg, currentTimeMs, copyLagMs);
}

return notExceededCopyLagSize(previousSeg, totalLogSize, cumulativeSize, copyLagBytes);
}

private boolean notExceededCopyLagTime(LogSegment segment, long currentTimeMs, long copyLagMs) {
try {
long segmentAgeMs = currentTimeMs - segment.largestTimestamp();
boolean exceeded = segmentAgeMs >= copyLagMs;
if (logger.isTraceEnabled()) {
logger.trace("{} eligible for upload by time? {} (segment age {} ms, copy lag {} ms)",
segment, exceeded, segmentAgeMs, copyLagMs);
}
return !exceeded;
} catch (IOException e) {
logger.warn("Failed to get largest timestamp for segment {}, take it as eligible for upload based on time", segment, e);
return false;
}
}

private boolean notExceededCopyLagSize(LogSegment segment, long totalLogSize, long cumulativeSize, long copyLagBytes) {
long sizeLagBytes = totalLogSize - cumulativeSize;
boolean exceeded = sizeLagBytes >= copyLagBytes;
if (logger.isTraceEnabled()) {
logger.trace("{} eligible for upload by size? {} (size lag {} bytes, copy lag {} bytes, totalLogSize={}, cumulativeSize={})",
segment, exceeded, sizeLagBytes, copyLagBytes, totalLogSize, cumulativeSize);
}
return !exceeded;
}

public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException, RetriableRemoteStorageException {
if (isCancelled())
return;
Expand Down
Loading
Loading