Build Your Own Message DB Client

The Message DB event store and message store is plain-old Postgres. Using it just means connecting via the Postgres client offered in your language of choice and executing some Postgres server functions. Having an event store in Postgres is an obvious win for teams that don’t want to run technologies intended for the most extreme degree of scale. But first you’ll need to build a basic client.

We’re aware of a number of projects being undertaken presently in various companies to implement Message DB clients in languages such as JavaScript, Clojure, Go, and Elixir. Implementing a basic client requires implementing just a handful of calls to Message DB’s API.

The Message DB store API is made of 12 Postgres server functions, but only 3 or 4 of them are required for interacting with it from your applications:

  • write_message
  • get_stream_messages
  • get_category_messages
  • get_last_stream_message (optional)

Background

Here is some background on Message DB primitives and concepts used in the API.

Messages

Messages are the data structure stored in a message store. Events and commands, among others, are both kinds messages.

Messages have a JSON data attribute and a JSON metadata attribute, as well as a type attribute, among other internal, mechanical attributes.

Streams and Categories

Streams and categories are the fundamental unit of organization of events in the message store.

A message is always written to a stream.

A message can be retrieved either from the stream it was written to, or from the stream’s category.

A stream records the state of a single entity. The term entity stream and stream are largely interchangeable.

A category is a collection of streams that start with the same name. For example, the account-123 stream and the account-456 stream are both part of the account category.

The messages Table

While the messages table is an ordinary Postgres table that you can interact with via ordinary Postgres queries, the Message DB server functions implement the principal message store interaction patterns, and ensure that the appropriate reading and writing protocols are not by-passed.

And while you can interact with the table directly, doing so can violate the constraints of a stream-oriented store.

For more information about the messages table schema, see:
http://docs.eventide-project.org/user-guide/message-db/anatomy.html#messages-table

The Message DB Core API

The Message DB core API is comprised of the write_message function, the get_stream_messages function, the get_category_messages function, and the get_last_stream_message function.

Writing Messages: The write_message Function

Messages are always written to streams. They can be read from the stream they’re written to, or the category that the stream belongs to, but they’re always written to streams.

The signature of the write_message function is:

write_message(
  id varchar,
  stream_name varchar,
  type varchar,
  data jsonb,
  metadata jsonb DEFAULT NULL,
  expected_version bigint DEFAULT NULL
)

The write_message function returns the position of the written message.

Messages can optionally be written with optimistic concurrency protection using the expected_version parameter.

For more details about using the write_message function, see:
http://docs.eventide-project.org/user-guide/message-db/server-functions.html#write-a-message

Writing Batches of Messages

Writing batches of messages to a stream isn’t directly supported by a single call to the write_message function. However, because Postgres supports atomic writes using database transactions, batch writes is ultimately supported.

To write multiple messages to a stream in a batch, start a Postgres transaction and issue multiple calls to the write_messages function.

Your client implementation can provide an method for writing a single message and a method for writing batch of messages. Both will call the write_message function. The implementation that writes a batch of message will start a Postgres transaction, call the write_message function for each message in the batch, and then commit the Postgres transaction.

Transactions should only be used for writes to the same stream. While it is technically possible to write to multiple streams using a Postgres transaction, doing so is ultimately a violation of event sourcing patterns and is strongly discouraged.

For more on Postgres transactions, see: https://www.postgresql.org/docs/current/tutorial-transactions.html

The Message DB server functions do not set or otherwise change the default transaction isolation level configured for the Postgres server. Fore more on Postgres isolation levels, see: https://www.postgresql.org/docs/current/transaction-iso.html

Get Messages from a Stream: The get_stream_messages Function

The principle use of retrieving messages from a stream is Entity projection.

The get_stream_messages function retrieves messages from a single entity stream, optionally specifying the starting position, the number of messages to retrieve, and an additional, arbitrary condition, including Postgres JSON query conditions, that are appended to the SQL query’s WHERE clause.

get_stream_messages(
  stream_name varchar,
  position bigint DEFAULT 0,
  batch_size bigint DEFAULT 1000,
  condition varchar DEFAULT NULL
)

The get_stream_messages function returns a list of records from the messages table.

For more details about using the get_stream_messages function, see:
http://docs.eventide-project.org/user-guide/message-db/server-functions.html#get-messages-from-a-category

Get Messages from a Category: The get_category_messages Function

The principle use of retrieving messages from a category is consuming messages from an input queue and Pub/Sub. The function also supports partitioning the retrieved messages amongst multiple consumers.

The get_category_messages function retrieves messages from a category of streams, optionally specifying the starting position, the number of messages to retrieve, the correlation category for Pub/Sub, consumer group parameters, and an additional, arbitrary condition, including Postgres JSON query conditions, that are appended to the SQL query’s WHERE clause.

get_category_messages(
  category_name varchar,
  position bigint DEFAULT 0,
  batch_size bigint DEFAULT 1000,
  correlation varchar DEFAULT NULL,
  consumer_group_member varchar DEFAULT NULL,
  consumer_group_size varchar DEFAULT NULL,
  condition varchar DEFAULT NULL
)

The get_category_messages function returns a list of records from the messages table.

For more details about using the get_category_messages function, see:
http://docs.eventide-project.org/user-guide/message-db/server-functions.html#get-messages-from-a-category

Filtering Messages with a SQL Condition

The condition parameter receives an arbitrary SQL condition which further filters the messages retrieved.

The condition parameter is supported for both stream retrieval and category retrieval.

SELECT * FROM get_stream_messages('someStream-123', condition => 'data->>''amount'' > 0');

SELECT * FROM get_category_messages('someStream', condition => 'data->>''amount'' > 0');

The SQL condition feature is deactivated by default. The feature is activated using the message_store.sql_condition Postgres configuration option: message_store.sql_condition=on. Using the feature without activating the configuration option will result in an error.

Get the Last Message from a Stream: The get_last_stream_message Function

The implementation of a call to the get_last_stream_message is entirely optional. Last message retrieval is only useful in certain, specialized use cases.

There are a few uses of retrieving the last message from a stream, including retrieving the last entity snapshot from an entity snapshot stream, the last consumer position offset record from a consumer position stream, and to access the position attribute from the last message in a stream. The position attribute of the last message is referred to as the stream’s version. Therefore, getting the position of a stream’s last message is synonymous with getting the stream version.

get_last_stream_message(
  stream_name varchar
)

The get_last_stream_message function returns the single message that is the last message in the stream.

This function works only for entity streams, and does not work for categories.

For more details about using the get_last_stream_message function, see:
http://docs.eventide-project.org/user-guide/message-db/server-functions.html#get-last-message-from-a-stream

Alternative: stream_version Function

An alternate approach to retrieving a stream’s last message is to retrieve a stream’s version directly using the stream_version function.

The stream_version function is technically an internal function, but it is part of the Message DB public API and can be used.

For details on using the stream_version function, see:
http://docs.eventide-project.org/user-guide/message-db/server-functions.html#get-stream-version-from-a-stream

Reference Implementation

The Eventide Project’s Message Store library implementation in Ruby is a good reference implementation that illustrates the construction of a Message DB client.

Review the code on GitHub: https://github.com/eventide-project/message-store-postgres

Answers to Your Questions

To chat with other users of Message DB and client implementers, join the conversation on the Eventide Project’s Slack: https://eventide-project-slack.herokuapp.com

Happy Coding!