Drain[-T, +R] carries two type parameters. The first specifies the type of stream elements that the drain is ready to receive and the second is the drain’s “result type”. A drain’s result is the vehicle for getting data out of a stream.
When a drain is used to close a stream graph, e.g. via the
to(...) method of a Spout, the result type is carried over to the StreamGraph where it defines the type of the
run() call, i.e. what you get back when the stream is started.
import scala.concurrent.Future import swave.core._ implicit val env = StreamEnv() // a drain, which produces the sum of all `Int` elements it receives def sumDrain: Drain[Int, Future[Int]] = Drain.fold(0)(_ + _) Spout(1 to 100) // Spout[Int] .to(sumDrain) // StreamGraph[Int] .run() // StreamRun[Future[Int] .result // Future[Int] .value // Option[Try[Int]] .get // Try[Int] .get shouldEqual 5050 // same but shorter Spout(1 to 100) .drainTo(sumDrain) // shortcut for `.to(sumDrain).run().result` .value.get.get shouldEqual 5050
One consequence of the fact that every stage must be able to run asynchronously, off the caller thread, is that the drain result (which you get by calling
streamGraph.run().result) cannot directly contain stream elements. As blocking is not an option the
run() call must return right away if the stream is running asynchronously, often before the first data elements have even begun their traversal of the stream graph.
In consequence most kinds of drains have a result type of
Future[T] rather than just
However, if you know that a stream runs synchronously (because it doesn’t contain asynchronous stages) the
Future instance you’ll get back from the
run() call will be already fulfilled. This is because, in those cases, the complete stream execution will happen “inside” the
run() call. Then, and only then, it’s fine to directly access the future’s value via
future.value.get directly after the