PostgreSql events proxy¶
Send NOTIFY and WAL events from PostgreSQL to upstream services
Purpose¶
This tool enables a decoupled architecture, think sending emails when a user signs up. Instead of having explicit code in your signup
function that does the work (and slows down your response), you just have to worry about inserting the row into the database. This event (either the WAL or a NOTIFY generated by a trigger) will be proxied across to an upstream service. From there, you can have multiple consumers reacting to that event (send signup email, send sms notification). Those consumers tend to be very short, self contained scripts.
The larger goal is to enable the development of backends around PostgREST / subZero philosophy.
Check out the PostgREST Starter Kit / subZero Starter Kit to see how pg-event-proxy
fits in a larger project.
Currently the supported upstreams are (others like lambda, sns, webhook planned) - amqp 0.9 (RabbitMQ) - mqtt (Apache ActiveMQ, Cassandana, HiveMQ, Mosquitto, RabbitMQ, AWS IoT, Amazon MQ, ...) - redis pubsub (Redis)
This gives you a wide range of "destinations" for the database events
Configuration¶
Configuration is done through environment variables (useful in docker context) or a ini configuration file
Environment variables¶
Configuration for the event source (PostgreSql)
PGPROXY_LOCAL-POSTGRESQL_URI=postgresql://username:password@domain.tld:port/database
these configurations are only used when streaming WAL
PGPROXY_LOCAL-REPLICATION_SLOT=my_replication_slot
(defaultpg_event_proxy
)PGPROXY_LOCAL-WAL2JSON_PARAMS=\"format-version\" '2' , \"include-types\" '1'
(default\"format-version\" '2' , \"include-types\" '0'
)
Configuration for the event destination (upstream)
Each upstream has a slightly different configuration but they all share the BRIDGE_CHANNELS
configuration.
The format of "BRIDGE_CHANNELS" config is as follows:
pgchannel1->exchange:topic_exchange_name, pgchannel2->queue:queue_name, pgchannel3->topic:topic_name, ...
wal2json
has a special meaning when used as a value for pgchannel.
In this case the replication events (WAL) will be streamed (as opposed to events from NOTIFY wal2json, ... query).
For example wal2json->exchange:amqp.topic
will stream WAL to "amqp.topic" exchange.
When streaming WAL, the database user needs to have REPLICATION privileges
Configuration for RabbitMQ upstream
PGPROXY_LOCAL-UPSTREAM=amqp
PGPROXY_LOCAL-UPSTREAM_CONFIG-URI=amqp://user:pass@rabbitmq//
PGPROXY_LOCAL-UPSTREAM_CONFIG-BRIDGE_CHANNELS=events->exchange:amq.topic, wal2json->exchange:amq.topic
PGPROXY_LOCAL-UPSTREAM_CONFIG-DELIVERY_MODE=1
# message delivery mode, 1 = non-persistent, 2 = persistent
Configuration for Redis upstream
- PGPROXY_LOCAL-UPSTREAM=redis
- PGPROXY_LOCAL-UPSTREAM_CONFIG-URI=redis://redis:6379
- PGPROXY_LOCAL-UPSTREAM_CONFIG-BRIDGE_CHANNELS=events->topic:events, wal2json->topic:wal
Configuration for Mqtt upstream
PGPROXY_LOCAL-UPSTREAM=mqtt
PGPROXY_LOCAL-UPSTREAM_CONFIG-URI=tcp://mosquitto:1883
PGPROXY_LOCAL-UPSTREAM_CONFIG-BRIDGE_CHANNELS=events->topic:events, wal2json->topic:wal
PGPROXY_LOCAL-UPSTREAM_CONFIG-USERNAME=username
PGPROXY_LOCAL-UPSTREAM_CONFIG-PASSWORD=password
Note: It's recommended to always use the same name for postgresql channel and exchange/queue/topic in BRIDGE_CHANNELS
, for example
app_events->topic:app_events, table_changes->topic:tables_changes
to make it easy to determine where the event originated
Sending messages¶
A message can be sent either by running a query like NOTIFY pgchannel1, 'My message'
or by simply running an INSERT/UPDATE/DELETE
query if you configured WAL streaming by using wal2json
as the name of the channel.
When using using amqp or mqtt as upstreams it's possible to specify the routing key/topic for the message.
For example, having the BRIDGE_CHANNELS
configured as pgchannel->exchange:events
(rabbitmq) or pgchannel->topic:events
(mqtt), by running the following query
NOTIFY pgchannel, 'signup|A new user signup'
, a message will be published to the events
exchange with the routing key signup
in rabbitmq and respectively, a message published to the mqtt messaging server with the topic being events.signup
. This allows for a wide range of routing topologies implemented in your messaging server.
Helper Functions¶
To make it bit more user friendly it's recommended to use events schema functions to send events. This module provides some durability features in case of intermittent disconnects. With this lib you can send events with a query like this
select events.send_message('pgchannel1', 'My message', 'routing-key', true)
See it in action¶
Clone pg-event-proxy-example repository.
Open docker-compose.yml
file and uncomment the desired upstream (by default it's configured to send events to RabbitMQ)
Bring up the stack
docker-compose up -d
In console 1
./generate_events.sh
In console 2
./listen_events.sh