Fan-Ins

Fan-Ins are stream graph components with several input ports and a single output port.

[Figure: A Fan-In]

swave’s streaming DSL allows you to define fan-ins in a flexible yet concise way.
For example:

import scala.concurrent.Future
import swave.core._

implicit val env = StreamEnv()

def foo = Spout(1, 2, 3)
def bar = Spout(10, 20, 30)
def baz = Spout(100, 200, 300)

val result: Future[List[Int]] =
  foo
    .attach(bar)
    .attach(baz)
  //.attach(...)
    .fanInConcat()
    .drainToList(limit = 10)

result.value.get.get shouldEqual Seq(1, 2, 3, 10, 20, 30, 100, 200, 300)

When you call attach(...) on a Spout, you get back a DSL type that represents several open stream ports rather than the single port of a simple Spout. You can add more spouts to the mix by calling .attach(...) again, as often as you like.

Once you have assembled all the spouts for the fan-in in this way, call one of the fanIn... variants to “close” the fan-in with the respective logic. Currently these fan-in variants are available:

Additionally these fan-in shortcut transformations are defined:

Symmetric vs. Asymmetric Fan-Ins

Some fan-in variants are “symmetric” in the sense that the order of the input streams doesn’t matter.
fanInMerge and fanInToSum are probably the most used variants in that category.
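
As a minimal sketch of a symmetric fan-in, the following merges two spouts with fanInMerge. The no-argument call fanInMerge() is an assumption modeled on the fanInConcat() call shown earlier, so check the signature in your swave version. Because the interleaving produced by a merge is not specified here, the sketch compares the sorted contents instead of a fixed order.

```scala
import scala.concurrent.Future
import swave.core._

implicit val env = StreamEnv()

def foo = Spout(1, 2, 3)
def bar = Spout(10, 20, 30)

// Assumption: fanInMerge can be called without arguments, like fanInConcat()
val merged: Future[List[Int]] =
  foo
    .attach(bar)
    .fanInMerge()
    .drainToList(limit = 10)

// The element order depends on the merge, so only the sorted contents are checked
assert(merged.value.get.get.sorted == List(1, 2, 3, 10, 20, 30))
```

Swapping foo and bar should leave the sorted contents unchanged, which is what makes this variant symmetric.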

For asymmetric fan-ins, like fanInConcat or fanInToProduct, the order of the inputs is important. To give you more flexibility in assembling fan-in inputs in the desired order, swave also defines attachLeft(...) in addition to attach(...). While attach appends a spout to the right end of the list of open spouts, attachLeft adds it to the left end.

Here is an example:

import scala.concurrent.Future
import swave.core._

implicit val env = StreamEnv()

def reds = Spout.ints(from = 0, step = 10)
def greens = Spout.ints(from = 100, step = 20)
def blues = Spout.ints(from = 200, step = 30)

val result: Future[List[(Int, Int, Int)]] =
  greens              // open = greens
    .attach(blues)    // open = greens :: blues
    .attachLeft(reds) // open = reds :: greens :: blues
    .fanInToTuple // Spout[(Int, Int, Int)]
    .take(3)
    .drainToList(limit = 10)

result.value.get.get shouldEqual List(
  (0, 100, 200),
  (10, 120, 230),
  (20, 140, 260)
)
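
The same ordering rule applies to other asymmetric variants. The following sketch reuses only calls that appear in the examples above (attach, attachLeft, fanInConcat and drainToList) to show how attachLeft changes the order in which fanInConcat drains its inputs. The spout names are illustrative.

```scala
import scala.concurrent.Future
import swave.core._

implicit val env = StreamEnv()

def foo = Spout(1, 2, 3)
def bar = Spout(10, 20, 30)
def baz = Spout(100, 200, 300)

val concatenated: Future[List[Int]] =
  foo                // open = foo
    .attach(bar)     // open = foo :: bar
    .attachLeft(baz) // open = baz :: foo :: bar
    .fanInConcat()   // drains baz first, then foo, then bar
    .drainToList(limit = 10)

assert(concatenated.value.get.get == List(100, 200, 300, 1, 2, 3, 10, 20, 30))
```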

Homogeneous vs. Heterogeneous Fan-Ins

The examples above show “homogeneous” fan-ins, in which all inputs are of the same type. With variants like fanInConcat or fanInMerge this is the most common case.

However, fanInToTuple, fanInToProduct or fanInToSum are usually used on inputs of differing types, i.e. as “heterogeneous” fan-ins. Here is an example:

import scala.concurrent.Future
import swave.core._

implicit val env = StreamEnv()

case class Person(id: Long, name: String, age: Int)

def ids = Spout.longs(1L)
def names = Spout("Alice", "Bob", "Charlie", "David")
def ages = Spout(27, 21, 48, 36)

val result: Future[List[Person]] =
  ids
    .attach(names)
    .attach(ages)
    .fanInToProduct[Person]
    .drainToList(limit = 10)

result.value.get.get shouldEqual List(
  Person(1L, "Alice", 27),
  Person(2L, "Bob", 21),
  Person(3L, "Charlie", 48),
  Person(4L, "David", 36)
)

This example also shows how swave tries to keep boilerplate to a minimum: the case class instances are created implicitly, with each member taken from its own sub-stream.
swave builds on shapeless to make this kind of type-logic possible.
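
To give a rough idea of the kind of type-level machinery involved, the following sketch uses shapeless's Generic to turn an HList of member values into a case class instance. It illustrates the general shapeless technique only; it is not swave's actual implementation, and it assumes shapeless 2.x is on the classpath.

```scala
import shapeless._

case class Person(id: Long, name: String, age: Int)

// Generic derives, at compile time, a conversion between Person
// and its generic representation Long :: String :: Int :: HNil
val gen = Generic[Person]

// One value per member, in declaration order
val members = 1L :: "Alice" :: 27 :: HNil

// Build the case class instance from the HList of member values
val person: Person = gen.from(members)

assert(person == Person(1L, "Alice", 27))
assert(gen.to(person) == members)
```

An HList whose element types do not match the members of Person is rejected at compile time, which is the kind of type safety a heterogeneous fan-in needs.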