@@ -80,6 +80,14 @@ abstract class LogicalPlan
def isStreaming: Boolean = _isStreaming
private[this] lazy val _isStreaming = children.exists(_.isStreaming)

/** Marks if a streaming node is a stateful operator. */
def isStateful: Boolean = false

/** Marks if a subplan contains a stateful operator. */
Contributor

Two suggestions for the Scaladoc:

  1. "Marks if" is awkward — these return a boolean rather than marking anything. "Whether …" or "Returns true if …" is more conventional. For containsStatefulOperator, please also say it includes this (the body reads isStateful || children.exists(...)).

  2. More substantively, please nail down what "stateful" means here. The new definition is the streaming-runtime view (any operator that becomes a StateStoreWriter at execution) and matches MicroBatchExecution.containsStatefulOperator exactly. It diverges from UnsupportedOperationChecker.isStatefulOperation on two operators: Deduplicate is stateful here regardless of whether keys carry an event-time column, and streaming GlobalLimit is included here but not there. Calling that out — and noting that isStatefulOperation is intentionally narrower (scoped to the chained-watermark correctness check) and isn't a drop-in replacement target — will keep future PRs from silently swapping callers and changing analyzer semantics. Worth naming which existing checks are intended replacement targets, too.
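
To make the "includes this" point in item 1 concrete, here is a minimal, self-contained sketch. It uses toy classes that only mirror the names in the diff (`Plan`, `Source`, `Project` and this `Aggregate` are hypothetical stand-ins, not Spark's classes), so treat it as an illustration of the recursion rather than the actual implementation.

```scala
// Toy stand-in for LogicalPlan; the names echo the diff but none of this is Spark code.
object StatefulSketch {
  sealed trait Plan {
    def children: Seq[Plan]

    /** Whether any input below this node is streaming. */
    def isStreaming: Boolean = children.exists(_.isStreaming)

    /** Whether this node itself is a stateful streaming operator. */
    def isStateful: Boolean = false

    /** Whether this node, or any node in its subtree, is stateful. */
    lazy val containsStatefulOperator: Boolean =
      isStateful || children.exists(_.containsStatefulOperator)
  }

  // Leaf node; `streaming` is a hypothetical flag for this sketch.
  case class Source(streaming: Boolean) extends Plan {
    val children: Seq[Plan] = Nil
    override def isStreaming: Boolean = streaming
  }

  // Stateless pass-through operator.
  case class Project(child: Plan) extends Plan {
    val children: Seq[Plan] = Seq(child)
  }

  // Stateful only when its input is streaming, as in the diff.
  case class Aggregate(child: Plan) extends Plan {
    val children: Seq[Plan] = Seq(child)
    override def isStateful: Boolean = child.isStreaming
  }
}
```

In this sketch a streaming `Aggregate` reports `containsStatefulOperator` for itself as well as for any parent such as `Project`, which is the behavior the Scaladoc should spell out.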

Contributor Author

For 1, the wording change makes sense. I'll update.

For 2, I think any divergence is mostly a bug. Deduplicate should be marked as stateful regardless of the event-time column. Streaming GlobalLimit should also be marked as stateful, although it is a niche use case.

Contributor Author

OK, it looks like this isn't about this PR but about how UnsupportedOperationChecker checks for stateful operators. That check is a bit nuanced and isn't the same as this one, but it would be good to unify the two if unification doesn't hurt. That should be a follow-up rather than part of this PR.
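
For reference, the divergence discussed in this thread can be sketched with two toy predicates. Everything here is an assumption made for illustration: `Op`, `runtimeStateful`, `narrowStateful` and the `keysHaveEventTime` flag are hypothetical names, every operator is assumed to sit on a streaming input, and the rules encode only what the review comment above states, not the real code in either class.

```scala
// Toy model of the two "stateful" definitions described in the review thread.
object DivergenceSketch {
  sealed trait Op
  // keysHaveEventTime is a hypothetical flag: "the dedup keys carry an event-time column".
  case class Deduplicate(keysHaveEventTime: Boolean) extends Op
  case object GlobalLimit extends Op
  case object Aggregate extends Op
  case object Project extends Op

  // Runtime view (this PR): any operator that keeps state at execution.
  def runtimeStateful(op: Op): Boolean = op match {
    case Deduplicate(_) | GlobalLimit | Aggregate => true
    case Project => false
  }

  // Narrower view as described for the analyzer check: Deduplicate counts only
  // when its keys carry an event-time column, and GlobalLimit is not counted.
  def narrowStateful(op: Op): Boolean = op match {
    case Deduplicate(hasEventTime) => hasEventTime
    case Aggregate => true
    case GlobalLimit | Project => false
  }

  val all: Seq[Op] =
    Seq(Deduplicate(true), Deduplicate(false), GlobalLimit, Aggregate, Project)

  // Operators on which the two definitions disagree.
  def divergent: Seq[Op] = all.filter(op => runtimeStateful(op) != narrowStateful(op))
}
```

Under these assumptions `divergent` contains exactly `Deduplicate(false)` and `GlobalLimit`, the two cases a follow-up would need to reconcile before swapping callers.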

Contributor Author

Done, e1e208e

As noted above, point 2 isn't addressed in this PR. FYI.

def containsStatefulOperator: Boolean = _containsStatefulOperator
private[this] lazy val _containsStatefulOperator =
isStateful || children.exists(_.containsStatefulOperator)

override def verboseStringWithSuffix(maxFields: Int): String = {
super.verboseString(maxFields) + statsCache.map(", " + _.toString).getOrElse("")
}
@@ -827,6 +827,8 @@ case class Join(

override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan): Join = copy(left = newLeft, right = newRight)

override def isStateful: Boolean = left.isStreaming && right.isStreaming
}

/**
@@ -1243,6 +1245,8 @@ case class Aggregate(
override protected def withNewChildInternal(newChild: LogicalPlan): Aggregate =
copy(child = newChild)

override def isStateful: Boolean = child.isStreaming

// Whether this Aggregate operator is group only. For example: SELECT a, a FROM t GROUP BY a
private[sql] def groupOnly: Boolean = {
// aggregateExpressions can be empty through Dateset.agg,
@@ -1757,6 +1761,8 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN

override protected def withNewChildInternal(newChild: LogicalPlan): GlobalLimit =
copy(child = newChild)

override def isStateful: Boolean = child.isStreaming
}

/**
@@ -2002,6 +2008,7 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
override protected def withNewChildInternal(newChild: LogicalPlan): Distinct =
copy(child = newChild)
override def isStateful: Boolean = child.isStreaming
Contributor

This override is non-obvious at the Distinct layer — Distinct doesn't directly become a StateStoreWriter. The existing comment in UnsupportedOperationChecker.isStatefulOperation explains it: "Since the Distinct node will be replaced to Aggregate in the optimizer rule ReplaceDistinctWithAggregate, here we also need to check all Distinct node by assuming it as Aggregate." Worth preserving that rationale here, or at least a // see ReplaceDistinctWithAggregate one-liner.
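
A small sketch of why the override is consistent, using toy classes rather than Spark's: `replaceDistinct` below is a hypothetical stand-in for the optimizer rule and only models the Distinct-to-Aggregate rewrite mentioned in the quoted comment.

```scala
// Toy model: Distinct reports the statefulness of the Aggregate it is rewritten into.
object DistinctSketch {
  sealed trait Plan {
    def isStreaming: Boolean
    def isStateful: Boolean = false
  }

  case class Source(isStreaming: Boolean) extends Plan

  case class Aggregate(child: Plan) extends Plan {
    def isStreaming: Boolean = child.isStreaming
    override def isStateful: Boolean = child.isStreaming
  }

  case class Distinct(child: Plan) extends Plan {
    def isStreaming: Boolean = child.isStreaming
    // Distinct is replaced by Aggregate in the optimizer (see ReplaceDistinctWithAggregate),
    // so it is treated as stateful under the same condition as Aggregate.
    override def isStateful: Boolean = child.isStreaming
  }

  // Hypothetical stand-in for the optimizer rewrite: every Distinct becomes an Aggregate.
  def replaceDistinct(plan: Plan): Plan = plan match {
    case Distinct(c)  => Aggregate(replaceDistinct(c))
    case Aggregate(c) => Aggregate(replaceDistinct(c))
    case s: Source    => s
  }
}
```

In this model `isStateful` gives the same answer before and after the rewrite, which is the invariant a one-line code comment on the override would document.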

Contributor Author

Agreed - a comment is useful here, since Distinct isn't converted directly in physical planning but goes through an operator rewrite first. Let's add a code comment that briefly explains it.

Contributor Author

Done, e1e208e

}

/**
@@ -2169,6 +2176,7 @@ case class Deduplicate(
final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
override protected def withNewChildInternal(newChild: LogicalPlan): Deduplicate =
copy(child = newChild)
override def isStateful: Boolean = child.isStreaming
}

case class DeduplicateWithinWatermark(keys: Seq[Attribute], child: LogicalPlan) extends UnaryNode {
@@ -2180,6 +2188,7 @@ case class DeduplicateWithinWatermark(keys: Seq[Attribute], child: LogicalPlan)
final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
override protected def withNewChildInternal(newChild: LogicalPlan): DeduplicateWithinWatermark =
copy(child = newChild)
override def isStateful: Boolean = child.isStreaming
}

/**
@@ -567,6 +567,7 @@ case class FlatMapGroupsWithState(
override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan): FlatMapGroupsWithState =
copy(child = newLeft, initialState = newRight)
override def isStateful: Boolean = child.isStreaming
}

object TransformWithState {
@@ -655,6 +656,7 @@ case class TransformWithState(
override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan): TransformWithState =
copy(child = newLeft, initialState = newRight)
override def isStateful: Boolean = child.isStreaming
}

/** Factory for constructing new `FlatMapGroupsInR` nodes. */
@@ -165,6 +165,7 @@ case class FlatMapGroupsInPandasWithState(

override protected def withNewChildInternal(
newChild: LogicalPlan): FlatMapGroupsInPandasWithState = copy(child = newChild)
override def isStateful: Boolean = child.isStreaming
}

/**
@@ -215,6 +216,7 @@ case class TransformWithStateInPySpark(
override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan): TransformWithStateInPySpark =
copy(child = newLeft, initialState = newRight)
override def isStateful: Boolean = child.isStreaming

def leftAttributes: Seq[Attribute] = {
assert(resolved, "This method is expected to be called after resolution.")