Skip to content

PostgreSql events proxy

Send NOTIFY and WAL events from PostgreSQL to upstream services

Purpose

This tool enables a decoupled architecture, think sending emails when a user signs up. Instead of having explicit code in your signup function that does the work (and slows down your response), you just have to worry about inserting the row into the database. This event (either the WAL or a NOTIFY generated by a trigger) will be proxied across to an upstream service. From there, you can have multiple consumers reacting to that event (send signup email, send sms notification). Those consumers tend to be very short, self contained scripts.

The larger goal is to enable the development of backends around PostgREST / subZero philosophy. Check out the PostgREST Starter Kit / subZero Starter Kit to see how pg-event-proxy fits in a larger project.

Currently the supported upstreams are (others like lambda, sns, webhook planned) - amqp 0.9 (RabbitMQ) - mqtt (Apache ActiveMQ, Cassandana, HiveMQ, Mosquitto, RabbitMQ, AWS IoT, Amazon MQ, ...) - redis pubsub (Redis)

This gives you a wide range of "destinations" for the database events

Configuration

Configuration is done through environment variables (useful in docker context) or a ini configuration file

Environment variables

Configuration for the event source (PostgreSql)

  • PGPROXY_LOCAL-POSTGRESQL_URI=postgresql://username:password@domain.tld:port/database

these configurations are only used when streaming WAL

  • PGPROXY_LOCAL-REPLICATION_SLOT=my_replication_slot (default pg_event_proxy)
  • PGPROXY_LOCAL-WAL2JSON_PARAMS=\"format-version\" '2' , \"include-types\" '1' (default \"format-version\" '2' , \"include-types\" '0')

Configuration for the event destination (upstream)

Each upstream has a slightly different configuration but they all share the BRIDGE_CHANNELS configuration. The format of "BRIDGE_CHANNELS" config is as follows:

pgchannel1->exchange:topic_exchange_name, pgchannel2->queue:queue_name, pgchannel3->topic:topic_name, ...

wal2json has a special meaning when used as a value for pgchannel. In this case the replication events (WAL) will be streamed (as opposed to events from NOTIFY wal2json, ... query). For example wal2json->exchange:amqp.topic will stream WAL to "amqp.topic" exchange. When streaming WAL, the database user needs to have REPLICATION privileges

Configuration for RabbitMQ upstream

  • PGPROXY_LOCAL-UPSTREAM=amqp
  • PGPROXY_LOCAL-UPSTREAM_CONFIG-URI=amqp://user:pass@rabbitmq//
  • PGPROXY_LOCAL-UPSTREAM_CONFIG-BRIDGE_CHANNELS=events->exchange:amq.topic, wal2json->exchange:amq.topic
  • PGPROXY_LOCAL-UPSTREAM_CONFIG-DELIVERY_MODE=1 # message delivery mode, 1 = non-persistent, 2 = persistent

Configuration for Redis upstream - PGPROXY_LOCAL-UPSTREAM=redis - PGPROXY_LOCAL-UPSTREAM_CONFIG-URI=redis://redis:6379 - PGPROXY_LOCAL-UPSTREAM_CONFIG-BRIDGE_CHANNELS=events->topic:events, wal2json->topic:wal

Configuration for Mqtt upstream

  • PGPROXY_LOCAL-UPSTREAM=mqtt
  • PGPROXY_LOCAL-UPSTREAM_CONFIG-URI=tcp://mosquitto:1883
  • PGPROXY_LOCAL-UPSTREAM_CONFIG-BRIDGE_CHANNELS=events->topic:events, wal2json->topic:wal
  • PGPROXY_LOCAL-UPSTREAM_CONFIG-USERNAME=username
  • PGPROXY_LOCAL-UPSTREAM_CONFIG-PASSWORD=password

Note: It's recommended to always use the same name for postgresql channel and exchange/queue/topic in BRIDGE_CHANNELS, for example app_events->topic:app_events, table_changes->topic:tables_changes to make it easy to determine where the event originated

Sending messages

A message can be sent either by running a query like NOTIFY pgchannel1, 'My message' or by simply running an INSERT/UPDATE/DELETE query if you configured WAL streaming by using wal2json as the name of the channel.

When using using amqp or mqtt as upstreams it's possible to specify the routing key/topic for the message. For example, having the BRIDGE_CHANNELS configured as pgchannel->exchange:events (rabbitmq) or pgchannel->topic:events (mqtt), by running the following query NOTIFY pgchannel, 'signup|A new user signup', a message will be published to the events exchange with the routing key signup in rabbitmq and respectively, a message published to the mqtt messaging server with the topic being events.signup. This allows for a wide range of routing topologies implemented in your messaging server.

Helper Functions

To make it bit more user friendly it's recommended to use events schema functions to send events. This module provides some durability features in case of intermittent disconnects. With this lib you can send events with a query like this

select events.send_message('pgchannel1', 'My message', 'routing-key', true)

See it in action

Clone pg-event-proxy-example repository.

Open docker-compose.yml file and uncomment the desired upstream (by default it's configured to send events to RabbitMQ)

Bring up the stack

docker-compose up -d

In console 1

./generate_events.sh

In console 2

./listen_events.sh