Quick Start

Like most other Scala streaming toolkits, swave fully separates stream construction and stream execution into two distinct phases. First, you define your stream setup using a concise and flexible DSL. During this phase no resources are claimed (apart from the memory for the graph structures) and no data starts to flow. Only when everything is fully assembled and you explicitly start the stream will the internal machinery actually spring to life.

But let’s start from the beginning…

Required Imports

After adding swave to your build (as described in the Setup chapter), you can bring all the key identifiers into scope with this single import:

import swave.core._

In most cases this is all you need, but sometimes additional imports are required in order to enable certain support functionality (like the Domain Adapters).

Creating Spouts

With the main import in place we can create sources of stream elements, which are called Spouts in swave.
(“spout” is an English word for a tube or lip projecting from a container, through which liquid can be poured.)

Here is a simple one:

def tenInts: Spout[Int] = Spout(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

As you can probably guess, a Spout[T] is parameterized with the type of the elements that the spout produces. swave predefines quite a few ways to create spouts from other types, e.g. from Iterable, Iterator, Option and Try, but also from Future or Publisher. Check out the chapter on Spouts for more details.

Attaching Transformations

Once you have a Spout you can define transformations on it, e.g. like this:

def threeInts = tenInts
  .map(_ * 2)
  .filter(_ > 6)
  .take(3)
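
To see which elements threeInts would eventually emit, it can help to apply the same three operators to a plain Scala collection. The sketch below is only an analogy: the object name ThreeIntsCheck is made up for illustration, and a strict collection computes its result immediately, whereas the spout computes nothing until it is run.

```scala
object ThreeIntsCheck {
  // Standard-library stand-in for `tenInts` (NOT a swave Spout)
  val tenInts: Seq[Int] = 1 to 10

  // Same operator chain as `threeInts`:
  //   map    -> 2, 4, 6, ..., 20
  //   filter -> 8, 10, 12, ..., 20
  //   take   -> 8, 10, 12
  val threeInts: Seq[Int] = tenInts
    .map(_ * 2)
    .filter(_ > 6)
    .take(3)
}
```

Note that the order of the operators matters: moving take(3) in front of filter would keep only 2, 4 and 6, all of which the filter would then drop.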

Most transformations give you back another Spout of the same or a different type, but some, especially the fan-outs and fan-ins, work a bit differently.
The predefined transformations represent the real “meat” of swave. They encode a lot of the general logic that is typically required when working with streams. The power of stream-processing stems in large part from being able to nicely assemble higher-level logic from lower-level primitives in a concise and elegant fashion. Therefore, a central part of learning to program with streams is understanding which transformations already exist and how a given piece of business logic might be encoded by combining them in the right way. As with most programs, there are usually many ways to achieve the same thing.
For example, here are ten ways of producing the same stream (the first 100 natural numbers) with swave:

def a = Spout(1 to 100)

def b = Spout(1, 2, 3 to 100: _*)

def c = Spout(100 to 1 by -1).map(101 - _)

def d = Spout.ints(from = 1, step = 1).take(100)

def e = Spout.iterate(1)(_ + 1).takeWhile(_ <= 100)

def f = Spout.repeat(1).take(99).scan(1)(_ + _)

def g = Spout(1 to 50) concat Spout(51 to 100)

def h = Spout(List.tabulate(10, 10)(_ * 10 + _))
  .flattenConcat()
  .map(_ + 1)

def i = {
  var i = 0
  Spout.continually { i += 1; i }.take(100)
}

def j = Spout.unfold(1) { i =>
  if (i < 100) Spout.Unfolding.Emit(i, next = i + 1)
  else Spout.Unfolding.EmitFinal(i)
}
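
Some of these variants (c, f and h in particular) are less obvious than others. Their arithmetic can be checked with standard-library counterparts, as in the following sketch. It uses plain Scala collections rather than swave, the object name NaturalsCheck is hypothetical, and it assumes that scan emits its seed value first, like scanLeft does (which is what the claim about variant f implies).

```scala
object NaturalsCheck {
  val target: Seq[Int] = 1 to 100

  // like variant c: count down from 100 and mirror each value around 101
  val likeC: Seq[Int] = (100 to 1 by -1).map(101 - _)

  // like variant f: 99 ones, summed up with a seed of 1;
  // the seed plus 99 running sums gives 100 elements
  val likeF: Seq[Int] = List.fill(99)(1).scanLeft(1)(_ + _)

  // like variant h: a 10 x 10 grid holding 0..99,
  // flattened row by row and then shifted by one
  val likeH: Seq[Int] = List.tabulate(10, 10)(_ * 10 + _).flatten.map(_ + 1)
}
```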

Attaching Drains

After you’ve finished defining transformations you’ll want to run your stream, i.e. have it produce its output.
In order to do that, however, we first have to define what should happen with the stream output, i.e. where it should be produced to. This is done by attaching a Drain to the spout.

A Drain consumes the data elements of a stream for the purpose of producing some result. For example, this result could be a Seq of all elements, just the first or just the last element, or no element at all. In the last case the drain might merely serve to achieve some kind of side-effect, like the execution of a function for each element produced by the stream.

Here is a Drain that collects all incoming elements into a Vector:

def drain[T]: Drain[T, Future[immutable.Seq[T]]] = Drain.seq[T](limit = 1000)

In addition to the type parameter for the element type (that we’ve already seen on Spout) a Drain has a second one, which defines the type of the result that the drain produces. For most drains this will be a Future, since the Drain has to work with synchronous as well as asynchronous streams.

Here are some other frequently used drains:

def drain1[T]: Drain[T, Future[T]] = Drain.head
def drain2[T]: Drain[T, Future[Unit]] = Drain.foreach(println)

As you can see, even the drains that produce “no” result still produce one :).
For example, Drain.foreach, which runs a stream to completion only for side-effects, produces a Future[Unit]. Even though in the happy case the future’s value isn’t very interesting (the Unit value), it still signals two things:

  • if and when the stream completed (at the drain)
  • whether it terminated successfully or with an error
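
Both signals can be picked up with the regular combinators of scala.concurrent.Future. The following sketch uses only the standard library; CompletionSignal and describe are hypothetical names, and the Future[Unit] argument stands in for whatever a drain like Drain.foreach hands back.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object CompletionSignal {
  // Turns the "uninteresting" Future[Unit] into a human-readable status.
  // The returned future completes when the given one does, so it carries
  // both the "when" and the "how" of the stream's termination.
  def describe(result: Future[Unit])(implicit ec: ExecutionContext): Future[String] =
    result
      .map(_ => "completed successfully")
      .recover { case NonFatal(e) => s"failed with ${e.getMessage}" }
}
```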

Once you have a drain you can attach it to a matching (type-wise) Spout with to(...), e.g.:

def streamGraph: StreamGraph[Future[Unit]] =
  Spout(1 to 100).to(Drain.foreach(println))

The result of attaching a Drain to a Spout is a StreamGraph, a type you’ll probably use less frequently in your own code. A StreamGraph represents a complete stream pipeline or graph setup, which is ready to be run. Its single type parameter is the result type of the Drain that was used to “close” the stream pipeline.

Note that up until this point, including the attachment of a Drain, nothing has really happened apart from describing what your stream setup looks like. No data has started to flow and no resources (apart from the memory for the pipeline) have been claimed.
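
One way to convince yourself of this is to put a side effect into a transformation and check that it has not fired once the graph is assembled. This is a minimal sketch that only uses constructs shown above; the object NothingRunsYet and its counter are hypothetical names.

```scala
import scala.concurrent.Future
import swave.core._

object NothingRunsYet {
  // counts how often the function passed to `map` is invoked
  var mapCalls = 0

  // Assembling the graph merely describes the pipeline ...
  val graph: StreamGraph[Future[Unit]] =
    Spout(1 to 100)
      .map { i => mapCalls += 1; i }
      .to(Drain.foreach((i: Int) => ()))

  // ... so at this point the `map` function has not been called at all
  val callsAfterAssembly: Int = mapCalls
}
```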

Running a Stream

Only when you call .run() on a StreamGraph does the whole stream machinery kick into motion and data elements start to flow. One very important thing to keep in mind is that you can only ever call .run() once on any given stream setup. After the first .run() call all elements of the stream setup, i.e. all involved spouts and drains (as well as, potentially, Pipes and Modules), have been “spent”. They cannot be used again and will cause all subsequent streams that they are incorporated into to fail with an IllegalReuseException. If you want to re-run the stream, you must create a fresh StreamGraph instance.
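
This is also why the examples in this chapter define their stream setups with def rather than val: every call then assembles new, unspent instances. The sketch below illustrates the pattern under a few assumptions. FreshGraphPerRun, doubled and runTwice are made-up names, the implicit StreamEnv that .run() requires is explained further down, and Await is used only to keep the example short.

```scala
import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import swave.core._

object FreshGraphPerRun {
  // `def`, not `val`: each call builds a completely new graph
  def doubled: StreamGraph[Future[immutable.Seq[Int]]] =
    Spout(1 to 3).map(_ * 2).to(Drain.seq[Int](limit = 1000))

  // Runs two independent graph instances, each exactly once
  def runTwice()(implicit env: StreamEnv): (immutable.Seq[Int], immutable.Seq[Int]) = {
    val first  = Await.result(doubled.run().result, 5.seconds)
    val second = Await.result(doubled.run().result, 5.seconds)
    (first, second)
  }
}
```

Had doubled been a val, the second .run() call would have hit an already spent graph.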

swave streams can run synchronously (yet without any blocking!) purely on the caller thread. All the examples we’ve looked at so far are of this kind. However, there might be components in your stream setup that require asynchronous dispatch and therefore cannot run purely on the caller thread in a non-blocking fashion. For example, a spout or drain might be connected to a network socket and thus must be “woken up” when new data arrive or the kernel signals that the socket is now ready to accept data. Or the stream might contain a transformation that requires the concept of “time” in order to do its thing (like throttle or takeWithin).
Or you might want to introduce an asynchronous boundary manually (see the asyncBoundary transformation), in order to allow for different parts of your pipeline to be run in parallel. In all of these cases the stream will be started asynchronously when you call .run(), which means it will be started and run on another thread.

StreamEnv

If you try to run a stream now, with only what we’ve talked about so far, you’ll see that there is still one final thing missing. For example, if we try to compile this snippet:

val result: Future[Unit] =
  Spout(1 to 100)
    .to(Drain.foreach(println))
    .run().result

the compiler would stop us with this error message:

... : could not find implicit value for parameter env: swave.core.StreamEnv
        .run()
            ^  

which tells us that we need to supply an implicit StreamEnv instance.

The StreamEnv is similar to the ActorSystem type in Akka. It provides all the global configuration information that’s required by swave’s internal streaming engine. For example, the StreamEnv contains the thread-pool configuration(s) and general Settings you have configured as well as a global Logger and Scheduler instance.

The simplest and yet perfectly fine way to supply a StreamEnv instance is this:

implicit val env = StreamEnv()

This will simply load the complete Configuration from the reference.conf and, potentially, application.conf files on your classpath.

Note: If any of the streams that are started with a particular StreamEnv instance is asynchronous, i.e. requires dispatch onto another thread, the StreamEnv instance needs to be explicitly shut down when the application wants to exit. Otherwise the internal thread-pools will not be terminated and will thus keep the JVM from exiting (unless all thread-pools are configured with daemonic = on, which is not the default).

This is how you trigger an orderly shutdown of a StreamEnv instance:

env.shutdown()
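
Putting the pieces of this chapter together, a complete run could look like the sketch below. It creates a StreamEnv, runs one stream and shuts the environment down again in a finally block, so that the shutdown also happens if the stream fails. The names RunAndShutdown and collectThree are hypothetical, and blocking with Await is merely a shortcut for a small example.

```scala
import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import swave.core._

object RunAndShutdown {
  def collectThree(): immutable.Seq[Int] = {
    // loads reference.conf and, if present, application.conf
    implicit val env: StreamEnv = StreamEnv()
    try {
      val result: Future[immutable.Seq[Int]] =
        Spout(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
          .map(_ * 2)
          .filter(_ > 6)
          .take(3)
          .to(Drain.seq[Int](limit = 1000))
          .run().result
      Await.result(result, 5.seconds)
    } finally env.shutdown() // orderly shutdown, even on failure
  }
}
```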

drainTo Shortcuts

Since attaching a Drain and immediately calling .run() on the result is such a common pattern, swave offers several shortcuts that allow you to do both in one single step. For example:

def spout = Spout(1 to 100)

val result1: Future[Unit] = spout.foreach(println)

val result2: Future[Int] = spout.drainToHead()

val result3: Future[List[Int]] = spout.drainToList(limit = 1000)

There are more drain... variants available on Spout. You might want to check them out as well.
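
To make the relationship explicit, the following sketch runs the same stream once via to(...) plus .run() and once via the drainToHead() shortcut. It relies only on calls shown in this chapter. ShortcutEquivalence and headBothWays are made-up names, and note that spout is a def, so each of the two runs gets its own fresh instance.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import swave.core._

object ShortcutEquivalence {
  def spout = Spout(1 to 100)

  def headBothWays()(implicit env: StreamEnv): (Int, Int) = {
    // long form: attach a drain, run the graph, pick the result
    val longForm: Future[Int] = spout.to(Drain.head[Int]).run().result
    // shortcut: all three steps in one call
    val shortcut: Future[Int] = spout.drainToHead()
    (Await.result(longForm, 5.seconds), Await.result(shortcut, 5.seconds))
  }
}
```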