The Confluent JDBC Sink allows you to configure Kafka Connect to take
care of moving data reliably from Kafka to a relational database. Most
of the usual suspects (e.g. PostgreSQL, MySQL, Oracle) are supported
out of the box and, in theory, you could write your data to any
database with a JDBC driver.
Beyond moving the data itself, Kafka Connect also takes care of things like:
Making the current status of workers available over an HTTP API
Publishing metrics that facilitate the monitoring of all connectors in
a standard way
Assuming your infrastructure has an instance of Kafka Connect up and
running, all you need to do as a user of this system is submit a JSON
HTTP request to register a “job” and Kafka Connect will take care of
the rest.
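For example, registering a connector amounts to a single POST against the Connect REST API. Here is a minimal sketch in Clojure using clj-http and cheshire (the host, port, and helper name are assumptions about a local setup):

```clojure
(ns connect-example.register
  (:require [clj-http.client :as http]
            [cheshire.core :as json]))

;; Sketch only: posts a connector registration to the Kafka Connect REST API.
;; The URL assumes a Connect worker listening locally on port 8083.
(defn register-connector!
  [connector-name config]
  (http/post "http://localhost:8083/connectors"
             {:body         (json/generate-string {:name   connector-name
                                                   :config config})
              :content-type :json}))
```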
To make things concrete, imagine we’re implementing an event-driven
bank and we have some process (or at scale, a collection of processes)
that keeps track of customer balances by applying a transaction
log. Each time a customer balance is updated for some transaction, a
record is written to the customer-balances topic and we’d like to sink
this topic into a database table so that other systems can quickly
look up the current balance for some customer without having to apply
all the transactions themselves.
The configuration for such a sink might look something like this…
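I won't reproduce the project's exact configuration, but a minimal sketch, expressed as a Clojure map that gets JSON-encoded when it is submitted to the REST API, might look like this (the connection details, table name, and key handling are illustrative assumptions):

```clojure
;; Sketch of a JDBC sink configuration. The keys are standard Confluent JDBC
;; sink settings; the values are assumptions for a local test setup.
(def customer-balance-sink-config
  {"connector.class"     "io.confluent.connect.jdbc.JdbcSinkConnector"
   "topics"              "customer-balances"
   "connection.url"      "jdbc:postgresql://postgres:5432/testdb"
   "connection.user"     "test"
   "connection.password" "test"
   "insert.mode"         "upsert"
   "pk.mode"             "record_key"
   "pk.fields"           "customer_id"
   "auto.create"         "true"
   "table.name.format"   "customer_balances"})
```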
It may be argued that since this is all just configuration, there is no
need for testing. Or, if you do try to test it, aren't you just testing
Kafka Connect itself? I probably would have agreed with this sentiment until
the second or third time I had to reset the UAT environment after deploying a
slightly incorrect Kafka Connect job.
It is difficult to get these things perfectly correct the first time,
and errors can be costly to fix even when they happen in a test
environment (especially if the test environment is shared by other
developers and needs to be fixed or reset before trying again). For
this reason, it's really nice to be able to quickly test the connector in
your local environment and/or run some automated tests as part of your
continuous integration flow before any code gets merged.
So how do we test such a thing? Here's a list of some of the steps we
could take. We could go further, but this seems to catch most of the
problems I've seen in practice.
Create the “customer-balances” topic from which data will be fed
into the sink
Register the “customer-balance-sink” connector with a Kafka Connect
instance provided by the test environment (and wait until it gets
into the “RUNNING” state)
Generate some example records to load into the input topic
Wait until the last of the generated records appears in the sink
table
Check that all records written to the input topic made it into the
sink table
Top-down, meet Bottom-up
As an aside, and to provide a bit of background to my thought
process: many years ago, I came across the web.py project by the
late Aaron Swartz. The philosophy of that framework was
Think about the ideal way to write a web app. Write the code to make it happen.
This was one of many things he wrote that has stuck with me over the
years and it always comes to mind whenever I’m attempting to solve a new problem.
So when I thought about “the ideal way to write a test for a Kafka Connect sink”,
something like the following came to mind. This is the Top-down part of the
development process.
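I won't reproduce the exact listing from the project, but based on the description that follows, the ideal test looked roughly like the sketch below (the key names, the db-spec, and the helper functions are illustrative assumptions rather than a final API):

```clojure
;; Sketch of the "ideal" test. Assumes clojure.test, a db-spec for the sink
;; database, a topic config map, and the helpers sketched later in the post.
(deftest customer-balance-sink-test
  (let [seed-data (gen-customer-balances 10)]             ;; hypothetical data generator
    (test-jdbc-sink
      {:connector-name   "customer-balance-sink"
       :connector-config customer-balance-sink-config     ;; the map sketched earlier
       :topic            customer-balances-topic          ;; topic config, defined elsewhere
       :seed-data        seed-data
       :poll-query       (help/poll-table db-spec "customer_balances")
       :complete?        (help/found-last-record? (:customer_id (last seed-data)))}
      (comp (help/table-counts? (count seed-data))
            (help/table-columns? #{:customer_id :balance})))))
```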
The first parameter to this function is simply a map that provides
information to the test helper about things like
How to identify the connector so that it can be found and loaded into the test environment
Where to write the test data
How to generate the test data (and how much test data to generate)
How to find the data in the database after the connect job has loaded it
How to decide when all the data has appeared in the sink
The second parameter is a function that will be invoked with all the
data that has been collected by the test-machine journal during the
test run (specifically the generated seed data, and the data retrieved
from the sink table by periodically polling the database with the
test-specific query defined by the help/poll-table helper).
For this, we use regular functional composition to build a single
assertion function from any number of single purpose assertion
functions like help/table-counts? and help/table-columns?. Each
assertion helper returns a function that receives the journal, runs
some assertions, and then returns the journal so that it may be
composed with other helpers. If any new testing requirements are
identified they can be easily added independently of the existing
assertion helpers.
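The pattern is simple enough to sketch. The journal-in, journal-out shape comes from the description above; the journal keys and assertion bodies below are assumptions about how such helpers could be written:

```clojure
;; Assumes clojure.test/is, and that the polled table rows end up in the
;; journal under [:topics :table-rows] (an assumption of this sketch).
(defn table-counts?
  "Asserts that the sink table ended up with exactly `n` rows."
  [n]
  (fn [journal]
    (is (= n (count (get-in journal [:topics :table-rows]))))
    journal))

(defn table-columns?
  "Asserts that every row loaded from the sink table has the expected columns."
  [expected-cols]
  (fn [journal]
    (doseq [row (get-in journal [:topics :table-rows])]
      (is (= expected-cols (set (keys row)))))
    journal))

;; Ordinary composition then gives a single assertion function for the test.
(def assert-customer-balance-sink
  (comp (table-counts? 10)
        (table-columns? #{:customer_id :balance})))
```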
With these basic testing primitives in mind we now need to “write the
code to make it happen”, i.e. the Bottom-up part of the development
process. With a bit of luck, the two will meet in the middle.
Test Environment Additions
In addition to the base docker-compose config included in the
previous post, we
need a couple of extra services. We can either put those in their own
file and combine the two compose files using the -f option of
docker-compose, or we can just bundle it all up into a single compose
file. Each option has its trade-offs. I don't feel too strongly
either way. Use whichever option fits best with your team’s workflow.
This will also depend on the particular database you use. We use PostgreSQL
here because it’s awesome.
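I won't repeat the base file from the previous post, but the extra services amount to something like the sketch below (image versions, credentials, the "kafka" service name, and most of the Connect worker settings are assumptions to adapt to your own setup):

```yaml
# Sketch of the additional services; merge these under the `services:` key of
# the base compose file, or keep them in a second file combined with -f.
services:
  postgres:
    image: postgres:11
    environment:
      POSTGRES_USER: test
      POSTGRES_PASSWORD: test
      POSTGRES_DB: testdb
    ports:
      - "5432:5432"

  kafka-connect:
    image: confluentinc/cp-kafka-connect:5.3.1
    depends_on:
      - kafka
      - postgres
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_PORT: 8083
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_GROUP_ID: test-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-config
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
```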
Implementing the Test Helpers
The test helpers are a collection of higher-order functions that
allow the test-jdbc-sink function to pass control back to the test
author in order to run test-specific tasks. Let’s look at those
before delving into test-jdbc-sink itself which is a bit more
involved. The helpers are all fairly straightforward, so hopefully
the docstrings will be enough to understand what's going on.
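The real helper code lives in the project, but the general shape is roughly as follows (the SQL, the use of clojure.java.jdbc, and the helper names are assumptions):

```clojure
(ns connect-example.helpers
  (:require [clojure.java.jdbc :as jdbc]))

(defn poll-table
  "Returns a zero-argument function that queries the sink table for the rows
   written so far. test-jdbc-sink calls it periodically while it waits for
   the connector to catch up."
  [db-spec table]
  (fn []
    (jdbc/query db-spec [(str "SELECT * FROM " table)])))

(defn found-last-record?
  "Returns a predicate over the polled rows that becomes truthy once the
   last generated record has appeared in the sink table."
  [last-customer-id]
  (fn [rows]
    (some #(= last-customer-id (:customer_id %)) rows)))
```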
Finally, here is the annotated code for test-jdbc-sink. This has not yet
been properly extracted from the project that uses these tests, so it
contains a bit of accidental complexity, but hopefully I'll be able to get
some version of this into jackdaw
soon. In the meantime I’m hoping it serves as a nice bit of
documentation for using the test-machine outside of contrived
examples.
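To give a flavour of it here, the overall shape is roughly the sketch below. The jackdaw calls (test-machine, kafka-transport, run-test, the :write! command) follow the test-machine API, while the surrounding pieces (kafka-config, create-topic!, register-connector!, wait-until-running!, poll-until) and the :table-rows journal key are assumptions standing in for the real code:

```clojure
(ns connect-example.test-jdbc-sink
  (:require [jackdaw.test :as jdt]))

;; Rough sketch only: the real version carries more configuration and error
;; handling. The first parameter is the map described earlier in the post.
(defn test-jdbc-sink
  [{:keys [connector-name connector-config topic seed-data poll-query complete?]} assert-fn]
  ;; 1. create the input topic, register the connector, wait for RUNNING
  (create-topic! topic)
  (register-connector! connector-name connector-config)
  (wait-until-running! connector-name)
  ;; 2. write the seed data through the test machine
  (with-open [machine (jdt/test-machine
                        (jdt/kafka-transport kafka-config {(:topic-name topic) topic}))]
    (let [write-commands    (for [record seed-data]
                              [:write! (:topic-name topic) record {:key-fn :customer_id}])
          {:keys [journal]} (jdt/run-test machine write-commands)
          ;; 3. poll the sink table until the completion condition is met
          rows              (poll-until complete? poll-query {:timeout-ms 30000})]
      ;; 4. add the polled rows to the journal and hand it all to the assertions
      (assert-fn (assoc-in journal [:topics :table-rows] rows)))))
```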
And that's it for now! Thanks for reading. I look forward to hearing
your thoughts and questions about this on Twitter. I tried to keep it
as short as possible, so let me know if there's anything I glossed over
that you'd like to see explained in more detail in subsequent posts.