IdrisPipes: a library for composable and effectful stream processing in Idris
Let’s implement a pipe library for Idris, inspired by the great Haskell Pipes and Haskell Conduit libraries. Our goal is to provide Idris with a library for composable and effectful production, transformation and consumption of streams of data.
In short, our IdrisPipes library is an Idris package that aims to provide the means to write:
- Effectful programs, with side effects such as IO
- Over a stream of data, potentially infinite
- Efficiently, by streaming data and controlling memory consumption
- In a composable way, allowing problem decomposition and reuse
In this first post, we will use examples to illustrate the motivations behind this library, and give a quick overview of the features it offers. Future posts will be dedicated to explaining how it works in more detail, how to easily build your own pipes, and some of the difficulties I had implementing it in Idris.
Motivation & Examples
To explain the goal of this library, we will first illustrate how to use it on a small example of effectful code, and see how we can use IdrisPipes to improve on it.
A world without pipes
We will start with a small effectful function, which we will later enrich with additional requirements. This small function is named “echo”:
- It acquires some strings from the standard input
- Echoes them back to the standard output
- And stops upon receiving the string “quit” as input
Here is one possible implementation of this behavior, without using pipes:
```idris
echo : IO ()
echo = do
  putStr "in> "               -- Write "in> " to standard output
  l <- getLine                -- Read the standard input
  when (l /= "quit") $ do     -- Upon encountering something other than "quit"
    putStrLn ("out> " ++ l)   -- Echo the string to standard output
    echo                      -- Recurse to loop again
```
Let us run it in the REPL. For each line entered after the “in>” prompt, it echoes back the same string after the “out>” prompt, and the “quit” string correctly terminates the execution of the function:
```
in> toto     -- user input
out> toto    -- output
in> titi
out> titi
in> quit
```
Simple enough. Let us now enrich our “echo” function with an additional requirement.
Adding concerns
Our `echo` function should no longer repeat the same output twice in a row. In other words, if we enter the same string twice in a row, we want it to ignore the second occurrence:
```
in> toto     -- user input
out> toto    -- output
in> toto     -- same entry, do not repeat
in> quit
```
We can easily adapt our previous `echo` function to support this new requirement, with just a few additional lines of code:
```idris
echo : IO ()
echo = loop (const True) where
  loop : (String -> Bool) -> IO ()
  loop isDifferent = do          -- `isDifferent` refers to the previous string
    putStr "in> "
    l <- getLine
    when (l /= "quit") $ do
      when (isDifferent l) $     -- If the string is different from the previous one
        putStrLn ("out> " ++ l)  -- Echo the string to standard output
      loop (/= l)                -- Track the last read string
```
While the number of lines did not increase that much, the overall complexity of the function did.
The loop has to maintain a state that allows us to identify whether the new value differs from the previous one. This state is visible from the whole loop, and comes with additional conditional branching. In short, the additional concern is coupled with the rest of the `echo` function.
Refactoring with IdrisPipes
We will now see how to get rid of this coupling by refactoring the function using IdrisPipes. We will start with our initial “echo” function and then add the additional requirement of deduplicating entries afterwards.
Thinking in streams
IdrisPipes is based on the idea of building small pipes, each dedicated to one specific task to perform on a stream of data (such as production, transformation or consumption). Each pipe is only concerned with awaiting new inputs, transforming these pieces of data, and yielding the resulting outputs.
These pipes can be connected together to form more complex pipes: the outputs of one pipe become the inputs of the next. Ultimately, these pipes are assembled into a complete pipeline, called an `Effect` in the vocabulary of IdrisPipes: a recipe for stream processing.
The key to refactoring `echo` with IdrisPipes is thus to observe how the function corresponds to the processing of a stream of data.
Refactoring
We can easily view our `echo` function as processing a stream of lines read from the standard input and flowing down to the standard output. We can therefore use IdrisPipes to model the matching recipe:
```idris
stdinLn "in> "
  .| takingWhile (/= "quit")
  .| mapping ("out> " ++)
  .| stdoutLn
```
This recipe is made of several pipes, connected together with the `.|` pipe operator:
- `stdinLn` feeds the pipeline with strings read from the standard input
- `takingWhile` forwards the elements passing through it, interrupting the pipeline as soon as it sees the string “quit”
- `mapping` transforms the elements passing through it (here, to prepend a prompt to each string)
- `stdoutLn` writes every string reaching it to the standard output
Assembled together, these pipes build an `Effect` that corresponds to the recipe of echoing back strings as long as the string read is not “quit”.
Running a recipe
The `Effect` we just wrote is only the description of a recipe for processing a stream. This allows us to decouple the specification of the transformation from its execution (and therefore potentially to offer several ways to execute a recipe).
To run the recipe sequentially, and start data flowing through it (from left to right), we use the `runEffect` function:
```idris
echo : IO ()
echo = runEffect $                 -- Run the pipeline, which will
  stdinLn "in> "                   -- * read the standard input
  .| takingWhile (/= "quit")       -- * as long as the user does not write "quit"
  .| mapping ("out> " ++)          -- * prepend "out> " to the string read
  .| stdoutLn                      -- * then write the string to standard output
```
The code above is equivalent to our initial “echo” function:
```idris
echo : IO ()
echo = do
  putStr "in> "               -- Write "in> " to standard output
  l <- getLine                -- Read the standard input
  when (l /= "quit") $ do     -- Upon encountering something other than "quit"
    putStrLn ("out> " ++ l)   -- Echo the string to standard output
    echo                      -- Recurse to loop again
```
The big difference is that each concern is kept separate: for instance, the acquisition of inputs (`stdinLn`) and the stop condition (`takingWhile`) are fully decoupled. As we will see, this helps a lot when it comes to adding requirements to our program.
Using pipes to decouple concerns
In our initial implementation of “echo”, adding the requirement of deduplicating the input strings led to coupled concerns. Using IdrisPipes, we can instead isolate this additional concern in a dedicated pipe, named `deduplicating`.
This pipe awaits values, keeps track of the last value read, and only yields those that are different from the previous one. The full code of `deduplicating` is shown below for reference (*). We will explain how to build your own pipes in more detail later:
```idris
deduplicating : (Eq a, Monad m) => Pipe a m a
deduplicating = recur (the (a -> Bool) (const True)) where
  recur isDifferent =
    awaitOne $ \x => do
      when (isDifferent x) (yield x)
      recur (/= x)
```
This deduplicating logic is isolated, decoupled from the rest of the pipeline, and can now be reused elsewhere in the code. In particular, we can add this new pipe to our pipeline to support the new requirement, and no other part of the pipeline needs to know about it:
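The state-passing trick used by `deduplicating` can also be seen in isolation on a plain finite `List`. The sketch below is only an illustration: `dedupWith` and `dedupList` are hypothetical helpers, not part of IdrisPipes, and they work on lists rather than streams. Like the pipe, they thread a predicate `isDifferent` that remembers the previous element.

```idris
-- Hypothetical helper (not part of IdrisPipes): the logic of `deduplicating`
-- applied to a finite list. `isDifferent` plays the role of the pipe's state:
-- it remembers the last element seen.
dedupWith : Eq a => (a -> Bool) -> List a -> List a
dedupWith isDifferent []        = []
dedupWith isDifferent (x :: xs) =
  if isDifferent x
     then x :: dedupWith (/= x) xs   -- new value: keep it, and remember it
     else dedupWith (/= x) xs        -- same as the previous one: drop it

-- Start with a predicate accepting anything, since there is no previous element yet
dedupList : Eq a => List a -> List a
dedupList = dedupWith (const True)

main : IO ()
main = printLn (dedupList (the (List Integer) [1, 1, 2, 2, 1]))
```

Applied to `[1, 1, 2, 2, 1]`, it keeps `1`, `2` and the final `1`: only consecutive duplicates are dropped, as in the echo example.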
```idris
echo : IO ()
echo = runEffect $
  stdinLn "in> "
  .| takingWhile (/= "quit")
  .| deduplicating              -- Remove consecutive duplicate entries
  .| mapping ("out> " ++)
  .| stdoutLn
```
And we are done. We managed to add the requirement of deduplicated entries as a separate concern, decoupled from the rest of the pipeline.
(*) This pipe is also available in the library in Pipes.Prelude, along with plenty of other standard pipes. You should consider taking a look at the rich set of existing pipes before implementing your own.
A Quick tour of IdrisPipes support for stream processing
Let us now do a quick tour of the design and model followed by IdrisPipes and the features it provides to build effectful streaming programs.
A few concepts - One composition operator
A data transformation pipeline typically consists of three kinds of elements: a source that streams pieces of data, intermediary pipes that transform that data, and a sink that consumes the data to produce a single result. Each of these elements can produce side effects as well.
IdrisPipes defines a type for each of these kinds of pipes. These types are listed below, with `m` standing for the Monad matching the desired side effects:
- A `Pipe i m o` awaits values of type `i` and yields values of type `o`
- A `Source m o` yields values of type `o`, and cannot await any values
- A `Sink i m r` awaits values of type `i` to build a result of type `m r`
- An `Effect m r` cannot yield or await, and produces an `m r` when executed
In short, an `Effect` represents the complete pipeline, starting with a `Source`, ending with a `Sink`, possibly with intermediary `Pipe`s in between, and it can be run using `runEffect`:
```idris
echo : IO ()
echo = runEffect $
  stdinLn "in> "                -- Source
  .| takingWhile (/= "quit")    -- Pipe
  .| deduplicating              -- Pipe
  .| mapping ("out> " ++)       -- Pipe
  .| stdoutLn                   -- Sink
```
All these types compose together nicely using a single `.|` operator, producing new pipes whose type automatically matches the semantic model:
- A `Source m a` followed by a `Pipe a m b` is a `Source m b`
- A `Pipe a m b` followed by a `Sink b m r` is a `Sink a m r`
- A `Pipe a m b` followed by a `Pipe b m c` is a `Pipe a m c`
- A `Source m a` followed by a `Sink a m r` is an `Effect m r`
All these types are in fact type synonyms for the core data type `PipeM i o r1 m r2`, with some of the type variables replaced by `Void`. We will explore what this type represents in more detail in future posts.
Note: if you look at the API, you will also notice additional types such as `SourceM`, `SinkM` and `PipeM`. These types are more general than `Source`, `Sink` and `Pipe`, and give full access to the return type of the pipe, which is useful in some cases. We will explore this in future posts as well.
Two main primitives: yield and await
Defining a new pipe is rather easy and relies on just a few ingredients: `yield` (to send a value downstream) and `await` (to receive a value from upstream). To complete the picture, we can use tail recursion to build stateful pipes, and the Monad `m` to produce side effects.
For instance, we can rather easily define an infinite `Source` that yields the repeated application of a given function (known as the function `iterate` in Idris):
```idris
iterating : (Monad m) => (a -> a) -> a -> Source m a
iterating f = recur where
  recur a = do
    yield a        -- Yield the value downstream
    recur (f a)    -- Recurse with (f a) as the next seed
```
Similarly, we can easily define a `Sink` that folds over its inputs to build a result. It just awaits inputs and combines them with an initial accumulator. When `await` returns `Nothing`, the stream has no more values to provide, and we can return the result:
```idris
fold : (Monad m) => (a -> b -> b) -> b -> Sink a m b
fold f = recur where
  recur acc = do
    ma <- await                   -- await more inputs
    case ma of
      Just x  => recur (f x acc)  -- combine the value with the accumulator, and recurse
      Nothing => pure acc         -- no more inputs: return the result
```
We can plug these two pipes together, with `takingWhile` in between, to sum the integers from 0 to 4:
```idris
runPure $                  -- Run the pipeline in the Identity Monad (pure)
  iterating (+1) 0         -- Generate the integers from 0 to infinity
  .| takingWhile (< 5)     -- Take the ones below 5
  .| fold (+) 0            -- And sum them
```
Note: these pipes are already available in IdrisPipes, although their actual implementation is more concise and general. The library also offers a function, `awaitOr`, that allows capturing the return value of the previous pipe.
Return value and early termination
As demonstrated by the great Haskell Pipes and Haskell Conduit libraries, there is a huge design space for a pipe library such as IdrisPipes.
In particular, there are many design choices to make regarding the support of early termination inside the core of the library, or the support of return values at different stages of the pipeline.
After reading this great post by Michael Snoyman on the flaws of Haskell Pipes and Conduit, I decided to follow the path of Conduit and integrate early termination into the core of IdrisPipes.
This way, you can define a `Sink` such as `fold`, or a pipe such as `groupBy`, which would not have been possible without support for early termination, while still allowing pipes to return values.
Pull based stream model
IdrisPipes follows a pull-based streaming model:
- The `Sink` starts processing first
- Each pipe gives control to the pipe before it when it awaits a value
- Each pipe immediately releases control to the pipe after it when it yields a value
This model keeps the memory consumption of the pipeline under control. You can tune it, trading memory for speed in some cases, by implementing pipes that group individual pieces of data into chunks. In fact, such a pipe is available in Pipes.Prelude, named `chunking`:
```idris
chunking : (Monad m) => (n : Nat) -> {auto prf : GTE n 0} -> Pipe a m (List a)
chunking = ?hole -- Defined in Pipes.Prelude
```
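To make the effect of chunking concrete, here is a minimal sketch of the same grouping on a finite list. `chunkList` is a hypothetical helper, and it assumes that `chunking n` emits groups of `n` consecutive elements with a possibly shorter last group; the exact behavior of the library pipe should be checked in Pipes.Prelude.

```idris
-- Hypothetical list analogue of `chunking`: split a finite list into
-- consecutive groups of (at most) n elements; the last group may be shorter.
chunkList : Nat -> List a -> List (List a)
chunkList Z     _  = []   -- assumption: a chunk size of 0 produces no chunks
chunkList _     [] = []
chunkList (S k) xs = take (S k) xs :: chunkList (S k) (drop (S k) xs)

main : IO ()
main = printLn (chunkList 2 (the (List Integer) [1, 2, 3, 4, 5]))
```

Downstream of such a pipe, each step handles a whole group instead of a single element, which is where the memory-for-speed trade-off comes from.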
One direct consequence of this pull-based streaming model is that the source might not be entirely consumed: the pipeline only consumes as much of the source as needed.
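To illustrate why an infinite source is not a problem in a pull-based model, here is a toy model written in plain Idris. It is deliberately much simpler than the library's actual `PipeM` type, and all names in it (`Toy`, `fuse`, `iterating`, `takingWhile`, `folding`, `runToy`) are hypothetical re-implementations for illustration only. The key is `fuse`: it inspects the downstream pipe first, and only runs the upstream pipe when the downstream one awaits, so the infinite source is only forced as far as needed.

```idris
-- A toy pipe, for illustration only (not the IdrisPipes PipeM type):
-- i = input type, o = output type, r = result type.
data Toy : Type -> Type -> Type -> Type where
  Done  : r -> Toy i o r                       -- finished, with a result
  Yield : o -> Inf (Toy i o r) -> Toy i o r    -- output a value, continue lazily
  Await : (Maybe i -> Toy i o r) -> Toy i o r  -- ask upstream; Nothing = upstream done

-- Pull-based composition: look at the downstream pipe first.
fuse : Toy a b () -> Toy b c r -> Toy a c r
fuse up (Done res)     = Done res                 -- downstream finished: upstream never runs again
fuse up (Yield z next) = Yield z (fuse up next)   -- downstream outputs a value: pass it on
fuse (Done _)       (Await k) = fuse (Done ()) (k Nothing)  -- upstream exhausted
fuse (Yield y next) (Await k) = fuse next (k (Just y))      -- hand the value downstream
fuse (Await k')     (Await k) = Await (\mi => fuse (k' mi) (Await k))  -- ask further upstream

-- An infinite source, like `iterating`
iterating : (a -> a) -> a -> Toy i a ()
iterating f x = Yield x (iterating f (f x))

-- Forward values while `p` holds, then stop (maybe onNothing onJust)
takingWhile : (a -> Bool) -> Toy a a ()
takingWhile p = Await (maybe (Done ()) (\x => if p x then Yield x (takingWhile p) else Done ()))

-- A sink folding over its inputs, like `fold`
folding : (a -> b -> b) -> b -> Toy a Void b
folding f acc = Await (maybe (Done acc) (\x => folding f (f x acc)))

-- Run a closed pipeline, which has no inputs and no outputs
runToy : Toy Void Void r -> r
runToy (Done res)  = res
runToy (Yield v _) = void v
runToy (Await k)   = runToy (k Nothing)

sumBelowFive : Integer
sumBelowFive = runToy (fuse (fuse (iterating (+ 1) 0) (takingWhile (< 5))) (folding (+) 0))

main : IO ()
main = printLn sumBelowFive
```

Evaluating `sumBelowFive` pulls only the integers 0 to 5 from the source: `takingWhile` stops at 5 and returns `Done`, `folding` then receives `Nothing` and returns its accumulator, and the rest of the infinite source is never evaluated.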
The pull-based model also means that you can use IdrisPipes to perform pure lazy computations (using `runPure`), which can be useful since Idris is strict by default (although other solutions are available for this).
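For comparison, Idris' own codata is one of those other solutions. The sketch below assumes the Idris 1 Prelude, where `iterate` builds an infinite `Stream` whose tail is only evaluated on demand; `takeWhileS` and `sumBelow` are hypothetical helpers. It computes the same sum as the `runPure` pipeline shown earlier, without pipes.

```idris
-- Hypothetical helper: take elements from an infinite Stream while `p` holds.
-- The tail of a Stream is codata (Inf), so only the needed prefix is evaluated.
takeWhileS : (a -> Bool) -> Stream a -> List a
takeWhileS p (x :: xs) = if p x then x :: takeWhileS p xs else []

-- Same computation as the runPure example: sum the integers below n, from 0
sumBelow : Integer -> Integer
sumBelow n = sum (takeWhileS (< n) (iterate (+ 1) 0))

main : IO ()
main = printLn (sumBelow 5)
```

Unlike a pipeline, however, a pure `Stream` cannot interleave effects such as IO between its elements.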
Large collection of built-in algorithms
To save you from reinventing the wheel, IdrisPipes also comes with a large collection of ready-made pipes in Pipes.Prelude, which will only grow as time goes by.
Here is a non-exhaustive list of existing pipes you can use:
- `mapping f` maps each element of the pipe with `f`
- `filtering p` only forwards elements satisfying `p`
- `groupingBy p` builds chunks of elements comparable by `p`
- `splittingBy p` splits the stream into chunks at elements satisfying `p`
- `tracing f` runs an effectful function `f` on each element
The library also comes with some helper functions to help you build your own pipes. For instance, some helpers take care of early termination and of automatically forwarding the return value of the previous pipe. In particular, the following functions might prove useful:
- `awaitForever` helps build stateless pipes
- `awaitOne` helps build stateful pipes
- `each` helps yield several values
- `idP` gives you an identity pipe
These helpers are available and documented in Pipes.Core.
Missing features
IdrisPipes does not yet support leftovers, or the prompt release of resources, as Haskell pipes-safe and Haskell conduit allow you to do. These features might be integrated into the library in the coming months.
Conclusion and what’s next?
In this post, we went over the motivations and goals behind IdrisPipes as well as a quick overview of the package and the features it offers.
In future posts, we will zoom into the implementation details of the package to explain how it works, and discuss some of the design choices and some possible alternatives.
You can have a look at the full package in this GitHub repo.