Fahrenheit Example

This example is a direct translation of the one shown in the FS2 README.
It demonstrates a simple stream pipeline that uses the built-in file IO and text handling support to perform a “practical” task:

import java.io.File
import scala.util.{Failure, Success}
import scala.concurrent.Future
import swave.core.io.files._   // enables `Spout.fromFile`
import swave.compat.scodec._   // enables `ByteVector` support
import swave.core.text._       // enables text transformations
import swave.core._

implicit val env = StreamEnv()
import env.defaultDispatcher // for the future transformations below

def fahrenheitToCelsius(f: Double): Double =
  (f - 32.0) * (5.0/9.0)

def converter(fahrenheitReadingsInput: File,
              celsiusReadingsOutput: File): RunnableStreamGraph[Future[Long]] =
  Spout.fromFile(fahrenheitReadingsInput)    // Spout[ByteVector]
    .utf8Decode                              // Spout[String]
    .lines                                   // Spout[String]
    .filterNot(_.trim.isEmpty)               // Spout[String]
    .filterNot(_ startsWith "//")            // Spout[String]
    .map(_.toDouble)                         // Spout[Double]
    .map(fahrenheitToCelsius)                // Spout[Double]
    .map("%.2f" format _)                    // Spout[String]
    .intersperse("\n")                       // Spout[String]
    .utf8Encode                              // Spout[ByteVector]
    .to(Drain.toFile(celsiusReadingsOutput)) // StreamGraph[Future[Long]]
    .seal()                                  // RunnableStreamGraph[Future[Long]]

// when we are ready to roll, start the stream
// (`inputFile` and `outputFile` are assumed to be `java.io.File` values defined elsewhere)
val run: StreamRun[Future[Long]] =
  converter(inputFile, outputFile).run()

// since the stream runs asynchronously we can't directly access the result
run.result.onComplete {
  case Success(x) => println(s"OK, $x bytes written")
  case Failure(e) => println(s"Error: $e")
}

// shut down when everything has terminated
env.shutdownOn(run.termination)

This code constructs a StreamGraph that reads bytes from an input file (in chunks of configurable size), UTF-8 decodes them, and re-chunks the text into lines. It then keeps only the lines containing actual data (i.e. skips blank lines and comment lines), parses the temperature readings in degrees Fahrenheit, converts them to Celsius, re-inserts newlines, UTF-8 encodes the output, and writes the resulting bytes to the output file.
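To make the per-line logic concrete, here is a minimal sketch of the same filter, parse, convert and format steps applied to an in-memory sequence of lines, using only the Scala standard library. It does not use swave: the names `ConvertSketch` and `convertLines` are assumptions made for illustration, and unlike the stream above the sketch holds all lines in memory. It also formats with `Locale.ROOT` so that the decimal separator does not depend on the machine's default locale, which is a deliberate departure from the `"%.2f" format _` call in the example.

```scala
import java.util.Locale

object ConvertSketch {
  def fahrenheitToCelsius(f: Double): Double =
    (f - 32.0) * (5.0 / 9.0)

  // Hypothetical helper (not part of swave): mirrors the stream stages on a plain Seq.
  def convertLines(lines: Seq[String]): Seq[String] =
    lines
      .filterNot(_.trim.isEmpty)                    // skip blank lines
      .filterNot(_ startsWith "//")                 // skip comment lines
      .map(_.toDouble)                              // parse the Fahrenheit reading
      .map(fahrenheitToCelsius)                     // convert to Celsius
      .map(c => "%.2f".formatLocal(Locale.ROOT, c)) // two decimals, locale-independent
}
```

For the input lines `// header`, an empty line, `32.0` and `212.0`, `ConvertSketch.convertLines` yields `0.00` and `100.00`.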

All of this happens in constant memory and, because Spout.fromFile and Drain.toFile come with implicit asynchronous boundaries, is potentially parallelized across three cores. (The explanations from the MD5 Example also apply here, with one more asynchronous boundary before the Drain.toFile.)

The input and output files will always be closed properly and automatically, upon normal termination as well as in the case of errors.

Since the stream graph has three asynchronous regions, parts of the graph might still be running when the result future is completed. Therefore we should not shut down the StreamEnv immediately afterwards but instead hook into the run.termination Future to trigger a graceful shutdown.
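The ordering issue can be illustrated with plain `scala.concurrent` futures. The following is only a sketch under stated assumptions: `FakeEnv` is a hypothetical stand-in and not swave's StreamEnv, and the two promises merely play the roles of `run.result` and `run.termination`. It shows that a shutdown hooked onto the termination future does not fire when the result alone has completed.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Hypothetical stand-in for a stream environment; NOT swave's StreamEnv.
final class FakeEnv(implicit ec: ExecutionContext) {
  private val down = new AtomicBoolean(false)
  def isShutDown: Boolean = down.get

  // Shuts down once the given future has completed, whether it succeeded or failed.
  def shutdownOn(termination: Future[Unit]): Future[Unit] =
    termination.recover { case _ => () }.map(_ => down.set(true))
}

object ShutdownSketch {
  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global

    val result      = Promise[Long]() // plays the role of `run.result`
    val termination = Promise[Unit]() // plays the role of `run.termination`

    val env  = new FakeEnv
    val done = env.shutdownOn(termination.future)

    result.success(42L)               // the result is available ...
    println(env.isShutDown)           // ... but nothing has been shut down yet: false

    termination.success(())           // now all regions have terminated
    Await.result(done, 1.second)
    println(env.isShutDown)           // true
  }
}
```

Tying the shutdown to the result instead would, in this model, stop the environment while the regions represented by `termination` may still be doing work.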