Making type-safe internet bots with Haskell

source link: https://wjwh.eu/posts/2022-05-06-haskell-conduit-bots.html

Posted on May 6, 2022 by wjwh

There are basically two types of client applications on the internet:

  • Clients that use the request-response model. One request will generally result in one response. Some examples would be web browsers and many API clients.
  • Clients that receive a continuous stream of data from a server and may or may not send data back at any time. Examples of this type would include chatbots, automated trading applications and multiplayer video games.

In this article, I will describe a fairly general way to use Haskell to construct a specific kind of client of the second type: one that listens to a single network socket and sends replies to that same socket, possibly while maintaining state. This article assumes basic proficiency with Haskell and networking.

Conduits

The conduit package is a library that provides primitives for streaming data. You can think of a conduit as a datatype representing a single element in a pipeline, similar to the stages of a Unix pipeline in the shell. In most shells every input and output must be text-based, but since we run this inside a Haskell program we can use the type system to give each element richer input and output types. Every element in the pipeline has:

  • An input type;
  • An output type which may or may not be the same as the input type;
  • A monadic environment such as IO or State in which it does its computations;
  • A result type to describe any results. For all but the last conduit in a pipeline this will be the unit type ().
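
As a small illustration of these four parameters, here is a sketch of two conduits with explicit ConduitT signatures. The names describe and sumInts are made up for this example; the combinators come from the Conduit module, and the .| operator used in main to connect them is explained in the next paragraph.

```haskell
import Conduit

-- Input type Int, output type String, any monad m, result type ().
describe :: Monad m => ConduitT Int String m ()
describe = mapC (\n -> "value " ++ show n)

-- Input type Int, output type left polymorphic (nothing is emitted),
-- result type Int: the sum of everything that was consumed.
sumInts :: Monad m => ConduitT Int o m Int
sumInts = sumC

main :: IO ()
main = do
  -- Collect the Strings produced by describe into a list.
  print (runConduitPure (yieldMany [1, 2, 3 :: Int] .| describe .| sinkList))
  -- The last conduit in the pipeline determines the overall result.
  print (runConduitPure (yieldMany [1, 2, 3 :: Int] .| sumInts))
```

Because neither conduit needs any effects, both are left polymorphic in the monad and can be run purely with runConduitPure.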

Individual elements can be chained together with the .| operator, which makes sure that the output type of the argument on the left matches the input type of the argument on the right. A very basic pipeline might be something like yieldMany [1..15] .| mapC (length . show) .| printC, which will print the number of digits in the numbers 1 to 15 to standard output. You can run a pipeline with the runConduit function or one of its monadic siblings. This is in itself not something you would need Conduit for; a simple list could do this perfectly well. However, there are several benefits to using it:

  • It can maintain constant memory use even when working with very large amounts of data, like big files or long-running streams of network data.
  • It will handle the cleaning up of system resources like file handles and sockets as soon as possible, unlike lazy I/O where it is much harder to control when this happens.
  • Finally, it is much easier to interleave monadic and pure effects with each other in a controlled fashion. For example: if you want to add logging statements in the middle of your pipeline for debugging or auditing reasons then a pure pipeline of list functions becomes unwieldy quite quickly.
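
The digit-counting pipeline mentioned above can be tried out with the following sketch. The helper name digitCounts is introduced here only for illustration; it runs the same pipeline purely and collects the results in a list, so it can be compared against the plain list version.

```haskell
import Conduit

-- The pipeline from the text, but collecting into a list instead of printing.
digitCounts :: [Int] -> [Int]
digitCounts xs =
  runConduitPure $ yieldMany xs .| mapC (length . show) .| sinkList

main :: IO ()
main = do
  -- Prints the number of digits of each of the numbers 1 to 15, one per line.
  runConduit $ yieldMany [1 .. 15 :: Int] .| mapC (length . show) .| printC
  -- For a small pure input, the result agrees with ordinary list functions.
  print (digitCounts [1 .. 15] == map (length . show) [1 .. 15 :: Int])
```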

For more information on the rationale behind conduits and how to use them, the conduit readme is an excellent resource.

Some basic examples

To test these examples, I made an extremely basic server program which will listen on port 8000 and serve an endless stream of JSON values to whatever connects. The values are shaped like this:

$ nc localhost 8000
{"counter":1,"time":1647525924}
{"counter":2,"time":1647525925}
{"counter":3,"time":1647525926}
{"counter":4,"time":1647525927}
{"counter":5,"time":1647525928}
{"counter":6,"time":1647525929}
^C

Every JSON value contains only the counter and time keys. The time key contains the current UNIX timestamp, while the counter key contains the number of values that have been sent so far. Values are sent out at a rate of one per second. Every test program in the following sections will connect to this server. All the test programs, including the test server, can be found here.
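
The server itself is linked rather than shown. For readers who want something to connect to, a minimal sketch of a server with the behaviour described above might look like the following. This is not the author's implementation: the names renderMessage and counterSource are made up here, the JSON is rendered by hand to keep the key order fixed, and every connection is assumed to get its own counter starting at 1.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Conduit (ConduitT, runConduit, yield, (.|))
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (liftIO)
import qualified Data.ByteString.Char8 as C
import Data.Conduit.Network (appSink, runTCPServer, serverSettings)
import Data.Time.Clock.POSIX (getPOSIXTime)

-- Render one message in the format shown above, terminated by a newline.
renderMessage :: Integer -> Integer -> C.ByteString
renderMessage n t =
  C.pack ("{\"counter\":" ++ show n ++ ",\"time\":" ++ show t ++ "}\n")

-- Endless source: one message per second, counting up from 1.
counterSource :: ConduitT i C.ByteString IO ()
counterSource = go 1
  where
    go :: Integer -> ConduitT i C.ByteString IO ()
    go n = do
      now <- liftIO getPOSIXTime
      yield (renderMessage n (round now))
      liftIO (threadDelay 1000000) -- wait one second
      go (n + 1)

main :: IO ()
main =
  -- "*" listens on all interfaces; the handler runs once per connection.
  runTCPServer (serverSettings 8000 "*") $ \ad ->
    runConduit (counterSource .| appSink ad)
```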

A basic network conduit example

As a first example, let’s make a program that connects to the test server and then simply prints all the received values to standard output. We can do this with the functions from Data.Conduit.Network in the conduit-extra package. This module provides a runTCPClient function, which does exactly what the name implies. It takes as arguments a ClientSettings and a function with signature (AppData -> IO a). The AppData contains everything you need to know about the connection, and you can stream all the received data from it with the appSource function:

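{-# LANGUAGE OverloadedStrings #-}
import Conduit (runConduitRes, (.|))
import Data.Conduit.Combinators (stdout)
import Data.Conduit.Network (appSource, clientSettings, runTCPClient)

main :: IO ()
main = do
  let settings = clientSettings 8000 "127.0.0.1"
  runTCPClient settings $ \ad -> do
    runConduitRes $ appSource ad .| stdout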

Since the appSource conduit outputs ByteString values and the stdout conduit expects ByteString values as input, no further transformations are needed to make the types match up. The output of this program is pretty much what you would expect:

$ cabal run
{"counter":1,"time":1647529231}
{"counter":2,"time":1647529232}
{"counter":3,"time":1647529233}
{"counter":4,"time":1647529234}
^C

So far, we have not done anything we couldn’t just do with netcat, so let’s look at a slightly more advanced example.

A conduit that parses the incoming stream

One of the main benefits of Haskell is that we can leverage the type system to encode extra information about our values. Since the JSON values follow a well defined format, we can use the aeson package to parse them into a data structure:

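{-# LANGUAGE DeriveAnyClass, DeriveGeneric, DerivingStrategies, OverloadedStrings #-}
import Conduit (mapC, printC, runConduitRes, (.|))
import Data.Aeson (FromJSON, decodeStrict)
import Data.ByteString (ByteString)
import Data.Conduit.Combinators (linesUnboundedAscii)
import Data.Conduit.Network (appSource, clientSettings, runTCPClient)
import GHC.Generics (Generic)

data CounterMessage = CounterMessage { counter :: Integer, time :: Integer }
    deriving stock (Show, Generic)
    deriving anyclass (FromJSON)

main :: IO ()
main = do
  let settings = clientSettings 8000 "127.0.0.1"
  runTCPClient settings $ \ad -> do
    runConduitRes $ appSource ad
                 .| linesUnboundedAscii
                 .| mapC (decodeStrict :: ByteString -> Maybe CounterMessage)
                 .| printC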

By using the typeclass deriving functionality in GHC, we can automatically derive Show and FromJSON instances for our CounterMessage data structure. We can then use the decodeStrict function from the aeson package to decode the received JSON strings into Maybe CounterMessage values with mapC. The mapC conduit is similar to map on lists: it applies a function to every value passed to it and passes the result downstream.

Since parsing can fail if the JSON value does not have the right structure to fit into a CounterMessage, decodeStrict returns a value wrapped in Maybe. Since network connections give no guarantees about how much data becomes available at a time, we also have to use the linesUnboundedAscii conduit. It collects the incoming ByteString values until it encounters a newline and then releases a new ByteString containing the entire line. Finally, we can no longer use the stdout conduit to print to standard output. The mapC decodeStrict conduit outputs Maybe CounterMessage values but stdout expects ByteString values as input, so the types do not match up. Instead, we can use the printC conduit, which prints any incoming values to standard output as long as they have a Show instance. Since CounterMessage has an automatically derived Show instance and Maybe a has a Show instance whenever a has one, Maybe CounterMessage also has a Show instance.

$ cabal run
Just (CounterMessage {counter = 1, time = 1647529721})
Just (CounterMessage {counter = 2, time = 1647529722})
Just (CounterMessage {counter = 3, time = 1647529723})
Just (CounterMessage {counter = 4, time = 1647529724})
^C

Since all the JSON values were parsed successfully, we have only Just values in the output stream.
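
To see what linesUnboundedAscii and the Maybe wrapper do without needing a server, the parsing stages can be run purely over a hand-made list of chunks. The sketch below makes up its own input (the chunks value, including a deliberately malformed line) and two helper names, decodeAll and decodeValid. The second helper uses mapMaybe from Data.Conduit.List to drop lines that fail to parse; that is one possible way of handling failures, not something the examples in this article do.

```haskell
{-# LANGUAGE DeriveAnyClass, DeriveGeneric, DerivingStrategies, OverloadedStrings #-}
import Conduit (mapC, runConduitPure, sinkList, yieldMany, (.|))
import Data.Aeson (FromJSON, decodeStrict)
import Data.ByteString (ByteString)
import Data.Conduit.Combinators (linesUnboundedAscii)
import qualified Data.Conduit.List as CL
import GHC.Generics (Generic)

data CounterMessage = CounterMessage { counter :: Integer, time :: Integer }
    deriving stock (Show, Eq, Generic)
    deriving anyclass (FromJSON)

-- Hypothetical chunks as they might arrive from a socket: the first message
-- is split across two chunks, and the last line is not valid JSON.
chunks :: [ByteString]
chunks =
  [ "{\"counter\":1,\"ti"
  , "me\":10}\n{\"counter\":2,\"time\":11}\n"
  , "not json\n"
  ]

-- Keeps parse failures visible as Nothing, like the example above.
decodeAll :: [ByteString] -> [Maybe CounterMessage]
decodeAll bs = runConduitPure $
     yieldMany bs
  .| linesUnboundedAscii
  .| mapC decodeStrict
  .| sinkList

-- Silently drops lines that fail to parse.
decodeValid :: [ByteString] -> [CounterMessage]
decodeValid bs = runConduitPure $
     yieldMany bs
  .| linesUnboundedAscii
  .| CL.mapMaybe decodeStrict
  .| sinkList

main :: IO ()
main = do
  print (decodeAll chunks)
  print (decodeValid chunks)
```

Whether to drop, log or abort on a malformed line depends on the protocol; the point here is only that the choice becomes explicit in the types.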

A stateful conduit

So far, we have not done anything permanent with the values we receive except printing them to standard output. One way of keeping state and updating it with every new incoming value is the scanlC conduit, which behaves similarly to the scanl function from Data.List: it computes a value like foldl, but also outputs all the intermediate values. You can see how it works in the following example:

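{-# LANGUAGE DeriveAnyClass, DeriveGeneric, DerivingStrategies, OverloadedStrings #-}
import Conduit (mapC, printC, runConduitRes, scanlC, (.|))
import Data.Aeson (FromJSON, decodeStrict)
import qualified Data.ByteString.Char8 as C
import Data.Conduit.Combinators (linesUnboundedAscii)
import Data.Conduit.Network (appSource, clientSettings, runTCPClient)
import Data.Maybe (fromJust)
import GHC.Generics (Generic)

data CounterMessage = CounterMessage { counter :: Integer, time :: Integer }
    deriving stock (Show, Generic)
    deriving anyclass (FromJSON)

data CounterState = CounterState
  { total :: Integer
  , lastUpdate :: Integer
  } deriving (Show,Eq)

-- Add the new counter to the running total and remember the latest timestamp.
updateCounterState :: CounterState -> CounterMessage -> CounterState
updateCounterState (CounterState total _) (CounterMessage counter newTime)
  = CounterState (total + counter) newTime

main :: IO ()
main = do
  let settings = clientSettings 8000 "127.0.0.1"
  runTCPClient settings $ \ad -> do
    runConduitRes $ appSource ad
                 .| linesUnboundedAscii
                 .| mapC (decodeStrict :: C.ByteString -> Maybe CounterMessage)
                 -- fromJust is partial: a line that fails to parse will crash the program.
                 .| mapC fromJust
                 .| scanlC updateCounterState (CounterState 0 0)
                 .| printC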

The scanlC conduit has type signature Monad m => (a -> b -> a) -> a -> ConduitT b a m (). Our pipeline runs in an IO-based monad (strictly speaking ResourceT IO, because we use runConduitRes), so for readability we can treat m as IO and simplify the type signature to (a -> b -> a) -> a -> ConduitT b a IO (). Since we know we want it to accept input values of type CounterMessage and maintain a state of type CounterState, we can further “fill in” the type signature to be (CounterState -> CounterMessage -> CounterState) -> CounterState -> ConduitT CounterMessage CounterState IO ().

In normal language: it takes a function CounterState -> CounterMessage -> CounterState that updates the previous state with new information (very similar to the function you would pass to foldl') and an initial CounterState as arguments, and acts as a conduit with CounterMessage inputs and CounterState outputs. In this case, we define the updateCounterState function to maintain a sum of all the counter values received so far and also to keep the last timestamp received. Every time our scanlC conduit receives a new CounterMessage, it will use the updateCounterState function to update its current CounterState and then emit this new value to the next element in the pipeline. Since CounterState also has a Show instance, we can send it directly into printC to be printed:

$ cabal run
CounterState {total = 0, lastUpdate = 0}
CounterState {total = 1, lastUpdate = 1647532637}
CounterState {total = 3, lastUpdate = 1647532638}
CounterState {total = 6, lastUpdate = 1647532639}
CounterState {total = 10, lastUpdate = 1647532640}
^C

It starts off with the initial state we passed to scanlC, then updates it with each CounterMessage it receives. This update function was very simple, but you can make it (and the state it maintains) as complex as you want. Since the entire conduit pipeline runs on top of IO, there is nothing stopping you from doing database queries or even web requests for each update, although you will need to use scanlMC if you wish to have a non-pure update function.
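
The relation between scanlC and scanl, and the step from scanlC to scanlMC, can be checked without a network connection. The following sketch feeds a fixed list of messages through both; the helper names runningStates and loggingUpdate are made up for this example, and putStrLn merely stands in for a real effect such as a database query.

```haskell
import Conduit

data CounterMessage = CounterMessage { counter :: Integer, time :: Integer }
  deriving (Show, Eq)

data CounterState = CounterState { total :: Integer, lastUpdate :: Integer }
  deriving (Show, Eq)

updateCounterState :: CounterState -> CounterMessage -> CounterState
updateCounterState (CounterState t _) (CounterMessage c newTime) =
  CounterState (t + c) newTime

-- Pure: run scanlC over a fixed list and collect every intermediate state.
runningStates :: [CounterMessage] -> [CounterState]
runningStates msgs = runConduitPure $
     yieldMany msgs
  .| scanlC updateCounterState (CounterState 0 0)
  .| sinkList

-- Effectful update function for scanlMC; the logging stands in for any IO action.
loggingUpdate :: CounterState -> CounterMessage -> IO CounterState
loggingUpdate st msg = do
  putStrLn ("received " ++ show msg)
  pure (updateCounterState st msg)

main :: IO ()
main = do
  let msgs = [CounterMessage 1 100, CounterMessage 2 101, CounterMessage 3 102]
  -- Same result as the list version of scanl.
  print (runningStates msgs == scanl updateCounterState (CounterState 0 0) msgs)
  -- scanlMC has the same shape, but the update function may perform effects.
  runConduit $ yieldMany msgs .| scanlMC loggingUpdate (CounterState 0 0) .| printC
```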

Responding based on the state

Thus far we have only made conduit pipelines that ended up printing their values to standard output, possibly after doing some transformations on the data received. Often it is not enough to only receive data; sometimes we wish to send data as well. The AppData structure which we used for the appSource conduit can also be used for an appSink conduit, which takes in ByteString inputs and sends them to the socket that the AppData represents. Hooking it into the pipeline we already have works pretty much as you would expect:

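-- Reuses CounterMessage, CounterState, updateCounterState and the imports of the
-- previous example; additionally needs filterC from Conduit and appSink from
-- Data.Conduit.Network.
main :: IO ()
main = do
  let settings = clientSettings 8000 "127.0.0.1"
  runTCPClient settings $ \ad -> do
    runConduitRes $ appSource ad
                 .| linesUnboundedAscii
                 .| mapC (decodeStrict :: C.ByteString -> Maybe CounterMessage)
                 .| mapC fromJust
                 .| scanlC updateCounterState (CounterState 0 0)
                 .| filterC (odd . total)
                 -- Render the timestamp as one line of text for the socket.
                 .| mapC (C.pack . (++ "\n") . show . lastUpdate)
                 .| appSink ad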

The above example will send the lastUpdate value of the CounterState back over the socket on a new line, but only if the total so far is odd. In this contrived example we simply use the Show instance to convert via String, but you can of course use any function you want to generate the ByteString.

If you do not wish to return values over the same socket but want to send requests to some other service, you can use the mapM_ combinator instead (exported as mapM_C by the Conduit module). It works just like the mapM_ function from Data.Foldable, except (you guessed it) it works on conduits.
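
Both variants can be sketched on a fixed list of states, without a socket. In the example below, replies is a made-up helper that runs the filtering and rendering stages purely, and notify is a hypothetical stand-in for a call to some other service; it only prints what it would send.

```haskell
import Conduit (filterC, mapC, mapM_C, runConduit, runConduitPure, sinkList, yieldMany, (.|))
import qualified Data.ByteString.Char8 as C

data CounterState = CounterState { total :: Integer, lastUpdate :: Integer }
  deriving (Show, Eq)

-- The reply logic from the pipeline above, collected into a list.
replies :: [CounterState] -> [C.ByteString]
replies states = runConduitPure $
     yieldMany states
  .| filterC (odd . total)
  .| mapC (C.pack . (++ "\n") . show . lastUpdate)
  .| sinkList

-- Hypothetical stand-in for a request to another service.
notify :: CounterState -> IO ()
notify st = putStrLn ("would notify another service about " ++ show st)

main :: IO ()
main = do
  let states = [CounterState 1 100, CounterState 3 101, CounterState 6 102]
  print (replies states)
  -- mapM_C runs the action for every value and ends the pipeline.
  runConduit $ yieldMany states .| filterC (odd . total) .| mapM_C notify
```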

Firming up the code

The above examples only scratch the surface of what is possible with conduit pipelines. The ecosystem around conduit is huge and many library authors have made adaptors to add interoperability with it to their packages. Some interesting directions to explore might be:

  • Almost every interesting data stream these days will require a TLS connection. For this, the network-conduit-tls package provides a runTLSClient function, which is a near drop-in replacement for the runTCPClient function used in the examples above (it takes a TLSClientConfig instead of a ClientSettings).
  • It is often desirable to add logging at some point in the pipeline. The iterM conduit (iterMC in the Conduit module) will apply a monadic action to anything that it receives and then pass on the value unaltered. You can use this for logging like so: iterM (liftIO . print), assuming of course that the value passed in has a Show instance.
  • Websockets: websocket clients can be created with the aptly named websockets library.
  • The examples used a stream of newline-delimited JSON values, but not every stream is formatted like that. For example, the Redis replication stream uses a custom protocol that is designed to be both human readable and efficient to parse. For this kind of stream, you can define a custom attoparsec parser and hook it into the pipeline with conduitParser.
  • If you wish to test your application, you can store a pre-recorded stream as a test fixture and then use sourceFile to mimic a normal stream. This will allow you to quickly iterate without having to depend on a (possibly third-party) network service.
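
As a sketch of the logging and test-fixture ideas from the list above: the program below writes a small recorded stream to disk, replays it with sourceFile and logs every line with iterMC on the way through. The file name fixture.jsonl and the helper name replayFixture are made up for this example; in a real test suite the fixture would be a recording of the actual service.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Conduit (iterMC, linesUnboundedAsciiC, runConduitRes, sinkList, sourceFile, (.|))
import Control.Monad.IO.Class (liftIO)
import qualified Data.ByteString as B

-- Replay a recorded stream from disk, logging each line as it passes.
replayFixture :: FilePath -> IO [B.ByteString]
replayFixture path = runConduitRes $
     sourceFile path
  .| linesUnboundedAsciiC
  .| iterMC (liftIO . print) -- log the value, then pass it on unchanged
  .| sinkList

main :: IO ()
main = do
  -- Create a tiny fixture so the example is self-contained.
  B.writeFile "fixture.jsonl"
    "{\"counter\":1,\"time\":100}\n{\"counter\":2,\"time\":101}\n"
  ls <- replayFixture "fixture.jsonl"
  print (length ls)
```

Everything downstream of sourceFile is unchanged compared to a pipeline that starts with appSource, which is what makes this kind of substitution convenient for testing.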

The technique described above works well for a client application talking to a single server. This is often enough, but sometimes you need to talk to several servers and make decisions based on information from all of them. In such a case, there are several frameworks built around Functional Reactive Programming (FRP) concepts available, such as Reactive, Reflex, Sodium and many more. FRP is a huge topic though and out of scope for this blog post.

Conclusion

Haskell is a programming language focused on lazy evaluation, and as such it also started out with lazily evaluated I/O. Several decades later, the problems with lazy I/O have proven greater than the benefits and several libraries have been developed that provide a better abstraction. Conduit is one such library and it provides a clean and very composable model for operating on streams of data. This makes it possible to develop and test components of the pipeline in isolation, even by separate teams if necessary. The wide variety of available combinator functions combined with the type system also allows for

The network conduits make it very easy to construct conduit pipelines that interact with a socket. The resulting programs look very different from their counterparts in imperative languages, but they are quite readable to anyone with some basic Haskell knowledge and can be altered easily by adding or removing parts of the pipeline. Next time you find yourself writing a client application for some network protocol, give conduits a try!

