Fan-Outs

Fan-Outs are stream graph components with one input port and several output ports.

[Figure: Fan-Out]

Currently these fan-out variants are available:

Additionally these fan-out shortcut transformations are defined:

Basic Example

swave’s streaming DSL allows you to define fan-outs in a flexible yet concise way.
Here is a basic example:

import scala.concurrent.{Future, Promise}
import swave.core._

implicit val env = StreamEnv()

val promise1 = Promise[Seq[String]]()
val drain1 =
  Drain.seq(limit = 10).captureResult(promise1)

val promise2 = Promise[Seq[Int]]()
val drain2 =
  Drain.seq(limit = 10).captureResult(promise2)

val result: Future[String] =
  Spout.ints(from = 0)
    .fanOutBroadcast(eagerCancel = true)
      .sub.filter(_ > 45).map(_.toString).to(drain1)
      .sub.map(_.toString).end
      .sub.slice(42, 7).to(drain2) // cancels after element 48, which (with eagerCancel = true) terminates the stream
    .continue
    .drop(10)
    .drainToMkString(limit = 100)

result.value.get.get shouldEqual (10 to 48).mkString
promise1.future.value.get.get shouldEqual (46 to 48).map(_.toString)
promise2.future.value.get.get shouldEqual (42 to 48)

This example encodes this stream graph:

[Figure: Fan-Out Example]

This graph has three Drains: two that each deliver a Seq to a Promise and one (the main one) that produces a String.
As this example already (partially) shows, a fan-out definition consists of:

  1. a call to a specific fan-out variant, in this case fanOutBroadcast
  2. the addition of one or more fan-out branches via sub
  3. the definition of sub-branch transformations (as on any regular Spout) after the sub call
  4. closing the subs
  5. closing the fan-out

The last two points deserve some deeper explanation.

After you’ve finished the definition of a sub branch “pipeline” (following a sub call) there are two alternatives for what to do with this open sub branch. You can either drain it into a Drain with .to(drain) (as the first and last sub branches do in the example above) or you can leave it open by simply calling .end.

After you’ve finished defining all sub branches there are two ways to “close” the fan-out. If only a single sub is left open (with .end) you can use .continue to “pick up” this open sub branch and simply continue appending more transformations to it. This sub then becomes the new “main line” of your stream graph.
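To tie the five steps listed above to concrete code, here is a minimal sketch of a fan-out that is closed with .continue. It uses only calls that appear elsewhere on this page and assumes swave is on the classpath; the names promise and sideDrain are purely illustrative.

```scala
import scala.concurrent.{Future, Promise}
import swave.core._

implicit val env = StreamEnv()

val promise = Promise[Seq[Int]]()
val sideDrain = Drain.seq(limit = 10).captureResult(promise)

val result: Future[String] =
  Spout(1, 2, 3)
    .fanOutBroadcast()       // 1. pick a fan-out variant
      .sub                   // 2. add a sub branch
        .map(_ * 10)         // 3. sub-branch transformation
        .to(sideDrain)       // 4. close this sub by draining it
      .sub.map(_ + 1).end    // 2.-4. second sub, left open with .end
    .continue                // 5. exactly one open sub, so .continue picks it up
    .drainToMkString(limit = 10, sep = ",")
```

If the stream runs synchronously, as in the other examples on this page, result should complete with "2,3,4" and the promise with Seq(10, 20, 30).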

If several subs are left open you cannot use .continue, because it wouldn’t be clear which one to pick up and what to do with the other open subs. In this case you can use one of the available fan-in variants to “join” the open subs and continue your stream graph definition in a fluent fashion. A common use case for this is a “diamond” graph.

Diamond Example

A common stream graph pattern is a “diamond” setup, where a fan-out first defines several sub branches, which each apply some specific transformations to “their” elements, before the ends of the sub branches are re-joined with some kind of fan-in logic. For example, this graph:

[Figure: Diamond Graph]

can be encoded like this:

import scala.concurrent.Future
import swave.core._

implicit val env = StreamEnv()

val result: Future[String] =
  Spout(1, 2, 3)
    .fanOutBroadcast()
      .sub.map("A" + _).end
      .sub.map("B" + _).end
    .fanInToTuple // Spout[(String, String)]
    .drainToMkString(limit = 10, sep = ";")

result.value.get.get shouldEqual "(A1,B1);(A2,B2);(A3,B3)"

Adding one or more sub branches that aren’t left open (i.e. that drain into some Drain) wouldn’t affect the fan-in in any way. And if you add another fan-out sub that is left open (i.e. ends with .end), the fan-in would work just as well but produce a Tuple3 instead of a Tuple2.
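Both variations can be sketched by extending the diamond example. The following is an illustration rather than a tested snippet: it adds one drained sub (using Drain.ignore.dropResult, as in the mixed example further down) and a third open sub, and assumes swave is on the classpath.

```scala
import scala.concurrent.Future
import swave.core._

implicit val env = StreamEnv()

val result: Future[String] =
  Spout(1, 2, 3)
    .fanOutBroadcast()
      .sub.map("A" + _).end
      .sub.to(Drain.ignore.dropResult) // drained sub: takes no part in the fan-in
      .sub.map("B" + _).end
      .sub.map("C" + _).end            // third open sub
    .fanInToTuple // Spout[(String, String, String)]
    .drainToMkString(limit = 10, sep = ";")
```

With three open subs the fan-in should yield Tuple3 elements, so result is expected to complete with "(A1,B1,C1);(A2,B2,C2);(A3,B3,C3)".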

Mixing Fan-Outs and Fan-Ins

Mixing fan-outs and fan-ins is possible even beyond what was shown in the examples above.
Take this code for example:

import scala.concurrent.Future
import swave.core._

implicit val env = StreamEnv()

val result: Future[List[(Double, String, Long, Int)]] =
  Spout(1, 2, 3)
    .fanOutBroadcast()
      .sub.map(_.toString).end
      .attach(Spout.longs(from = 10))
      .sub.to(Drain.ignore.dropResult) // just for fun
      .sub.end
      .attachLeft(Spout.doubles(from = 0, step = 0.5))
    .fanInToTuple // Spout[(Double, String, Long, Int)]
    .drainToList(limit = 10)

result.value.get.get shouldEqual Seq(
  (0.0, "1", 10, 1),
  (0.5, "2", 11, 2),
  (1.0, "3", 12, 3))

It encodes this graph:

[Figure: Mixed Fan-Out/Fan-In Graph]

This graph could be drawn in a simpler way, but we show it like this to make it easier to correlate the DSL code with the visual representation.

As this example shows, you can attach Spouts “from the outside” at any point in a fan-out definition, even on the left (with attachLeft). This allows for a very flexible yet concise definition of the large majority of common stream graphs. Some graphs, especially those containing cycles, cannot be constructed in a fully fluent fashion. But with only one small additional element, namely Couplings, even these graphs, and in fact all graphs, can be defined.

subContinue

In some of the “Show Off Examples” as well as in the swave sources you are going to come across a shortcut that is frequently used in fan-outs: subContinue.

subContinue is simply short-hand for .sub.end.continue, i.e. the addition of another fan-out sub branch that is immediately used to “close” the fan-out and continue the definition of the “main” stream pipeline.

As an example, this is a simple re-implementation of the tee transformation, which we call teee here in order to avoid a name clash with the existing tee:

import scala.concurrent.{Future, Promise}
import swave.core._

implicit val env = StreamEnv()

// simple extension for `Spout[T]`, could also be a value class
implicit class RichSpout[T](underlying: Spout[T]) {
  def teee(drain: Drain[T, Unit]): Spout[T] =
    underlying
      .fanOutBroadcast()
        .sub.to(drain)
        .subContinue // short for: .sub.end.continue
}

val promise = Promise[Seq[Int]]()

val result: Future[List[Int]] =
  Spout(1, 2, 3)
    .teee(Drain.seq(limit = 10).captureResult(promise))
    .drainToList(limit = 10)

result.value.get.get shouldEqual Seq(1, 2, 3)
promise.future.value.get.get shouldEqual Seq(1, 2, 3)
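For comparison, here is a sketch that writes the same kind of fan-out once in the long form and once with subContinue. It uses only calls shown on this page and assumes swave is on the classpath; the names longForm and shortForm are illustrative.

```scala
import scala.concurrent.Future
import swave.core._

implicit val env = StreamEnv()

// long form: add an open sub with .sub.end, then pick it up with .continue
val longForm: Future[List[Int]] =
  Spout(1, 2, 3)
    .fanOutBroadcast()
      .sub.to(Drain.ignore.dropResult)
      .sub.end
    .continue
    .drainToList(limit = 10)

// short form: subContinue replaces the .sub.end / .continue pair
val shortForm: Future[List[Int]] =
  Spout(1, 2, 3)
    .fanOutBroadcast()
      .sub.to(Drain.ignore.dropResult)
      .subContinue
    .drainToList(limit = 10)
```

Both pipelines should produce the same result, List(1, 2, 3).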

captureResult and dropResult

In the examples above you might have noticed the .captureResult and .dropResult calls. One of the two is required for most types of Drains if they are to be used as the target of a fan-out sub branch.

The reason for this is that the DSL offers no way to access the result of a Drain when it is used within a fan-out sub branch. Therefore, in order to be usable in a sub branch, a Drain must not produce a result, which means that its result type must be Unit. Since most types of Drains do produce results, they must be explicitly transformed into Drains without a result.

There are two options for this:

  1. capture the result in a Promise
  2. drop the result completely

Drains define transformation methods for both of these alternatives, which are named accordingly.
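The two options can be put side by side in one small fan-out. This is a sketch under the same assumptions as the examples above (swave on the classpath, synchronous run); the names promise and capturing are illustrative.

```scala
import scala.concurrent.{Future, Promise}
import swave.core._

implicit val env = StreamEnv()

val promise = Promise[Seq[Int]]()

// option 1: the Drain's result is delivered to the Promise
val capturing = Drain.seq(limit = 10).captureResult(promise)

val result: Future[List[Int]] =
  Spout(1, 2, 3)
    .fanOutBroadcast()
      .sub.to(capturing)
      .sub.to(Drain.ignore.dropResult) // option 2: the result is discarded
      .subContinue
    .drainToList(limit = 10)
```

Here the captured result stays reachable through promise.future, which should complete with Seq(1, 2, 3), while the result of the second sub's Drain is not accessible at all.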