Fan-Ins are stream graph components with several input ports and a single output port.
swave’s streaming DSL allows you to define fan-ins in a flexible yet concise way.
import scala.concurrent.Future import swave.core._ implicit val env = StreamEnv() def foo = Spout(1, 2, 3) def bar = Spout(10, 20, 30) def baz = Spout(100, 200, 300) val result: Future[List[Int]] = foo .attach(bar) .attach(baz) //.attach(...) .fanInConcat() .drainToList(limit = 10) result.value.get.get shouldEqual Seq(1, 2, 3, 10, 20, 30, 100, 200, 300)
When you call
attach(...) on a Spout you get back a DSL type which represents several open stream ports, not just a single one as in the case of a simple Spout. You can add more spouts to the mix by simply calling
.attach(...) again, as often as you like.
Once you have assembled all the spouts for the fan-in in this way simply call one of the defined
fanIn... variants to “close” the fan-in with the respective logic. Currently these fan-in variants are available:
Additionally these fan-in shortcut transformations are defined:
For asymmetric fan-ins, like fanInConcat or fanInToProduct, the order of the inputs is important. To give you more flexibility for assembling fan-in inputs in the desired way swave also defines
attachLeft(...), in addition to
attach(...). As you can probably guess
attachLeft adds a new open spout to the left of the list of open spouts.
Here is an example:
import scala.concurrent.Future import swave.core._ implicit val env = StreamEnv() def reds = Spout.ints(from = 0, step = 10) def greens = Spout.ints(from = 100, step = 20) def blues = Spout.ints(from = 200, step = 30) val result: Future[List[(Int, Int, Int)]] = greens // open = greens .attach(blues) // open = greens :: blues .attachLeft(reds) // open = reds :: greens :: blues .fanInToTuple // Spout[(Int, Int, Int)] .take(3) .drainToList(limit = 10) result.value.get.get shouldEqual List( (0, 100, 200), (10, 120, 230), (20, 140, 260) )
import scala.concurrent.Future import swave.core._ implicit val env = StreamEnv() case class Person(id: Long, name: String, age: Int) def ids = Spout.longs(1L) def names = Spout("Alice", "Bob", "Charlie", "David") def ages = Spout(27, 21, 48, 36) val result: Future[List[Person]] = ids .attach(names) .attach(ages) .fanInToProduct[Person] .drainToList(limit = 10) result.value.get.get shouldEqual List( Person(1L, "Alice", 27), Person(2L, "Bob", 21), Person(3L, "Charlie", 48), Person(4L, "David", 36) )
One thing you can also see in this example is that swave attempts to reduce all boilerplate to the absolute minimum. Here the creation of case class instances from sub-streams for each member is implicitly taken care of.
swave builds on shapeless to make this kind of type-logic possible.