Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,28 @@
package org.apache.beam.runners.dataflow.worker.streaming;

import com.google.auto.value.AutoValue;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.beam.runners.dataflow.worker.util.ExceptionUtils;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;

/** {@link Work} instance and a processing function used to process the work. */
@AutoValue
public abstract class ExecutableWork implements Runnable {
public abstract class ExecutableWork {

public static ExecutableWork create(Work work, Consumer<Work> executeWorkFn) {
public static ExecutableWork create(Work work, Function<Work, WorkResult> executeWorkFn) {
return new AutoValue_ExecutableWork(work, executeWorkFn);
}

public abstract Work work();

public abstract Consumer<Work> executeWorkFn();
public abstract Function<Work, WorkResult> executeWorkFn();

@Override
public void run() {
executeWorkFn().accept(work());
public WorkResult run() {
try {
return executeWorkFn().apply(work());
} catch (Throwable t) {
throw ExceptionUtils.propagate(t);
}
}

public final WorkId id() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.streaming;

import com.google.auto.value.AutoValue;

/** The result of executing an {@link ExecutableWork}. */
@AutoValue
public abstract class WorkResult {
public static WorkResult create(int itemsProcessed, long bytesProcessed) {
return new AutoValue_WorkResult(itemsProcessed, bytesProcessed);
}

public abstract int itemsProcessed();

public abstract long bytesProcessed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork;
import org.apache.beam.runners.dataflow.worker.streaming.WorkResult;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard;

Expand All @@ -38,7 +40,7 @@ public class BoundedQueueExecutor {

// Used to guard elementsOutstanding and bytesOutstanding.
private final Monitor monitor;
private final ConcurrentLinkedQueue<Long> decrementQueue = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<WorkResult> decrementQueue = new ConcurrentLinkedQueue<>();
private final Object decrementQueueDrainLock = new Object();
private final AtomicBoolean isDecrementBatchPending = new AtomicBoolean(false);
private int elementsOutstanding = 0;
Expand Down Expand Up @@ -106,7 +108,7 @@ protected void afterExecute(Runnable r, Throwable t) {

// Before adding a Work to the queue, check that there are enough bytes of space or no other
// outstanding elements of work.
public void execute(Runnable work, long workBytes) {
public void execute(ExecutableWork work, long workBytes) {
monitor.enterWhenUninterruptibly(
new Guard(monitor) {
@Override
Expand All @@ -119,12 +121,17 @@ public boolean isSatisfied() {
executeMonitorHeld(work, workBytes);
}

// Forcibly add something to the queue, ignoring the length limit.
public void forceExecute(Runnable work, long workBytes) {
public void forceExecute(ExecutableWork work, long workBytes) {
monitor.enter();
executeMonitorHeld(work, workBytes);
}

/** Forcibly execute a Runnable callback with 0 bytes of size. */
public void forceExecute(Runnable work) {
monitor.enter();
executeMonitorHeld(work);
}

// Set the maximum/core pool size of the executor.
public synchronized void setMaximumPoolSize(int maximumPoolSize, int maximumElementsOutstanding) {
// For ThreadPoolExecutor, the maximum pool size should always greater than or equal to core
Expand Down Expand Up @@ -221,32 +228,54 @@ public String summaryHtml() {
}
}

private void executeMonitorHeld(Runnable work, long workBytes) {
private void executeMonitorHeld(ExecutableWork work, long workBytes) {
bytesOutstanding += workBytes;
++elementsOutstanding;
monitor.leave();

executor.execute(
() -> {
// Any execution exception thrown by work.run() propagates uncaught, triggering
// the default JVM UncaughtExceptionHandler which immediately crashes/terminates
// the JVM. Since the process exits immediately, reclaiming resource budgets in
// this JVM is unnecessary. Furthermore, since a failed execution does not return
// a WorkResult, we do not have a good/accurate fallback value to decrement.
WorkResult result = work.run();
decrementCounters(result);
});
Comment on lines +236 to +245
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The executeMonitorHeld method for ExecutableWork lacks error handling for task submission and execution failures, which can lead to resource leaks in elementsOutstanding and bytesOutstanding.

  1. If executor.execute() throws a RejectedExecutionException (e.g., during worker shutdown), the counters incremented just before will never be decremented, leading to a permanent capacity leak in the JVM.
  2. If work.run() throws an exception and the JVM does not immediately terminate (e.g., if the exception is caught by the thread pool's uncaught exception handler but the process continues), the counters will also leak.

This implementation should be made consistent with the Runnable version of executeMonitorHeld (lines 248-272), which correctly handles these scenarios. Even if a JVM crash is expected on work failure, providing a defensive decrement (using the initial workBytes as a fallback) is safer.

    try {
      executor.execute(
          () -> {
            WorkResult result = null;
            try {
              result = work.run();
            } finally {
              // Any execution exception thrown by work.run() propagates uncaught, triggering
              // the default JVM UncaughtExceptionHandler which immediately crashes/terminates
              // the JVM. Since the process exits immediately, reclaiming resource budgets in
              // this JVM is unnecessary. Furthermore, since a failed execution does not return
              // a WorkResult, we fallback to decrementing the initial budget to prevent
              // resource leaks in case the JVM does not crash.
              decrementCounters(result != null ? result : WorkResult.create(1, workBytes));
            }
          });
    } catch (Throwable e) {
      // If the execute() call threw an exception, decrement counters here.
      decrementCounters(WorkResult.create(1, workBytes));
      throw ExceptionUtils.propagate(e);
    }

}

private void executeMonitorHeld(Runnable work) {
++elementsOutstanding;
monitor.leave();

try {
executor.execute(
() -> {
try {
work.run();
} finally {
decrementCounters(workBytes);
// Commit finalizer callbacks catch and swallow all exceptions downstream
// to keep the worker alive (so the JVM does not crash). Therefore, to
// prevent elements outstanding capacity leaks under swallowed failures,
// we must guarantee decrementing element counts in the finally block.
decrementCounters(WorkResult.create(1, 0L));
}
});
} catch (RuntimeException e) {
// If the execute() call threw an exception, decrement counters here.
decrementCounters(workBytes);
throw e;
} catch (Throwable e) {
// Since finalizer rejections are caught and swallowed downstream, we must
// decrement elements outstanding immediately on task submission failure to
// prevent permanent capacity leaks in the running JVM.
decrementCounters(WorkResult.create(1, 0L));
throw ExceptionUtils.propagate(e);
}
}

private void decrementCounters(long workBytes) {
private void decrementCounters(WorkResult result) {
// All threads queue decrements and one thread grabs the monitor and updates
// counters. We do this to reduce contention on monitor which is locked by
// GetWork thread
decrementQueue.add(workBytes);
decrementQueue.add(result);
boolean submittedToExistingBatch = isDecrementBatchPending.getAndSet(true);
if (submittedToExistingBatch) {
// There is already a thread about to drain the decrement queue
Expand All @@ -265,12 +294,12 @@ private void decrementCounters(long workBytes) {
long bytesToDecrement = 0;
int elementsToDecrement = 0;
while (true) {
Long pollResult = decrementQueue.poll();
WorkResult pollResult = decrementQueue.poll();
if (pollResult == null) {
break;
}
bytesToDecrement += pollResult;
++elementsToDecrement;
bytesToDecrement += pollResult.bytesProcessed();
elementsToDecrement += pollResult.itemsProcessed();
}
if (elementsToDecrement == 0) {
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.util;

import org.apache.beam.sdk.annotations.Internal;

/** Utility methods for simplifying work with exceptions and throwables. */
@Internal
public final class ExceptionUtils {

private ExceptionUtils() {}

/**
* Propagates {@code throwable} as-is if it is an instance of {@link RuntimeException} or {@link
* Error}, or else as a last resort wraps it in a {@code RuntimeException} and then propagates.
*/
public static RuntimeException propagate(Throwable throwable) {
if (throwable instanceof RuntimeException) {
throw (RuntimeException) throwable;
} else if (throwable instanceof Error) {
throw (Error) throwable;
} else {
throw new RuntimeException(throwable);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void finalizeCommits(Iterable<Long> finalizeIds) {
}
for (Runnable callback : callbacksToExecute) {
try {
finalizationExecutor.forceExecute(callback, 0);
finalizationExecutor.forceExecute(callback);
} catch (OutOfMemoryError oom) {
throw oom;
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@
import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.streaming.WorkResult;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcherFactory;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.runners.dataflow.worker.util.ExceptionUtils;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commit;
Expand Down Expand Up @@ -233,15 +235,15 @@ public void queueAppliedFinalizeIds(ImmutableList<Long> appliedFinalizeIds) {
*
* @implNote This will block the calling thread during execution of user DoFns.
*/
private void processWork(
private WorkResult processWork(
ComputationState computationState,
Work work,
ImmutableList<LatencyAttribution> getWorkStreamLatencies) {
work.recordGetWorkStreamLatencies(getWorkStreamLatencies);
processWork(computationState, work);
return processWork(computationState, work);
}

private void processWork(ComputationState computationState, Work work) {
private WorkResult processWork(ComputationState computationState, Work work) {
Windmill.WorkItem workItem = work.getWorkItem();
String computationId = computationState.getComputationId();
ByteString key = workItem.getKey();
Expand All @@ -258,7 +260,7 @@ private void processWork(ComputationState computationState, Work work) {
outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true));
work.setState(Work.State.COMMIT_QUEUED);
work.queueCommit(outputBuilder.build(), computationState);
return;
return WorkResult.create(1, work.getSerializedWorkItemSize());
}

long processingStartTimeNanos = System.nanoTime();
Expand All @@ -284,6 +286,7 @@ private void processWork(ComputationState computationState, Work work) {
work.queueCommit(validatedCommitRequest, computationState);
recordProcessingStats(commitRequest, workItem, executeWorkResult);
LOG.debug("Processing done for work token: {}", workItem.getWorkToken());
return WorkResult.create(1, work.getSerializedWorkItemSize());
} catch (Throwable t) {
// OutOfMemoryError that are caught will be rethrown and trigger jvm termination.
try {
Expand All @@ -294,10 +297,14 @@ private void processWork(ComputationState computationState, Work work) {
invalidWork ->
computationState.completeWorkAndScheduleNextWorkForKey(
invalidWork.getShardedKey(), invalidWork.id()));
// Failure successfully processed/invalidated/rescheduled. Return failure WorkResult to
// release budget cleanly.
return WorkResult.create(1, work.getSerializedWorkItemSize());
} catch (OutOfMemoryError oom) {
throw oom;
} catch (Throwable t2) {
throw new RuntimeException(t2);
LOG.warn("Failed to process work failure safely for work {}", work.id(), t2);
throw ExceptionUtils.propagate(t2);
}
} finally {
// Update total processing time counters. Updating in finally clause ensures that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey;
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.streaming.WorkResult;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl;
import org.apache.beam.runners.dataflow.worker.streaming.harness.FanOutStreamingEngineWorkerHarness;
Expand Down Expand Up @@ -373,7 +374,10 @@ private static ExecutableWork createMockWork(
computationId, new FakeGetDataClient(), ignored -> {}, mock(HeartbeatSender.class)),
false,
Instant::now),
processWorkFn);
work -> {
processWorkFn.accept(work);
return WorkResult.create(1, work.getSerializedWorkItemSize());
});
}

private byte[] intervalWindowBytes(IntervalWindow window) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private static ExecutableWork createWork(Windmill.WorkItem workItem) {
createWorkProcessingContext(),
false,
Instant::now),
ignored -> {});
work -> WorkResult.create(1, work.getSerializedWorkItemSize()));
}

private static ExecutableWork expiredWork(Windmill.WorkItem workItem) {
Expand All @@ -85,7 +85,7 @@ private static ExecutableWork expiredWork(Windmill.WorkItem workItem) {
createWorkProcessingContext(),
false,
() -> Instant.EPOCH),
ignored -> {});
work -> WorkResult.create(1, work.getSerializedWorkItemSize()));
}

private static Work.ProcessingContext createWorkProcessingContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private static ExecutableWork createWork(ShardedKey shardedKey, long workToken,
mock(HeartbeatSender.class)),
false,
Instant::now),
ignored -> {});
work -> WorkResult.create(1, work.getSerializedWorkItemSize()));
}

@Before
Expand Down
Loading
Loading