Build Your Own Message DB Client
The Message DB event store and message store is plain-old Postgres. Using it just means connecting via the Postgres client offered in your language of choice and executing some Postgres server functions. Having an event store in Postgres is an obvious win for teams that don’t want to run technologies intended for the most extreme degree of scale. But first you’ll need to build a basic client.
We’re aware of a number of projects being undertaken presently in various companies to implement Message DB clients in languages such as JavaScript, Clojure, Go, and Elixir. Implementing a basic client requires implementing just a handful of calls to Message DB’s API.
The Message DB store API is made of 12 Postgres server functions, but only 3 or 4 of them are required for interacting with it from your applications:
write_message
get_stream_messages
get_category_messages
get_last_stream_message
(optional)
Background
Here is some background on Message DB primitives and concepts used in the API.
Messages
Messages are the data structure stored in a message store. Events and commands, among others, are both kinds messages.
Messages have a JSON data
attribute and a JSON metadata
attribute, as well as a type
attribute, among other internal, mechanical attributes.
Streams and Categories
Streams and categories are the fundamental unit of organization of events in the message store.
A message is always written to a stream.
A message can be retrieved either from the stream it was written to, or from the stream’s category.
A stream records the state of a single entity. The term entity stream and stream are largely interchangeable.
A category is a collection of streams that start with the same name. For example, the account-123
stream and the account-456
stream are both part of the account
category.
The messages
Table
While the messages
table is an ordinary Postgres table that you can interact with via ordinary Postgres queries, the Message DB server functions implement the principal message store interaction patterns, and ensure that the appropriate reading and writing protocols are not by-passed.
And while you can interact with the table directly, doing so can violate the constraints of a stream-oriented store.
For more information about the messages
table schema, see:
http://docs.eventide-project.org/user-guide/message-db/anatomy.html#messages-table
The Message DB Core API
The Message DB core API is comprised of the write_message
function, the get_stream_messages
function, the get_category_messages
function, and the get_last_stream_message
function.
Writing Messages: The write_message
Function
Messages are always written to streams. They can be read from the stream they’re written to, or the category that the stream belongs to, but they’re always written to streams.
The signature of the write_message
function is:
write_message(
id varchar,
stream_name varchar,
type varchar,
data jsonb,
metadata jsonb DEFAULT NULL,
expected_version bigint DEFAULT NULL
)
The write_message
function returns the position of the written message.
Messages can optionally be written with optimistic concurrency protection using the expected_version
parameter.
For more details about using the write_message
function, see:
http://docs.eventide-project.org/user-guide/message-db/server-functions.html#write-a-message
Writing Batches of Messages
Writing batches of messages to a stream isn’t directly supported by a single call to the write_message
function. However, because Postgres supports atomic writes using database transactions, batch writes is ultimately supported.
To write multiple messages to a stream in a batch, start a Postgres transaction and issue multiple calls to the write_messages
function.
Your client implementation can provide an method for writing a single message and a method for writing batch of messages. Both will call the write_message
function. The implementation that writes a batch of message will start a Postgres transaction, call the write_message
function for each message in the batch, and then commit the Postgres transaction.
Transactions should only be used for writes to the same stream. While it is technically possible to write to multiple streams using a Postgres transaction, doing so is ultimately a violation of event sourcing patterns and is strongly discouraged.
For more on Postgres transactions, see: https://www.postgresql.org/docs/current/tutorial-transactions.html
The Message DB server functions do not set or otherwise change the default transaction isolation level configured for the Postgres server. Fore more on Postgres isolation levels, see: https://www.postgresql.org/docs/current/transaction-iso.html
Get Messages from a Stream: The get_stream_messages
Function
The principle use of retrieving messages from a stream is Entity projection.
The get_stream_messages
function retrieves messages from a single entity stream, optionally specifying the starting position, the number of messages to retrieve, and an additional, arbitrary condition, including Postgres JSON query conditions, that are appended to the SQL query’s WHERE clause.
get_stream_messages(
stream_name varchar,
position bigint DEFAULT 0,
batch_size bigint DEFAULT 1000,
condition varchar DEFAULT NULL
)
The get_stream_messages
function returns a list of records from the messages
table.
For more details about using the get_stream_messages
function, see:
http://docs.eventide-project.org/user-guide/message-db/server-functions.html#get-messages-from-a-category
Get Messages from a Category: The get_category_messages
Function
The principle use of retrieving messages from a category is consuming messages from an input queue and Pub/Sub. The function also supports partitioning the retrieved messages amongst multiple consumers.
The get_category_messages
function retrieves messages from a category of streams, optionally specifying the starting position, the number of messages to retrieve, the correlation category for Pub/Sub, consumer group parameters, and an additional, arbitrary condition, including Postgres JSON query conditions, that are appended to the SQL query’s WHERE clause.
get_category_messages(
category_name varchar,
position bigint DEFAULT 0,
batch_size bigint DEFAULT 1000,
correlation varchar DEFAULT NULL,
consumer_group_member varchar DEFAULT NULL,
consumer_group_size varchar DEFAULT NULL,
condition varchar DEFAULT NULL
)
The get_category_messages
function returns a list of records from the messages
table.
For more details about using the get_category_messages
function, see:
http://docs.eventide-project.org/user-guide/message-db/server-functions.html#get-messages-from-a-category
Filtering Messages with a SQL Condition
The condition
parameter receives an arbitrary SQL condition which further filters the messages retrieved.
The condition
parameter is supported for both stream retrieval and category retrieval.
SELECT * FROM get_stream_messages('someStream-123', condition => 'data->>''amount'' > 0');
SELECT * FROM get_category_messages('someStream', condition => 'data->>''amount'' > 0');
The SQL condition feature is deactivated by default. The feature is activated using the message_store.sql_condition
Postgres configuration option: message_store.sql_condition=on
. Using the feature without activating the configuration option will result in an error.
Get the Last Message from a Stream: The get_last_stream_message
Function
The implementation of a call to the get_last_stream_message
is entirely optional. Last message retrieval is only useful in certain, specialized use cases.
There are a few uses of retrieving the last message from a stream, including retrieving the last entity snapshot from an entity snapshot stream, the last consumer position offset record from a consumer position stream, and to access the position
attribute from the last message in a stream. The position
attribute of the last message is referred to as the stream’s version. Therefore, getting the position of a stream’s last message is synonymous with getting the stream version.
get_last_stream_message(
stream_name varchar
)
The get_last_stream_message
function returns the single message that is the last message in the stream.
This function works only for entity streams, and does not work for categories.
For more details about using the get_last_stream_message
function, see:
http://docs.eventide-project.org/user-guide/message-db/server-functions.html#get-last-message-from-a-stream
Alternative: stream_version
Function
An alternate approach to retrieving a stream’s last message is to retrieve a stream’s version directly using the stream_version
function.
The stream_version
function is technically an internal function, but it is part of the Message DB public API and can be used.
For details on using the stream_version
function, see:
http://docs.eventide-project.org/user-guide/message-db/server-functions.html#get-stream-version-from-a-stream
Reference Implementation
The Eventide Project’s Message Store library implementation in Ruby is a good reference implementation that illustrates the construction of a Message DB client.
Review the code on GitHub: https://github.com/eventide-project/message-store-postgres
Answers to Your Questions
To chat with other users of Message DB and client implementers, join the conversation on the Eventide Project’s Slack: https://eventide-project-slack.herokuapp.com
Happy Coding!