Streams-of-Streams

Streams, like most other abstractions, can be nested. You can have a Spout[Spout[T]] just as you can have a List[List[T]]. swave offers a number of transformations that either

  1. create a stream-of-streams from an incoming stream of “regular” elements (which we call injecting)

    (Figure: An Injecting Transformation)

  2. or flatten a stream-of-streams back to an outgoing stream of “regular” elements (which we call flattening)

    (Figure: A Flattening Transformation)
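
The two directions can be pictured with plain Scala collections. The following is only an analogy sketch using the standard library's List, not swave API; the object name NestingAnalogy and the chunk size of 3 are made up for illustration.

```scala
// Plain Scala collections used as an analogy; this is not swave API.
object NestingAnalogy {
  val flat: List[Int] = List(1, 2, 3, 4, 5, 6, 7)

  // "Injecting": turn a flat sequence into a sequence of sub-sequences.
  // Here: List(List(1, 2, 3), List(4, 5, 6), List(7))
  val nested: List[List[Int]] = flat.grouped(3).toList

  // "Flattening": concatenate the sub-sequences back into a flat sequence.
  val roundTrip: List[Int] = nested.flatten
}
```

Unlike a List[List[T]], the inner streams of a Spout[Spout[T]] are live and may be open concurrently, which is where the additional complexity described below comes from.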

While the shape of these stream-of-streams transformations is the same as for Simple Transformations, their internal and external complexity is significantly higher. This is because the state-space of the state-machines implementing the transformation logic grows with the number of open streams that a stage has to deal with concurrently. We therefore categorize stream-of-streams transformations as a separate group.

Injecting Transformations

swave currently defines these injecting transformations:

Flattening Transformations

swave currently defines these flattening transformations:

Example

As an example of a stream-of-streams application let’s look at a (slightly simplified) implementation of the takeEveryNth transformation, which we call takeEvery here in order to avoid name clashes:

import scala.concurrent.Future
import swave.core._

implicit val env = StreamEnv()

// simple extension for `Spout[T]`, could also be a value class
implicit class RichSpout[T](underlying: Spout[T]) {
  def takeEvery(n: Long): Spout[T] =
    underlying                  // Spout[T]
      .injectSequential()       // Spout[Spout[T]]
      .map(_.drop(n-1).take(1)) // Spout[Spout[T]]
      .flattenConcat()          // Spout[T]
}

val result: Future[List[Int]] =
  Spout.ints(from = 1)
    .takeEvery(10)
    .take(5)
    .drainToList(limit = 100)

result.value.get.get shouldEqual Seq(10, 20, 30, 40, 50)

This is a typical application of the injectSequential transformation, which creates a sub stream, pushes as many elements into it as the sub stream accepts, then opens the next sub stream and so on. Every sub stream accepts n elements, of which only the last one is produced: drop(n-1) discards the first n-1 elements and take(1) lets the n-th one through before completing the sub stream. When all sub streams are concatenated we get exactly the kind of “take every n-th element” effect that we intended.
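
For comparison, the same three-step pattern (split, trim each piece, concatenate) can be sketched on a standard library Iterator. This is an analogy under stated assumptions rather than how swave implements it: grouped stands in for injectSequential, flatten stands in for flattenConcat, and the helper takes n as an Int because Iterator.grouped requires one. The object name TakeEveryAnalogy is hypothetical, and the sketch assumes Scala 2.13 or later.

```scala
// Standard library analogy of the takeEvery example; this is not swave API.
object TakeEveryAnalogy {
  // Hypothetical helper mirroring the three steps of the swave version.
  def takeEvery[T](source: Iterator[T], n: Int): Iterator[T] =
    source
      .grouped(n)                 // Iterator[Seq[T]]: chunks of up to n elements
      .map(_.drop(n - 1).take(1)) // keep only the n-th element of each chunk
      .flatten                    // Iterator[T]: concatenate what is left

  // The source is infinite; only the first 50 elements are ever pulled.
  val result: List[Int] = takeEvery(Iterator.from(1), 10).take(5).toList
}
```

A trailing chunk with fewer than n elements contributes nothing, because drop(n - 1) leaves it empty.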

Relationship to Fan-Ins and Fan-Outs

The injecting and flattening transformations have a very close relationship to fan-outs and fan-ins, respectively.
Check out the next chapter for more details.