Added the race function #16

Merged 3 commits on Jun 14, 2024
27 changes: 27 additions & 0 deletions README.md
@@ -172,6 +172,33 @@ Trying to get the value from a canceled job will throw an `InterruptedException`

**You won't pay any additional cost for canceling a job**. The cancellation mechanism is based on the interruption of the virtual thread. No new structured scope is created for the cancellation mechanism.
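The interruption-based mechanism can be illustrated without the library. This is a minimal, library-free sketch (not sus4s code) of what cancellation costs: one `interrupt()` call on the underlying virtual thread, observed as an `InterruptedException` at the first suspending point. It requires JDK 21+ for `Thread.startVirtualThread`:

```scala 3
import java.util.concurrent.atomic.AtomicBoolean

// A virtual thread parked in `sleep` observes cancellation as an
// InterruptedException at its first suspending point.
val cancelled = new AtomicBoolean(false)
val worker = Thread.startVirtualThread(() => {
  try Thread.sleep(10_000)
  catch case _: InterruptedException => cancelled.set(true)
})
Thread.sleep(100)  // let the worker reach the suspending point
worker.interrupt() // this is all a cancellation costs
worker.join()
println(cancelled.get()) // true
```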

## Racing Jobs

The library provides the `race` function to race two jobs. `race` returns the result of the first job that completes successfully and cancels the other job. The following code snippet shows how to use the `race` function:

```scala 3
val results = new ConcurrentLinkedQueue[String]()
val actual: Int | String = structured {
race[Int, String](
{
delay(1.second)
results.add("job1")
throw new RuntimeException("Error")
}, {
delay(500.millis)
results.add("job2")
"42"
}
)
}
actual should be("42")
results.toArray should contain theSameElementsInOrderAs List("job2")
```

If the first job completes with an exception, the `race` function waits for the second job to complete and returns its result. If both jobs complete with an exception, the `race` function rethrows the first exception it encountered.

Each job adheres to the rules of structured concurrency. The `race` function is not fully optimized: every raced block creates more than one virtual thread under the hood, which should not be a problem for the Loom runtime.
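These semantics can be sketched with the standard library alone. The following `raceSketch` is a hypothetical, simplified illustration (its name and shape are not part of sus4s): the first block to complete successfully wins, the loser is interrupted, and if both blocks fail, the first recorded exception is rethrown.

```scala 3
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

// Library-free sketch of the race semantics described above.
def raceSketch[A](first: () => A, second: () => A): A =
  val winner     = new CompletableFuture[A]()
  val firstError = new AtomicReference[Throwable]()
  val failures   = new AtomicInteger(0)
  def start(block: () => A): Thread =
    Thread.startVirtualThread(() => {
      try winner.complete(block())
      catch
        case t: Throwable =>
          firstError.compareAndSet(null, t)
          // Only when both blocks have failed, surface the first exception
          if failures.incrementAndGet() == 2 then
            winner.completeExceptionally(firstError.get())
    })
  val t1 = start(first)
  val t2 = start(second)
  try winner.join() // waits for one success or two failures
  finally
    t1.interrupt() // cancel the loser
    t2.interrupt()

// The failing block loses the race; the slower but successful one wins.
val result = raceSketch(
  () => { Thread.sleep(100); throw new RuntimeException("Error") },
  () => { Thread.sleep(500); "42" }
)
println(result) // 42
```

The real implementation delegates this bookkeeping to `StructuredTaskScope.ShutdownOnSuccess`, which provides the same first-success-wins behavior with structured lifetimes.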

## Contributing

If you want to contribute to the project, please do! Any help is welcome.
119 changes: 87 additions & 32 deletions core/src/main/scala/in/rcard/sus4s/sus4s.scala
@@ -1,6 +1,6 @@
package in.rcard.sus4s

import java.util.concurrent.StructuredTaskScope.{ShutdownOnFailure, ShutdownOnSuccess}
import java.util.concurrent.{CompletableFuture, StructuredTaskScope}
import scala.concurrent.ExecutionException
import scala.concurrent.duration.Duration
@@ -58,34 +58,34 @@ object sus4s {

/** Cancels the job and all its children jobs. Getting the value of a cancelled job throws an
* [[InterruptedException]]. Cancellation is an idempotent operation.
*
* <h2>Example</h2>
* {{{
* val expectedQueue = structured {
* val queue = new ConcurrentLinkedQueue[String]()
* val job1 = fork {
* val innerJob = fork {
* fork {
* Thread.sleep(3000)
* println("inner-inner-Job")
* queue.add("inner-inner-Job")
* }
* Thread.sleep(2000)
* println("innerJob")
* queue.add("innerJob")
* }
* Thread.sleep(1000)
* queue.add("job1")
* }
* val job = fork {
* Thread.sleep(500)
* job1.cancel()
* queue.add("job2")
* }
* queue
* }
* expectedQueue.toArray should contain theSameElementsInOrderAs List("job2")
* }}}
*/
def cancel(): Suspend ?=> Unit = {
// FIXME Refactor this code
@@ -199,8 +199,11 @@ object sus4s {
case Some(l) => Some(childThread :: l)
}
executingThread.complete(childThread)
try {
val resultValue = block
result.complete(resultValue)
resultValue
} catch
case _: InterruptedException =>
result.completeExceptionally(new InterruptedException("Job cancelled"))
case throwable: Throwable =>
@@ -209,7 +212,7 @@
})
Job(result, executingThread)
}

/** Suspends the execution of the current thread for the given duration.
*
* @param duration
@@ -218,4 +221,56 @@
def delay(duration: Duration): Suspend ?=> Unit = {
Thread.sleep(duration.toMillis)
}

/** Races two concurrent tasks and returns the result of the first one that completes
 * successfully. The other task is cancelled. If the first task throws an exception, the
 * function waits for the second task to complete and returns its result. If both tasks throw
 * an exception, the first one is rethrown.
 *
 * Each block follows the [[structured]] concurrency model. So, for each block, an additional
 * virtual thread is created on top of the thread forking the block.
*
* <h2>Example</h2>
* {{{
* val results = new ConcurrentLinkedQueue[String]()
* val actual: Int | String = structured {
* race[Int, String](
* {
* delay(1.second)
* results.add("job1")
* throw new RuntimeException("Error")
* }, {
* delay(500.millis)
* results.add("job2")
* "42"
* }
* )
* }
* actual should be("42")
* results.toArray should contain theSameElementsInOrderAs List("job2")
* }}}
*
* @param firstBlock
* First block to race
* @param secondBlock
* Second block to race
* @tparam A
* Result type of the first block
* @tparam B
* Result type of the second block
* @return
* The result of the first block that completes
*/
def race[A, B](firstBlock: Suspend ?=> A, secondBlock: Suspend ?=> B): Suspend ?=> A | B = {
val loomScope = new ShutdownOnSuccess[A | B]()
given suspended: Suspend = SuspendScope(loomScope.asInstanceOf[StructuredTaskScope[Any]])
try {
loomScope.fork(() => { structured { firstBlock } })
loomScope.fork(() => { structured { secondBlock } })

loomScope.join()
loomScope.result(identity)
} finally {
loomScope.close()
}
}
}
136 changes: 136 additions & 0 deletions core/src/test/scala/CancelSpec.scala
@@ -0,0 +1,136 @@
import in.rcard.sus4s.sus4s
import in.rcard.sus4s.sus4s.{delay, fork, structured}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.duration.*

class CancelSpec extends AnyFlatSpec with Matchers {
"cancellation" should "cancel at the first suspending point" in {
val expectedQueue = structured {
val queue = new ConcurrentLinkedQueue[String]()
val cancellable = fork {
delay(2.seconds)
queue.add("cancellable")
}
val job = fork {
delay(500.millis)
cancellable.cancel()
queue.add("job2")
}
queue
}
expectedQueue.toArray should contain theSameElementsInOrderAs List("job2")
}

it should "not throw an exception if joined" in {

val expectedQueue = structured {
val queue = new ConcurrentLinkedQueue[String]()
val cancellable = fork {
delay(2.seconds)
queue.add("cancellable")
}
val job = fork {
delay(500.millis)
cancellable.cancel()
queue.add("job2")
}
cancellable.join()
queue
}
expectedQueue.toArray should contain theSameElementsInOrderAs List("job2")
}

it should "not cancel parent job" in {

val expectedQueue = structured {
val queue = new ConcurrentLinkedQueue[String]()
val job1 = fork {
val innerCancellableJob = fork {
delay(2.seconds)
queue.add("cancellable")
}
delay(1.second)
innerCancellableJob.cancel()
queue.add("job1")
}
val job = fork {
delay(500.millis)
queue.add("job2")
}
queue
}
expectedQueue.toArray should contain theSameElementsInOrderAs List("job2", "job1")
}

it should "cancel children jobs" in {
val expectedQueue = structured {
val queue = new ConcurrentLinkedQueue[String]()
val job1 = fork {
val innerJob = fork {
fork {
delay(3.seconds)
println("inner-inner-Job")
queue.add("inner-inner-Job")
}

delay(2.seconds)
println("innerJob")
queue.add("innerJob")
}
delay(1.second)
queue.add("job1")
}
val job = fork {
delay(500.millis)
job1.cancel()
queue.add("job2")
}
queue
}
expectedQueue.toArray should contain theSameElementsInOrderAs List("job2")
}

it should "not throw any exception when joining a cancelled job" in {
val expected = structured {
val cancellable = fork {
delay(2.seconds)
}
delay(500.millis)
cancellable.cancel()
cancellable.join()
42
}

expected shouldBe 42
}

it should "not throw any exception if a job is canceled twice" in {
val expected = structured {
val cancellable = fork {
delay(2.seconds)
}
delay(500.millis)
cancellable.cancel()
cancellable.cancel()
42
}

expected shouldBe 42
}

it should "throw an exception when asking for the value of a cancelled job" in {
assertThrows[InterruptedException] {
structured {
val cancellable = fork {
delay(2.seconds)
}
delay(500.millis)
cancellable.cancel()
cancellable.value
}
}
}
}