Skip to content

Commit

Permalink
feat: Add Pattern timeout support
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Feb 22, 2025
1 parent 22685b9 commit 66b74c8
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@

package org.apache.pekko.pattern

import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.concurrent.{ CompletableFuture, CompletionStage, TimeoutException }

import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration.FiniteDuration
Expand Down Expand Up @@ -87,4 +86,64 @@ trait FutureTimeoutSupport {
}
p
}

/**
* Returns a [[scala.concurrent.Future]] that will be completed with a [[TimeoutException]]
* if the provided value is not completed within the specified duration.
*/
def timeout[T](duration: FiniteDuration, using: Scheduler)(value: => Future[T])(
implicit ec: ExecutionContext): Future[T] = {
val future =
try value
catch {
case NonFatal(t) => Future.failed(t)
}
future.value match {
case Some(_) => future
case None => // not completed yet
val p = Promise[T]()
val timeout = using.scheduleOnce(duration) {
p.tryFailure(new TimeoutException(s"Timeout of $duration expired"))
if (future.isInstanceOf[CompletableFuture[T]]) {
future.asInstanceOf[CompletableFuture[T]]
.toCompletableFuture
.cancel(true)
}
}
future.onComplete { result =>
timeout.cancel()
p.tryComplete(result)
}
p.future
}
}

/**
* Returns a [[java.util.concurrent.CompletionStage]] that will be completed with a [[TimeoutException]]
* if the provided value is not completed within the specified duration.
*/
def timeoutCompletionStage[T](duration: FiniteDuration, using: Scheduler)(value: => CompletionStage[T])(
implicit ec: ExecutionContext): CompletionStage[T] = {
val stage: CompletionStage[T] =
try value
catch {
case NonFatal(t) => Futures.failedCompletionStage(t)
}
if (stage.toCompletableFuture.isDone) {
stage
} else {
val p = new CompletableFuture[T]
val timeout = using.scheduleOnce(duration) {
p.completeExceptionally(new TimeoutException(s"Timeout of $duration expired"))
stage.toCompletableFuture.cancel(true)
}
stage.handle[Unit]((v: T, ex: Throwable) => {
timeout.cancel()
if (v != null) p.complete(v)
if (ex != null) p.completeExceptionally(ex)
})
p
}
}

}
11 changes: 11 additions & 0 deletions actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,17 @@ object Patterns {
value: Callable[CompletionStage[T]]): CompletionStage[T] =
afterCompletionStage(duration.asScala, scheduler)(value.call())(context)

/**
* Returns a [[java.util.concurrent.CompletionStage]] that will be completed with a [[java.util.concurrent.TimeoutException]]
* if the provided value is not completed within the specified duration.
*/
def timeout[T](
duration: java.time.Duration,
scheduler: Scheduler,
context: ExecutionContext,
value: Callable[CompletionStage[T]]): CompletionStage[T] =
timeoutCompletionStage(duration.asScala, scheduler)(value.call())(context)

/**
* Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable
* after the specified duration.
Expand Down

0 comments on commit 66b74c8

Please sign in to comment.