Monte Carlo Pi Example

This example is a slightly adapted version of one that has been shown in a few past talks on swave.
It implements a simple approximation of Pi via the Monte Carlo method, whose basic idea can be illustrated like this:

Approximating Pi with the Monte Carlo method

If we shot holes through this unit square at random points, the share of holes located in the grey area would asymptotically approach π/4 as we increase the number of shots.
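A minimal, stream-free sketch of this idea in plain Scala could look like this (the approximatePi helper is purely illustrative, not part of swave):

// sample random points in the unit square, count how many fall inside the
// quarter circle x² + y² < 1 and scale the resulting fraction by 4
def approximatePi(samples: Int): Double = {
  val random = new scala.util.Random
  var inner = 0
  var i = 0
  while (i < samples) {
    val x = random.nextDouble()
    val y = random.nextDouble()
    if (x * x + y * y < 1.0) inner += 1
    i += 1
  }
  inner.toDouble / samples * 4.0
}

println(approximatePi(10000000)) // typically prints a value close to 3.1415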

There are many ways in which a simulation of this method can be implemented in code.
Here is a stream-based solution, which is intentionally somewhat convoluted to show off a few of swave’s features:

import swave.core.util.XorShiftRandom
import swave.core._

implicit val env = StreamEnv()

val random = XorShiftRandom()

Spout.continually(random.nextDouble())
  .grouped(2)
  .map { case x +: y +: Nil => Point(x, y) }
  .fanOutBroadcast()
    .sub.filter(_.isInner).map(_ => InnerSample).end
    .sub.filterNot(_.isInner).map(_ => OuterSample).end
  .fanInMerge()
  .scan(State(0, 0)) { _ withNextSample _ }
  .drop(1)
  .injectSequential()         // these three transformations together have
  .map(_ drop 999999 take 1)  // the same effect as `.takeEveryNth(1000000)`
  .flattenConcat()            // (in fact, this is how `takeEveryNth` is implemented)
  .map(state => f"After ${state.totalSamples}%,10d samples π is approximated as ${state.π}%.6f")
  .take(50)
  .foreach(println)

val time = System.currentTimeMillis() - env.startTime
println(f"Done. Total time: $time%,6d ms, Throughput: ${50000.0 / time}%.3fM samples/sec\n")

//////////////// MODEL ///////////////

case class Point(x: Double, y: Double) {
  def isInner: Boolean = x * x + y * y < 1.0
}

sealed trait Sample
case object InnerSample extends Sample
case object OuterSample extends Sample

case class State(totalSamples: Long, inCircle: Long) {
  def π: Double = (inCircle.toDouble / totalSamples) * 4.0
  def withNextSample(sample: Sample) =
    State(totalSamples + 1, if (sample == InnerSample) inCircle + 1 else inCircle)
}

When this code is run it generates 50 million samples, at maximum speed, to approximate π using the Monte Carlo method. After every million samples one line of progress information is printed to the console.
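As the comments in the listing point out, the injectSequential/map/flattenConcat combination has the same effect as a single takeEveryNth call. Here is a minimal sketch of that equivalence on a plain integer spout, with n = 3 instead of 1000000 (it assumes that a Spout can be constructed directly from a Range):

import swave.core._

implicit val env = StreamEnv()

Spout(1 to 10)
  .injectSequential()      // stream of sub-streams
  .map(_ drop 2 take 1)    // keep only the 3rd element of each sub-stream
  .flattenConcat()
  .foreach(println)        // expected output: 3, 6, 9

Spout(1 to 10)
  .takeEveryNth(3)
  .foreach(println)        // expected output: the same 3, 6, 9

env.shutdown()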

The stream setup above contains elements from all five basic groups of stream transformations: simple transformations (grouped, map, filter, scan, drop, take), fan-outs (fanOutBroadcast), fan-ins (fanInMerge), injects (injectSequential) and flattenings (flattenConcat). The implemented stream graph can be visualized like this:

Monte Carlo Pi Stream Graph

One interesting aspect of this example is that everything runs synchronously (yet without any blocking) on the caller thread. Since no stage in the graph requires asynchronous dispatch, swave runs the stream entirely without moving execution to another thread, which, in simple and “small” cases like this one, is often the fastest way to run it.
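One way to observe this (a hedged sketch, reusing the drainToLast drain that also appears in the rate-detached variant below): when the graph contains no asynchronous boundaries, the Future returned by the drain is already completed by the time the drain call returns.

import swave.core._

implicit val env = StreamEnv()

// assumes a Spout can be built from a Range; drainToLast returns a Future
val result = Spout(1 to 100).map(_ * 2).drainToLast()
println(result.isCompleted) // expected: true, the run happened on the caller thread

env.shutdown()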

Rate Detaching

To demonstrate another feature called “rate detaching”, let’s transform the example slightly. So far we have printed a line of status information after every million samples. While this probably works fine on your machine, it may generate too much output on very fast machines and too little on very slow ones.

We can fix this by letting the stream produce a status update once a second, i.e. independently of the number of generated samples:

import scala.util.{Failure, Success}
import scala.concurrent.duration._
import swave.core.util.XorShiftRandom
import swave.core._

implicit val env = StreamEnv()
import env.defaultDispatcher // execution context for the Future callbacks below

val random = XorShiftRandom()

Spout.continually(random.nextDouble())
  .grouped(2)
  .map { case x +: y +: Nil => Point(x, y) }
  .fanOutBroadcast()
    .sub.filter(_.isInner).map(_ => InnerSample).end
    .sub.filterNot(_.isInner).map(_ => OuterSample).end
  .fanInMerge()
  .scan(State(0, 0)) { _ withNextSample _ }
  .conflateToLast
  .throttle(1, per = 1.second, burst = 0)
  .onElement(state =>
    println(f"After ${state.totalSamples}%,10d samples π is approximated as ${state.π}%.6f"))
  .take(20)
  .drainToLast()
  .onComplete {
    case Success(State(totalSamples, _)) =>
      val time = System.currentTimeMillis() - env.startTime
      val throughput = totalSamples / 1000.0 / time
      println(f"Done. Total time: $time%,6d ms, Throughput: $throughput%.3fM samples/sec\n")
      env.shutdown()

    case Failure(e) => println(e)
  }

println("Main thread exit.")

//////////////// MODEL ///////////////

case class Point(x: Double, y: Double) {
  def isInner: Boolean = x * x + y * y < 1.0
}

sealed trait Sample
case object InnerSample extends Sample
case object OuterSample extends Sample

case class State(totalSamples: Long, inCircle: Long) {
  def π: Double = (inCircle.toDouble / totalSamples) * 4.0
  def withNextSample(sample: Sample) =
    State(totalSamples + 1, if (sample == InnerSample) inCircle + 1 else inCircle)
}

Up to the scan stage everything has stayed as before. But rather than simply filtering out every million-th element we use conflateToLast, which is a frequently used incarnation of the more general conflate. conflate and its sibling expand are the two basic variants of transformations that perform “rate detaching”: they decouple their downstream’s request rate from their upstream’s production rate.

In our case we want the upstream to run at full speed, dropping all incoming elements that the downstream cannot digest. Whenever the downstream requests one element we want it to receive the most current element from upstream. This is exactly what conflateToLast does.
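To make this concrete, here is a small, purely conceptual sketch of that behavior in plain Scala; a single AtomicLong slot stands in for the conflating stage, no swave API is involved:

import java.util.concurrent.atomic.AtomicLong

// the producer runs at full speed and keeps overwriting a single slot,
// the slower consumer only ever sees whatever value is current when it asks
val latest = new AtomicLong(0L)

val producer = new Thread(() => {
  var i = 1L
  while (i <= 100000000L) { latest.set(i); i += 1 } // older values are simply dropped
})
producer.start()

for (_ <- 1 to 5) {           // slow consumer: one "request" every 100 ms
  Thread.sleep(100)
  println(s"most recent element: ${latest.get()}")
}
producer.join()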

Apart from the difference in how status update lines are printed there is another important difference between this version and the one above: the stream now runs asynchronously, off the caller thread. This is caused by the throttle transformation, which must react to asynchronous timer signals and therefore cannot run purely on the caller thread. This is also why the final “Main thread exit.” line will typically be printed before the first status update appears.