Sync vs. Async

As already pointed out in the Quick Start and Basics chapters, swave supports running streams synchronously, on the caller thread, as well as asynchronously, off the caller thread, on one or more configurable thread-pools.

Synchronous Execution

Running a stream synchronously on the caller thread means that the streamGraph.run() call will not only start but actually perform the stream execution, in its entirety. Since no inter-thread communication is required in this case, the overall runtime overhead introduced by the streaming infrastructure is quite small, which allows simple streams to run much faster than in comparable asynchronous setups.

However, since everything runs only on one thread there is no way to make use of any potentially existing parallelism in the hardware. Also, the stream cannot contain any stages that need to react to signals from outside the stream (like timers, interrupts, callbacks, etc.). The respective reference documentation for transformations, Spouts and Drains will tell you whether the underlying stage is able to run synchronously or not.

If a stream runs synchronously and run().result returns a Future, the returned Future instance will already be completed when the run() call returns. The Future might, however, be completed with an error (i.e. a Failure instance) rather than the expected result value, for example because some element of the stream graph threw an exception during execution. The run() call itself will never throw an exception that is triggered during the runtime of a stream. It will throw an exception, though, when the stream cannot be started because the stream definition is illegal (e.g. because there are still unconnected ports). If you need a variant that is guaranteed to never throw, use tryRun().

Here is an example of a map stage that throws an exception when processing one particular stream element:

import scala.concurrent.Future
import scala.util.Failure
import swave.core._

implicit val env = StreamEnv()

val result: Future[Seq[Int]] =
  Spout.ints(0)
    .map(i => 1000 / (i - 10))
    .take(50)
    .to(Drain.seq(limit = 50)).run().result // shortcut: .drainToVector(limit = 50)

result.value.get shouldBe a [Failure[_]]

the [ArithmeticException] thrownBy result.value.get.get should have message "/ by zero"
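The tryRun() variant mentioned above can be sketched as follows. Note that the exact return type is an assumption here (a scala.util.Try wrapping the run), so treat this as illustrative rather than authoritative:

```scala
import scala.util.{Failure, Success}
import swave.core._

implicit val env = StreamEnv()

// `run()` would throw on an illegal stream definition; `tryRun()` is
// guaranteed to never throw and returns the failure as a value instead.
// (The Try-shaped return type is an assumption, see the lead-in above.)
Spout.ints(0)
  .take(10)
  .to(Drain.seq(limit = 10))
  .tryRun() match {
  case Success(run) => // stream was started (and, if synchronous, already executed)
  case Failure(e)   => // illegal stream definition, e.g. unconnected ports
}
```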

Note that “running synchronously” doesn’t mean that there is blocking involved! The limits on what can run synchronously are established precisely because no stage is allowed to ever block. Whenever a stage might have a reason to block, e.g. because it needs to wait for the completion of a Future, it cannot run synchronously and must be notified asynchronously of the event it is interested in.

Asynchronous Execution

swave will run a stream synchronously whenever it can, which is only the case when all Spouts, Drains and transformations in the stream graph (including potentially existing nested graphs) support this mode of execution.

To better understand how swave will behave by default and how you can control asynchronous execution in a fine-grained fashion let’s look at a simple example:

import scala.concurrent.Future
import swave.core._

implicit val env = StreamEnv()

val result: Future[List[String]] =
  Spout.ints(0)
    .filter(_ % 5 == 0)
    .map(_.toString)
    .take(10)
    .drainToList(limit = 100) // shortcut for `to(...).run().result`

result.value.get.get shouldEqual (0 to 45 by 5).map(_.toString)

Here is the stream graph of this example, which also shows which stage will run on which thread / dispatcher:

Basic Example Stream Graph

As you can see, everything will run on the caller thread, i.e. synchronously.

async

Suppose now that we’d like to run this stream graph as is, but off the caller thread, e.g. because it takes a longer time to finish and we’d like to use the caller thread for something else.

We can do so by simply adding an async transformation to the graph at an arbitrary position:

import scala.concurrent.Future
import swave.core.util._
import swave.core._

implicit val env = StreamEnv()

val result: Future[Seq[String]] =
  Spout.ints(0)
    .filter(_ % 5 == 0)
    .map(_.toString)
    .take(10)
    .async()
    .drainToList(limit = 100) // shortcut for `to(...).run().result`

result.await() shouldEqual (0 to 45 by 5).map(_.toString)

env.shutdown()

Now drainToList(...) returns immediately, likely before the produced Future value has been fulfilled. We therefore have to explicitly await the Future value in order to get hold of it.
(Note that await is blocking and therefore only allowed in exceptional circumstances, like this test here!)

The graph still looks the same but is now run on the default dispatcher. Since no asynchronous boundaries have been introduced it will still run as one single block:

Basic Async Stream Graph

The async transformation has no effect on the data or the demand travelling through it. All it does is force the stream graph it’s part of to run asynchronously. It has one optional parameter: the name of a configured dispatcher to be used. If none is specified, the default dispatcher will be assigned, unless another assignment has been made for the async region the stage is placed in (we’ll talk about async regions in a bit).
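Such a dispatcher has to be defined in the StreamEnv configuration. As a sketch (the name `my-dispatcher` and the pool size are made-up values, following the definition format used in the complex example at the end of this chapter):

```hocon
swave.core.dispatcher.definition {
  # "my-dispatcher" is a hypothetical name, the pool size a made-up value
  my-dispatcher.thread-pool.fixed-pool-size = 4
}
```

A stream could then select this dispatcher with `.async("my-dispatcher")`.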

Async Transformations

Another way to move the execution of a stream graph off the caller thread is the addition of a transformation that cannot run synchronously. We could, for example, add a withCompletionTimeout transformation to the graph to make sure it will never run for longer than one second:

import scala.concurrent.duration._
import scala.concurrent.Future
import swave.core.util._
import swave.core._

implicit val env = StreamEnv()

val result: Future[List[String]] =
  Spout.ints(0)
    .filter(_ % 5 == 0)
    .map(_.toString)
    .take(10)
    .withCompletionTimeout(1.second) // cannot run synchronously
    .drainToList(limit = 100)

result.await() shouldEqual (0 to 45 by 5).map(_.toString)

env.shutdown()

Since withCompletionTimeout forces swave to run the graph on some dispatcher, the stream graph of this example looks like this:

Stream Graph with Async Transformation

Because nothing prescribes the use of a specific dispatcher, swave will assign the default dispatcher to all stages.
If we wanted to, we could assign a custom dispatcher by adding a .async(dispatcherName) somewhere.

Explicit Async Boundaries

While simply moving stream execution away from the caller thread is nice, it doesn’t really help with running things in parallel. In order to do that we need to introduce asynchronous boundaries, which can be done with asyncBoundary:

import scala.concurrent.Future
import swave.core.util._
import swave.core._

implicit val env = StreamEnv()

val run: StreamRun[Future[Seq[String]]] =
  Spout.ints(0)
    .filter(_ % 5 == 0)
    .asyncBoundary() // adds an explicit async boundary here
    .map(_.toString)
    .take(10)
    .to(Drain.seq(limit = 100)).run()

run.result.await() shouldEqual (0 to 45 by 5).map(_.toString)

env.shutdownOn(run.termination)

asyncBoundary splits the stream graph into two async regions, which are sub-graphs that run independently from each other on their own dispatchers:

Stream Graph with Async Boundary

If the default dispatcher is configured to use more than one thread (as it usually is) this stream setup can potentially keep two cores busy, because both async regions can run concurrently on separate threads.

By adding more async boundaries we could further increase the parallelization potential, i.e. possibly increase throughput, at the expense of higher latency, because each async boundary introduces some overhead resulting from inter-thread communication. Our toy example here will definitely not benefit from async boundaries because the individual stages are too light-weight: any benefit from parallelization will be completely outweighed by the boundaries’ overhead. In real-world applications, however, where the stream stages do actual and potentially heavy work, the ability to quickly and easily add async boundaries at different points and evaluate their effect on application performance is an important tool.

Like async, the asyncBoundary transformation takes as an optional parameter the name of a dispatcher that is to be assigned to all stages in its upstream region. The dispatcher for its downstream region can be defined by another asyncBoundary further downstream or an .async(dispatcherId) marker.
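Putting the two together, here is a sketch of assigning different dispatchers to the two regions of the earlier boundary example. The dispatcher names `foo` and `bar` are made-up values that we define in the configuration first:

```scala
import com.typesafe.config.ConfigFactory
import scala.concurrent.Future
import swave.core.util._
import swave.core._

// define the two dispatchers referenced below ("foo" and "bar" are made-up names)
val config = ConfigFactory.parseString {
  """swave.core.dispatcher.definition {
    |  foo.thread-pool.fixed-pool-size = 4
    |  bar.thread-pool.fixed-pool-size = 4
    |}""".stripMargin
}
implicit val env = StreamEnv(config = Some(config))

val run: StreamRun[Future[Seq[String]]] =
  Spout.ints(0)
    .filter(_ % 5 == 0)
    .asyncBoundary("foo") // stages upstream of the boundary run on dispatcher "foo"
    .map(_.toString)
    .take(10)
    .async("bar")         // stages downstream of the boundary run on dispatcher "bar"
    .to(Drain.seq(limit = 100)).run()

run.result.await() shouldEqual (0 to 45 by 5).map(_.toString)

env.shutdownOn(run.termination)
```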

Complex Example

Of course, stream graphs aren’t always straight pipelines, so let’s look at a more complex example involving several drains and async boundaries as well as custom dispatcher assignments:

Complex Stream Graph with several Async Boundaries

As you can see, it’s possible for both sides of an async boundary to belong to the same async region if there is another, boundary-less connection between them.

Here is the code for this (admittedly quite contrived) example:

import com.typesafe.config.ConfigFactory
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration._
import swave.core.util._
import swave.core._

val config = ConfigFactory.parseString {
  """swave.core.dispatcher.definition {
    |  foo.thread-pool.fixed-pool-size = 4
    |  bar.thread-pool.fixed-pool-size = 4
    |}""".stripMargin
}
implicit val env = StreamEnv(config = Some(config))

def upperChars(s: String): Spout[Char] =
  Spout(s.iterator).map(_.toUpper)

def drain(promise: Promise[String]): Drain[Char, Unit] =
  Drain.mkString(limit = 100)
    .captureResult(promise)
    .async("bar") // same as adding `.async` to all regions this drain is placed in

val result2 = Promise[String]()
val run: StreamRun[Future[String]] =
  upperChars("Hello")
    .asyncBoundary("foo")
    .fanOutBroadcast()
      .sub.drop(2).concat(upperChars("-Friend-").asyncBoundary()).end
      .sub.take(2).asyncBoundary().multiply(2).end
    .fanInConcat()
    .tee(Pipe[Char].asyncBoundary().deduplicate.to(drain(result2)))
    .map(_.toLower)
    .to(Drain.mkString(limit = 100))
    .run() // not using `.drainTo(...)` allows us access to the `StreamRun`

run.result.await(5.seconds) shouldEqual "llo-friend-hhee"
result2.future.await(5.seconds) shouldEqual "LO-FRIEND-HE"

// only shut down when all regions have properly terminated
env.shutdownOn(run.termination)

In stream graphs with more than one async region a graceful shutdown requires attaching to run.termination (a Future[Unit]), which will only be completed when all parts of the stream graph have fully terminated.

Simply calling env.shutdown as soon as (some) drain result has become available might cause the StreamEnv to shut down its thread-pools before all regions have had the chance to properly run their cleanup code. This might cause the appearance of java.util.concurrent.RejectedExecutionException: Dispatcher '...' has already shut down errors in your logs.

Since this mechanism for shutting down gracefully is so common swave already provides the shutdownOn method on StreamEnv, which internally attaches the shutdown call to the given termination future.
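In other words, shutdownOn can be thought of as a convenience for manually chaining the shutdown onto the termination future, roughly like this sketch (reusing `run` and `env` from the example above and assuming an implicit ExecutionContext is in scope for the callback):

```scala
import scala.concurrent.ExecutionContext.Implicits.global

// roughly what `env.shutdownOn(run.termination)` does internally:
// trigger the shutdown only once all regions have terminated
run.termination.onComplete(_ => env.shutdown())
```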