Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions actor-tests/src/test/scala/org/apache/pekko/actor/TimerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,44 @@ class TimersAndStashSpec extends PekkoSpec {
}
}

// Same scenario as ActorWithTimerAndStash but mixing in UnboundedStash, which is a sibling of
// Stash (both extend UnrestrictedStash), to cover the timer/stash interaction for it too (#3258).
class ActorWithTimerAndUnboundedStash(probe: ActorRef) extends Actor with Timers with UnboundedStash {
timers.startSingleTimer("key", "scheduled", 50.millis)
def receive: Receive = stashing
def notStashing: Receive = {
case msg => probe ! msg
}

def stashing: Receive = {
case StopStashing =>
context.become(notStashing)
unstashAll()
case "scheduled" =>
probe ! "saw-scheduled"
stash()
}
}

// Same scenario mixing in UnrestrictedStash directly (needs an explicitly configured
// deque-based mailbox, as it does not declare a RequiresMessageQueue) (#3258).
class ActorWithTimerAndUnrestrictedStash(probe: ActorRef) extends Actor with Timers with UnrestrictedStash {
timers.startSingleTimer("key", "scheduled", 50.millis)
def receive: Receive = stashing
def notStashing: Receive = {
case msg => probe ! msg
}

def stashing: Receive = {
case StopStashing =>
context.become(notStashing)
unstashAll()
case "scheduled" =>
probe ! "saw-scheduled"
stash()
}
}

"Timers combined with stashing" should {

"work" in {
Expand All @@ -350,6 +388,24 @@ class TimersAndStashSpec extends PekkoSpec {
actor ! StopStashing
probe.expectMsg("scheduled")
}

"work with UnboundedStash (#3258)" in {
val probe = TestProbe()
val actor = system.actorOf(Props(new ActorWithTimerAndUnboundedStash(probe.ref)))
probe.expectMsg("saw-scheduled")
actor ! StopStashing
probe.expectMsg("scheduled")
}

"work with UnrestrictedStash (#3258)" in {
val probe = TestProbe()
val actor = system.actorOf(
Props(new ActorWithTimerAndUnrestrictedStash(probe.ref))
.withMailbox("pekko.actor.mailbox.unbounded-deque-based"))
probe.expectMsg("saw-scheduled")
actor ! StopStashing
probe.expectMsg("scheduled")
}
}

}
8 changes: 6 additions & 2 deletions actor/src/main/scala/org/apache/pekko/actor/Timers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,12 @@ trait Timers extends Actor {
case OptionVal.Some(m: AutoReceivedMessage) =>
context.asInstanceOf[ActorCell].autoReceiveMessage(Envelope(m, self, context.system))
case OptionVal.Some(m) =>
if (this.isInstanceOf[Stash]) {
// this is important for stash interaction, as stash will look directly at currentMessage #24557
if (this.isInstanceOf[StashSupport]) {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// This is important for stash interaction, as stash reads the message directly from
// currentMessage (StashSupport) #24557. We match StashSupport rather than Stash so that
// actors mixing in UnboundedStash or UnrestrictedStash directly - which are siblings of
// Stash, not subtypes - also rewrite the unwrapped timer message; otherwise stash()
// would re-stash the TimerMsg wrapper and the message would be lost on unstash (#3258).
actorCell.currentMessage = actorCell.currentMessage.copy(message = m)
}
super.aroundReceive(receive, m)
Expand Down