Fibonacci Example

This example shows how you can use a stream graph to model recursive algorithms, like the generation of the stream of all Fibonacci numbers.

Here is a very simple way to create an infinite stream of all Fibonacci numbers that relies on an unfold Spout:

import scala.concurrent.Future
import swave.core._

implicit val env = StreamEnv()

// the "infinite" stream of all Fibonacci numbers
def fibonacciNumbers: Spout[Int] =
  Spout.unfold(0 -> 1) { case (a, b) =>
    Spout.Unfolding.Emit(elem = a, next = b -> (a + b))
  }

val result: Future[List[Int]] =
  fibonacciNumbers
    .take(8)
    .drainToList(limit = 100)

result.value.get.get shouldEqual List(0, 1, 1, 2, 3, 5, 8, 13)

Since the stream graph here does not include any stages that require asynchronous dispatch, everything runs synchronously on the caller thread, and the result future is already completed by the time we get hold of it.

While this is a concise and efficient implementation, the recursion required for the stream generation is provided entirely by the “unfolding” feature. As such it is “built-in” and not that interesting from a “show-off” perspective.
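The unfolding logic itself can be illustrated outside of any streaming context. The following sketch (plain Scala standard library, no swave involved) uses Iterator.unfold, whose shape mirrors the Spout.unfold above: each step emits one element and produces the next state:

```scala
// the same unfolding recursion with the plain Scala standard library:
// every step emits `a` and advances the state from (a, b) to (b, a + b)
val fibIterator: Iterator[Int] =
  Iterator.unfold(0 -> 1) { case (a, b) => Some(a -> (b -> (a + b))) }

val first8 = fibIterator.take(8).toList
// first8 == List(0, 1, 1, 2, 3, 5, 8, 13)
```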

A perhaps more illustrative way to construct the same stream is the following one, which makes the recursion explicit in the stream graph structure:

import scala.concurrent.Future
import swave.core._

implicit val env = StreamEnv()

// the "infinite" stream of all Fibonacci numbers
def fibonacciNumbers: Spout[Int] = {
  val c = Coupling[Int]
  Spout(0, 1)
    .concat(c.out)
    .fanOutBroadcast(eagerCancel = true)
      .sub.buffer(2, Buffer.RequestStrategy.Always).sliding(2).map(_.sum).to(c.in)
      .subContinue
}

val result: Future[List[Int]] =
  fibonacciNumbers
    .take(8)
    .drainToList(limit = 100)

result.value.get.get shouldEqual List(0, 1, 1, 2, 3, 5, 8, 13)

The stream graph defined by this code can be visualized like this:

Fibonacci Example Stream Graph

High-Level Mechanics

From a high-level perspective this stream setup is built around a graph cycle that only contains two semantically significant logic stages:

  1. A sliding stage, which produces, for every incoming element, a Seq containing the last two elements.
  2. A map stage, which takes the Seq produced by the sliding stage, computes the sum of both its elements and pushes this sum into the loop as the next element.
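The combined effect of these two stages can be demonstrated on a finite prefix with plain Scala collections (no swave involved):

```scala
// sliding(2) followed by map(_.sum), applied to a finite Fibonacci prefix
val prefix = List(0, 1, 1, 2, 3, 5)
val sums   = prefix.sliding(2).map(_.sum).toList
// sums == List(1, 2, 3, 5, 8) -- each sum is exactly the next Fibonacci
// number, which the cycle feeds back into the stream as a new element
```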

This simple setup is the core of the stream graph and suffices for generating the “infinite” sequence of Fibonacci numbers. Everything else is required infrastructure for

  • seeding the loop with the two initial numbers (the Spout(0, 1))
  • closing the loop, which cannot be constructed by the fluent DSL alone (the Coupling)
  • creating sufficient demand in the loop to “kick” the process into motion (the buffer stage, see section below)
  • siphoning off the generated values and stopping the loop when nobody is interested in its results any more (the fanOutBroadcast stage with the eagerCancel = true configuration)
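For comparison, the same “seed two elements, then feed back the sums of the last two” structure can be expressed with a self-referential LazyList from the Scala standard library (no swave involved); here the lazy self-reference plays the role that the Coupling plays in the stream graph:

```scala
// seed with 0 and 1, then append the element-wise sums of the stream
// with its own tail -- the lazy self-reference closes the loop
lazy val fibs: LazyList[Int] =
  0 #:: 1 #:: fibs.zip(fibs.tail).map { case (a, b) => a + b }

fibs.take(8).toList  // List(0, 1, 1, 2, 3, 5, 8, 13)
```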

Low-Level Mechanics

If you are interested, here’s how this setup works in detail, explained by tracing the initial phase of the stream execution step by step:

  1. When the stream is started three stages begin to actively send signals: the main Drain at the very end, the buffer stage in the first fan-out sub branch and the sliding stage right behind it. All 3 stages signal demand to the upstreams. The main drain signals demand for Int.MaxValue elements and the buffer and sliding stages each signal demand for 2 elements.

  2. The take stage receives the demand signal from its downstream and forwards it as Request(8) to its own upstream, which, via the subContinue, is the fanOutBroadcast stage.

  3. The fanOutBroadcast stage receives the demand signal for 2 elements (from the buffer stage) on its first sub stream and a demand of 8 on its second sub stream. It therefore signals a demand of 2 to its own upstream, the concat.

  4. The concat stage forwards this demand to its first upstream, the Spout(0, 1), which immediately delivers two elements and signals completion.

  5. The concat stage forwards the two received elements (0 and 1) to its downstream and registers the completion of its first upstream, which will cause all still unfulfilled as well as future demand to go to its second upstream (the out side of the Coupling).

  6. The fanOutBroadcast stage receives the two elements (0 and 1) and forwards them to both of its downstreams.

  7. The take stage forwards the elements to the main Drain and registers that it has seen the first two of the total 8 elements it is expecting.

  8. The buffer stage receives the two elements and immediately forwards them to the sliding stage, which has already demanded them at the very start of the stream. Then the buffer stage immediately re-requests two more elements from the fanOutBroadcast stage.

  9. The fanOutBroadcast stage now has unfulfilled demand of 2 on its first sub stream and 6 on its second. It therefore signals demand of 2 to the concat stage, from where it travels, via the Coupling and the map stage, to the sliding stage.

  10. The sliding stage produces its first window, a Seq(0, 1), to the map stage, which in turn produces a 1 element to the concat stage (again, via the Coupling). The sliding stage then signals demand for the next element to the buffer stage.

  11. The concat, which currently has unfulfilled demand of 2, receives the 1 element and produces it to the fanOutBroadcast, which pushes it to both of its sub streams, where it arrives at the main Drain as well as the buffer.

  12. As the buffer has already seen demand of 1 from its downstream, it immediately forwards this element to the sliding stage, which produces the next window, a Seq(1, 1), to the map stage, which in turn produces a 2 to the concat, which forwards it to the fanOutBroadcast, which produces it to both its downstreams.

At this point a continuous cycle of demand and element production has been established and will run as long as nothing stops the process. In this case the process is stopped by the take stage: after it has received 8 elements it sends a Cancel signal to its upstream, the fanOutBroadcast. Since that is configured with eagerCancel = true it will not continue to run “on one leg” (i.e. with only one sub stream left), but rather forward the Cancel signal to its own upstream, which causes all stages in the cycle to be stopped by the propagating cancellation.
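The demand bookkeeping of the fanOutBroadcast stage in steps 3 and 9 follows a simple rule: it can only signal as much demand upstream as its slowest sub stream is prepared to accept. A minimal model of that rule (plain Scala, a simplification, not swave’s actual implementation):

```scala
// a broadcast fan-out may only signal as much demand upstream as the
// minimum of the unfulfilled demands across all of its sub streams
def broadcastDemand(subDemands: Seq[Long]): Long = subDemands.min

broadcastDemand(Seq(2L, 8L))  // 2, as in step 3 (buffer: 2, take: 8)
broadcastDemand(Seq(2L, 6L))  // 2, as in step 9
```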

Learnings

When you play around with the code of this example you’ll notice that things won’t work when you remove the buffer stage, reduce its size or change its RequestStrategy. The reason is that a certain amount of demand is required in the cycle in order to kick things into motion and establish a perpetual flow of elements. When you follow the initial sequence of events closely, as outlined above, you’ll see why this is the case.

The learning here is that, while cycles are a perfectly valid and useful streaming pattern, they can require additional buffer stages to work correctly. The exact minimum required buffer size can be hard to determine by simply looking at the stage setup. Starting out with a bigger buffer and reducing it until things break can be a good and pragmatic approach.

A second learning is that, if you have perpetual cycles, you need to think about a way to stop the stream if it’s not supposed to run forever. A fanOutBroadcast configured with eagerCancel = true, as shown in this example, is one possible way.