Drains

A Drain is a stream graph component with one input port and no output port. As such it serves as a “point of exit” from a stream setup to other destinations (e.g., to memory, disk, or the network).

[Figure: A Drain]

swave provides constructors for the most frequently used Drains on the Drain companion object. You can also look directly into the sources for the complete reference.

Drain Result

The type Drain[-T, +R] carries two type parameters. The first specifies the type of stream elements that the drain is ready to receive and the second is the drain’s “result type”. A drain’s result is the vehicle for getting data out of a stream.
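The variance annotations on the two type parameters have practical consequences: a drain is contravariant in its element type and covariant in its result type. The following minimal sketch illustrates this with a hypothetical `MiniDrain` trait, which is only a stand-in written for this example and not swave's actual `Drain` implementation.

```scala
object VarianceSketch {
  // Hypothetical stand-in for illustration only; not swave's actual Drain.
  trait MiniDrain[-T, +R] {
    def consume(elements: Seq[T]): R
  }

  // Accepts elements of any type and reports how many it received.
  val countAny: MiniDrain[Any, Int] = new MiniDrain[Any, Int] {
    def consume(elements: Seq[Any]): Int = elements.size
  }

  // Contravariance in T: a drain for `Any` is usable where a drain for `Int` is required.
  // Covariance in R: an `Int` result is usable where an `AnyVal` result is expected.
  val countInts: MiniDrain[Int, AnyVal] = countAny

  val n: AnyVal = countInts.consume(Seq(1, 2, 3))
}
```

In other words, a drain that can receive more general elements, or that produces a more specific result, can stand in wherever a narrower drain type is asked for.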

When a drain is used to close a stream graph, e.g. via the to(...) method of a Spout, the result type is carried over to the StreamGraph where it defines the type of the run() call, i.e. what you get back when the stream is started.

For example:

import scala.concurrent.Future
import swave.core._

implicit val env = StreamEnv()

// a drain, which produces the sum of all `Int` elements it receives
def sumDrain: Drain[Int, Future[Int]] =
  Drain.fold(0)(_ + _)

Spout(1 to 100)  // Spout[Int]
  .to(sumDrain)  // StreamGraph[Future[Int]]
  .run()         // StreamRun[Future[Int]]
  .result        // Future[Int]
  .value         // Option[Try[Int]]
  .get           // Try[Int]
  .get shouldEqual 5050

// same but shorter
Spout(1 to 100)
  .drainTo(sumDrain) // shortcut for `.to(sumDrain).run().result`
  .value.get.get shouldEqual 5050

Result Types are Async

Because every stage must be able to run asynchronously, off the caller thread, the drain result (which you get by calling streamGraph.run().result) cannot directly contain stream elements. Since blocking is not an option, the run() call must return right away if the stream is running asynchronously, often before the first data elements have even begun their traversal of the stream graph.

As a consequence, most kinds of drains have a result type of Future[T] rather than just T.
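A `Future` result can be processed further without blocking. The sketch below uses only the Scala standard library; the `Future` created in `run()` is a stand-in for a drain result produced off the caller thread, and the names `AsyncResultSketch`, `sum` and `report` are chosen for this example only.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AsyncResultSketch {
  def run(): String = {
    implicit val ec: ExecutionContext = ExecutionContext.global

    // Stand-in for a drain result that is computed off the caller thread.
    val sum: Future[Int] = Future((1 to 100).sum)

    // Transform the eventual result without blocking; this yields another Future.
    val report: Future[String] = sum.map(s => s"sum = $s")

    // Blocking is used here only to observe the outcome, e.g. in a test.
    Await.result(report, 5.seconds)
  }
}
```

Combinators such as `map`, `flatMap` and `onComplete` keep the caller thread free, which is why they are usually preferable to waiting for the result.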

However, if you know that a stream runs synchronously (because it doesn’t contain asynchronous stages), the Future instance you get back from the run() call will already be fulfilled. This is because, in those cases, the complete stream execution happens “inside” the run() call. Then, and only then, is it fine to access the future’s value via future.value.get immediately after the run() call.
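The difference between a fulfilled and a pending future can be seen with the standard library alone. In this sketch, `Future.successful` stands in for the result of a synchronous run and an uncompleted `Promise` stands in for the result of an asynchronous one; both are assumptions made for illustration and no swave stream is involved.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object FutureAccessSketch {
  // Stand-in for a synchronous run: the future is already fulfilled on return.
  val syncResult: Future[Int] = Future.successful(5050)
  val direct: Int = syncResult.value.get.get // safe: the value is already present

  // Stand-in for an asynchronous run: the future is not fulfilled yet.
  private val promise = Promise[Int]()
  val asyncResult: Future[Int] = promise.future
  val pendingValue: Option[Try[Int]] = asyncResult.value // None; `.get` would throw here

  // Later, the computation completes (normally on another thread).
  promise.success(5050)
  val awaited: Int = Await.result(asyncResult, 1.second) // blocking, acceptable in tests
}
```

When it is not certain that a stream runs synchronously, checking `isCompleted` first or composing on the future avoids the `NoSuchElementException` that `value.get` throws on a pending future.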