diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutableWork.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutableWork.java index db279f066630..9f29a6496f77 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutableWork.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutableWork.java @@ -18,24 +18,28 @@ package org.apache.beam.runners.dataflow.worker.streaming; import com.google.auto.value.AutoValue; -import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.runners.dataflow.worker.util.ExceptionUtils; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; /** {@link Work} instance and a processing function used to process the work. */ @AutoValue -public abstract class ExecutableWork implements Runnable { +public abstract class ExecutableWork { - public static ExecutableWork create(Work work, Consumer executeWorkFn) { + public static ExecutableWork create(Work work, Function executeWorkFn) { return new AutoValue_ExecutableWork(work, executeWorkFn); } public abstract Work work(); - public abstract Consumer executeWorkFn(); + public abstract Function executeWorkFn(); - @Override - public void run() { - executeWorkFn().accept(work()); + public WorkResult run() { + try { + return executeWorkFn().apply(work()); + } catch (Throwable t) { + throw ExceptionUtils.propagate(t); + } } public final WorkId id() { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkResult.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkResult.java new file mode 100644 index 000000000000..07db0e98aca4 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkResult.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming; + +import com.google.auto.value.AutoValue; + +/** The result of executing an {@link ExecutableWork}. */ +@AutoValue +public abstract class WorkResult { + public static WorkResult create(int itemsProcessed, long bytesProcessed) { + return new AutoValue_WorkResult(itemsProcessed, bytesProcessed); + } + + public abstract int itemsProcessed(); + + public abstract long bytesProcessed(); +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java index 9079c3cc69b8..5a2a7584bc9b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java @@ -24,6 +24,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.concurrent.GuardedBy; +import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; +import org.apache.beam.runners.dataflow.worker.streaming.WorkResult; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard; @@ -38,7 +40,7 @@ public class BoundedQueueExecutor { // Used to guard elementsOutstanding and bytesOutstanding. private final Monitor monitor; - private final ConcurrentLinkedQueue decrementQueue = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue decrementQueue = new ConcurrentLinkedQueue<>(); private final Object decrementQueueDrainLock = new Object(); private final AtomicBoolean isDecrementBatchPending = new AtomicBoolean(false); private int elementsOutstanding = 0; @@ -106,7 +108,7 @@ protected void afterExecute(Runnable r, Throwable t) { // Before adding a Work to the queue, check that there are enough bytes of space or no other // outstanding elements of work. - public void execute(Runnable work, long workBytes) { + public void execute(ExecutableWork work, long workBytes) { monitor.enterWhenUninterruptibly( new Guard(monitor) { @Override @@ -119,12 +121,17 @@ public boolean isSatisfied() { executeMonitorHeld(work, workBytes); } - // Forcibly add something to the queue, ignoring the length limit. - public void forceExecute(Runnable work, long workBytes) { + public void forceExecute(ExecutableWork work, long workBytes) { monitor.enter(); executeMonitorHeld(work, workBytes); } + /** Forcibly execute a Runnable callback with 0 bytes of size. */ + public void forceExecute(Runnable work) { + monitor.enter(); + executeMonitorHeld(work); + } + // Set the maximum/core pool size of the executor. public synchronized void setMaximumPoolSize(int maximumPoolSize, int maximumElementsOutstanding) { // For ThreadPoolExecutor, the maximum pool size should always greater than or equal to core @@ -221,32 +228,54 @@ public String summaryHtml() { } } - private void executeMonitorHeld(Runnable work, long workBytes) { + private void executeMonitorHeld(ExecutableWork work, long workBytes) { bytesOutstanding += workBytes; ++elementsOutstanding; monitor.leave(); + executor.execute( + () -> { + // Any execution exception thrown by work.run() propagates uncaught, triggering + // the default JVM UncaughtExceptionHandler which immediately crashes/terminates + // the JVM. Since the process exits immediately, reclaiming resource budgets in + // this JVM is unnecessary. Furthermore, since a failed execution does not return + // a WorkResult, we do not have a good/accurate fallback value to decrement. + WorkResult result = work.run(); + decrementCounters(result); + }); + } + + private void executeMonitorHeld(Runnable work) { + ++elementsOutstanding; + monitor.leave(); + try { executor.execute( () -> { try { work.run(); } finally { - decrementCounters(workBytes); + // Commit finalizer callbacks catch and swallow all exceptions downstream + // to keep the worker alive (so the JVM does not crash). Therefore, to + // prevent elements outstanding capacity leaks under swallowed failures, + // we must guarantee decrementing element counts in the finally block. + decrementCounters(WorkResult.create(1, 0L)); } }); - } catch (RuntimeException e) { - // If the execute() call threw an exception, decrement counters here. - decrementCounters(workBytes); - throw e; + } catch (Throwable e) { + // Since finalizer rejections are caught and swallowed downstream, we must + // decrement elements outstanding immediately on task submission failure to + // prevent permanent capacity leaks in the running JVM. + decrementCounters(WorkResult.create(1, 0L)); + throw ExceptionUtils.propagate(e); } } - private void decrementCounters(long workBytes) { + private void decrementCounters(WorkResult result) { // All threads queue decrements and one thread grabs the monitor and updates // counters. We do this to reduce contention on monitor which is locked by // GetWork thread - decrementQueue.add(workBytes); + decrementQueue.add(result); boolean submittedToExistingBatch = isDecrementBatchPending.getAndSet(true); if (submittedToExistingBatch) { // There is already a thread about to drain the decrement queue @@ -265,12 +294,12 @@ private void decrementCounters(long workBytes) { long bytesToDecrement = 0; int elementsToDecrement = 0; while (true) { - Long pollResult = decrementQueue.poll(); + WorkResult pollResult = decrementQueue.poll(); if (pollResult == null) { break; } - bytesToDecrement += pollResult; - ++elementsToDecrement; + bytesToDecrement += pollResult.bytesProcessed(); + elementsToDecrement += pollResult.itemsProcessed(); } if (elementsToDecrement == 0) { return; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ExceptionUtils.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ExceptionUtils.java new file mode 100644 index 000000000000..4bbdbb3216ca --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ExceptionUtils.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.util; + +import org.apache.beam.sdk.annotations.Internal; + +/** Utility methods for simplifying work with exceptions and throwables. */ +@Internal +public final class ExceptionUtils { + + private ExceptionUtils() {} + + /** + * Propagates {@code throwable} as-is if it is an instance of {@link RuntimeException} or {@link + * Error}, or else as a last resort wraps it in a {@code RuntimeException} and then propagates. + */ + public static RuntimeException propagate(Throwable throwable) { + if (throwable instanceof RuntimeException) { + throw (RuntimeException) throwable; + } else if (throwable instanceof Error) { + throw (Error) throwable; + } else { + throw new RuntimeException(throwable); + } + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java index 5a66545ab335..22573bf1ced2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java @@ -156,7 +156,7 @@ public void finalizeCommits(Iterable finalizeIds) { } for (Runnable callback : callbacksToExecute) { try { - finalizationExecutor.forceExecute(callback, 0); + finalizationExecutor.forceExecute(callback); } catch (OutOfMemoryError oom) { throw oom; } catch (Throwable t) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index 1428037d9ca0..65a1325cd40e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -42,11 +42,13 @@ import org.apache.beam.runners.dataflow.worker.streaming.StageInfo; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.WorkResult; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcherFactory; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; +import org.apache.beam.runners.dataflow.worker.util.ExceptionUtils; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commit; @@ -233,15 +235,15 @@ public void queueAppliedFinalizeIds(ImmutableList appliedFinalizeIds) { * * @implNote This will block the calling thread during execution of user DoFns. */ - private void processWork( + private WorkResult processWork( ComputationState computationState, Work work, ImmutableList getWorkStreamLatencies) { work.recordGetWorkStreamLatencies(getWorkStreamLatencies); - processWork(computationState, work); + return processWork(computationState, work); } - private void processWork(ComputationState computationState, Work work) { + private WorkResult processWork(ComputationState computationState, Work work) { Windmill.WorkItem workItem = work.getWorkItem(); String computationId = computationState.getComputationId(); ByteString key = workItem.getKey(); @@ -258,7 +260,7 @@ private void processWork(ComputationState computationState, Work work) { outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true)); work.setState(Work.State.COMMIT_QUEUED); work.queueCommit(outputBuilder.build(), computationState); - return; + return WorkResult.create(1, work.getSerializedWorkItemSize()); } long processingStartTimeNanos = System.nanoTime(); @@ -284,6 +286,7 @@ private void processWork(ComputationState computationState, Work work) { work.queueCommit(validatedCommitRequest, computationState); recordProcessingStats(commitRequest, workItem, executeWorkResult); LOG.debug("Processing done for work token: {}", workItem.getWorkToken()); + return WorkResult.create(1, work.getSerializedWorkItemSize()); } catch (Throwable t) { // OutOfMemoryError that are caught will be rethrown and trigger jvm termination. try { @@ -294,10 +297,14 @@ private void processWork(ComputationState computationState, Work work) { invalidWork -> computationState.completeWorkAndScheduleNextWorkForKey( invalidWork.getShardedKey(), invalidWork.id())); + // Failure successfully processed/invalidated/rescheduled. Return failure WorkResult to + // release budget cleanly. + return WorkResult.create(1, work.getSerializedWorkItemSize()); } catch (OutOfMemoryError oom) { throw oom; } catch (Throwable t2) { - throw new RuntimeException(t2); + LOG.warn("Failed to process work failure safely for work {}", work.id(), t2); + throw ExceptionUtils.propagate(t2); } } finally { // Update total processing time counters. Updating in finally clause ensures that diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index d8a1d1b90d47..593883343bd4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -106,6 +106,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.WorkResult; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl; import org.apache.beam.runners.dataflow.worker.streaming.harness.FanOutStreamingEngineWorkerHarness; @@ -373,7 +374,10 @@ private static ExecutableWork createMockWork( computationId, new FakeGetDataClient(), ignored -> {}, mock(HeartbeatSender.class)), false, Instant::now), - processWorkFn); + work -> { + processWorkFn.accept(work); + return WorkResult.create(1, work.getSerializedWorkItemSize()); + }); } private byte[] intervalWindowBytes(IntervalWindow window) throws Exception { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java index 865ae2612803..4942e9610fd6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java @@ -73,7 +73,7 @@ private static ExecutableWork createWork(Windmill.WorkItem workItem) { createWorkProcessingContext(), false, Instant::now), - ignored -> {}); + work -> WorkResult.create(1, work.getSerializedWorkItemSize())); } private static ExecutableWork expiredWork(Windmill.WorkItem workItem) { @@ -85,7 +85,7 @@ private static ExecutableWork expiredWork(Windmill.WorkItem workItem) { createWorkProcessingContext(), false, () -> Instant.EPOCH), - ignored -> {}); + work -> WorkResult.create(1, work.getSerializedWorkItemSize())); } private static Work.ProcessingContext createWorkProcessingContext() { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java index 1c8b8fca131d..d9ad1157ee4c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java @@ -77,7 +77,7 @@ private static ExecutableWork createWork(ShardedKey shardedKey, long workToken, mock(HeartbeatSender.class)), false, Instant::now), - ignored -> {}); + work -> WorkResult.create(1, work.getSerializedWorkItemSize())); } @Before diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java index d7ea039bb809..bdadcd2b9e8c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java @@ -32,6 +32,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.WorkResult; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient; import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; @@ -85,18 +86,22 @@ private static ExecutableWork createWork(Consumer executeWorkFn) { mock(HeartbeatSender.class)), false, Instant::now), - executeWorkFn); + work -> { + executeWorkFn.accept(work); + return WorkResult.create(1, work.getSerializedWorkItemSize()); + }); } - private Runnable createSleepProcessWorkFn(CountDownLatch start, CountDownLatch stop) { - return () -> { - start.countDown(); - try { - stop.await(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }; + private ExecutableWork createSleepProcessWork(CountDownLatch start, CountDownLatch stop) { + return createWork( + ignored -> { + start.countDown(); + try { + stop.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } @Before @@ -123,9 +128,9 @@ public void testScheduleWorkWhenExceedMaximumPoolSize() throws Exception { CountDownLatch processStop2 = new CountDownLatch(1); CountDownLatch processStart3 = new CountDownLatch(1); CountDownLatch processStop3 = new CountDownLatch(1); - Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1); - Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2); - Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3); + ExecutableWork m1 = createSleepProcessWork(processStart1, processStop1); + ExecutableWork m2 = createSleepProcessWork(processStart2, processStop2); + ExecutableWork m3 = createSleepProcessWork(processStart3, processStop3); executor.execute(m1, 1); processStart1.await(); @@ -152,8 +157,8 @@ public void testScheduleWorkWhenExceedMaximumBytesOutstanding() throws Exception CountDownLatch processStop1 = new CountDownLatch(1); CountDownLatch processStart2 = new CountDownLatch(1); CountDownLatch processStop2 = new CountDownLatch(1); - Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1); - Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2); + ExecutableWork m1 = createSleepProcessWork(processStart1, processStop1); + ExecutableWork m2 = createSleepProcessWork(processStart2, processStop2); executor.execute(m1, 10000000); processStart1.await(); @@ -187,9 +192,9 @@ public void testOverrideMaximumPoolSize() throws Exception { CountDownLatch processStart2 = new CountDownLatch(1); CountDownLatch processStart3 = new CountDownLatch(1); CountDownLatch stop = new CountDownLatch(1); - Runnable m1 = createSleepProcessWorkFn(processStart1, stop); - Runnable m2 = createSleepProcessWorkFn(processStart2, stop); - Runnable m3 = createSleepProcessWorkFn(processStart3, stop); + ExecutableWork m1 = createSleepProcessWork(processStart1, stop); + ExecutableWork m2 = createSleepProcessWork(processStart2, stop); + ExecutableWork m3 = createSleepProcessWork(processStart3, stop); // Initial state. assertEquals(0, executor.activeCount()); @@ -225,9 +230,9 @@ public void testRecordTotalTimeMaxActiveThreadsUsed() throws Exception { CountDownLatch processStart2 = new CountDownLatch(1); CountDownLatch processStart3 = new CountDownLatch(1); CountDownLatch stop = new CountDownLatch(1); - Runnable m1 = createSleepProcessWorkFn(processStart1, stop); - Runnable m2 = createSleepProcessWorkFn(processStart2, stop); - Runnable m3 = createSleepProcessWorkFn(processStart3, stop); + ExecutableWork m1 = createSleepProcessWork(processStart1, stop); + ExecutableWork m2 = createSleepProcessWork(processStart2, stop); + ExecutableWork m3 = createSleepProcessWork(processStart3, stop); // Initial state. assertEquals(0, executor.activeCount()); @@ -264,9 +269,9 @@ public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeIsIncrease CountDownLatch processStart2 = new CountDownLatch(1); CountDownLatch processStart3 = new CountDownLatch(1); CountDownLatch stop = new CountDownLatch(1); - Runnable m1 = createSleepProcessWorkFn(processStart1, stop); - Runnable m2 = createSleepProcessWorkFn(processStart2, stop); - Runnable m3 = createSleepProcessWorkFn(processStart3, stop); + ExecutableWork m1 = createSleepProcessWork(processStart1, stop); + ExecutableWork m2 = createSleepProcessWork(processStart2, stop); + ExecutableWork m3 = createSleepProcessWork(processStart3, stop); // Initial state. assertEquals(0, executor.activeCount()); @@ -308,9 +313,9 @@ public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeIsReduced( CountDownLatch processStop2 = new CountDownLatch(1); CountDownLatch processStart3 = new CountDownLatch(1); CountDownLatch processStop3 = new CountDownLatch(1); - Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1); - Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2); - Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3); + ExecutableWork m1 = createSleepProcessWork(processStart1, processStop1); + ExecutableWork m2 = createSleepProcessWork(processStart2, processStop2); + ExecutableWork m3 = createSleepProcessWork(processStart3, processStop3); // Initial state. assertEquals(0, executor.activeCount()); @@ -351,6 +356,37 @@ public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeIsReduced( executor.shutdown(); } + @Test + public void testRunnableExceptionPropagationDecrementsCounters() throws Exception { + CountDownLatch processStart = new CountDownLatch(1); + CountDownLatch processStop = new CountDownLatch(1); + + Runnable work = + () -> { + processStart.countDown(); + try { + processStop.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + throw new RuntimeException("Simulated finalizer processing exception"); + }; + + executor.forceExecute(work); + processStart.await(); + + assertEquals(1, executor.elementsOutstanding()); + + processStop.countDown(); + + // Wait until outstanding elements are released + while (executor.elementsOutstanding() != 0) { + Thread.sleep(10); + } + + assertEquals(0, executor.elementsOutstanding()); + } + @Test public void testRenderSummaryHtml() { String expectedSummaryHtml = diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java index 68a11895fa12..291be6f0bf9b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java @@ -33,6 +33,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.WorkResult; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; @@ -98,7 +99,10 @@ private static ExecutableWork createWork(Supplier clock, Consumer mock(HeartbeatSender.class)), false, clock), - processWorkFn); + work -> { + processWorkFn.accept(work); + return WorkResult.create(1, work.getSerializedWorkItemSize()); + }); } private static ExecutableWork createWork(Consumer processWorkFn) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java index 054db878c869..e9dc42de8aa6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java @@ -46,6 +46,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.WorkResult; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; @@ -137,7 +138,10 @@ private ExecutableWork createOldWork( "computationId", new FakeGetDataClient(), ignored -> {}, heartbeatSender), false, ActiveWorkRefresherTest::aLongTimeAgo), - processWork); + work -> { + processWork.accept(work); + return WorkResult.create(1, work.getSerializedWorkItemSize()); + }); } @Test