Streams-of-Streams
Streams, like most other abstractions, can be nested. This means that you can have a Spout[Spout[T]] just like you can have a List[List[T]]. swave offers a number of transformations that either

create a stream-of-streams from an incoming stream of “regular” elements (which we call injecting)

or flatten a stream-of-streams back to an outgoing stream of “regular” elements (which we call flattening)
While the shape of these stream-of-streams transformations is the same as for Simple Transformations, their internal and external complexity is significantly higher. This is because the state space of the state machines implementing the transformation logic grows significantly with the number of open streams that a stage has to deal with concurrently. We therefore categorize stream-of-streams transformations as a separate group.
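The two directions described above can be pictured with ordinary Scala collections. The following is only an analogy using the standard library, not swave API: the object name NestingSketch and the chunk size of 2 are chosen purely for illustration, and strict lists have none of the laziness or backpressure of a real Spout.

```scala
object NestingSketch {
  // A flat sequence of "regular" elements.
  val flat: List[Int] = List(1, 2, 3, 4, 5, 6)

  // Analogue of injecting: regular elements become a nested structure.
  // Here every inner list receives (up to) two elements.
  val nested: List[List[Int]] = flat.grouped(2).toList

  // Analogue of flattening: the nested structure becomes flat again.
  val roundTrip: List[Int] = nested.flatten
}
```

In this strict setting the nesting is trivial because only one inner list is ever "open" at a time; a streaming stage gets no such guarantee, which is where the additional state comes from.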
Injecting Transformations
swave currently defines these injecting transformations:
Flattening Transformations
swave currently defines these flattening transformations:
Example
As an example of a stream-of-streams application let’s look at a (slightly simplified) implementation of the takeEveryNth transformation, which we call takeEvery here in order to avoid name clashes:
import scala.concurrent.Future
import swave.core._

implicit val env = StreamEnv()

// simple extension for `Spout[T]`, could also be a value class
implicit class RichSpout[T](underlying: Spout[T]) {
  def takeEvery(n: Long): Spout[T] =
    underlying                      // Spout[T]
      .injectSequential()           // Spout[Spout[T]]
      .map(_.drop(n - 1).take(1))   // Spout[Spout[T]], keeps only the n-th element of each sub stream
      .flattenConcat()              // Spout[T]
}

val result: Future[List[Int]] =
  Spout.ints(from = 1)
    .takeEvery(10)
    .take(5)
    .drainToList(limit = 100)

result.value.get.get shouldEqual Seq(10, 20, 30, 40, 50)
This is a typical application of the injectSequential transformation, which creates a sub stream, pushes as many elements into it as the sub stream accepts, then opens the next sub stream and so on. Every sub stream accepts n elements, of which only the last one is produced. When all sub streams are concatenated we get exactly the kind of “take every nth element” effect that we intended.
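The same "accept n elements, keep only the last one, then concatenate" logic can be modelled with a plain standard-library Iterator, which may help when reasoning about the result without a running StreamEnv. This is a sketch under assumptions, not swave code: TakeEverySketch is a hypothetical name, grouped stands in for injectSequential, flatMap stands in for map followed by flattenConcat, and n is an Int here because Iterator.grouped takes an Int.

```scala
object TakeEverySketch {
  // Stand-in for the swave pipeline, built only on scala.collection.Iterator.
  def takeEvery[T](source: Iterator[T], n: Int): Iterator[T] =
    source
      .grouped(n)                     // chunks of n elements, playing the role of the sub streams
      .flatMap(_.drop(n - 1).take(1)) // keep the last element of each full chunk, then concatenate
}
```

A trailing chunk with fewer than n elements contributes nothing, since drop(n - 1) leaves it empty. Because iterators are lazy, the sketch also works on an unbounded source such as Iterator.from(1), as long as the result is limited with take before it is materialized.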
Relationship to Fan-Ins and Fan-Outs
The injecting and flattening transformations have a very close relationship to fan-outs and fan-ins, respectively.
Check out the next chapter for more details.