pgstore

package module
v0.0.0-...-977a178 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 1, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package pgstore provides PostgreSQL-backed stores for events, snapshots, and checkpoints.

PGNotifier implements subscription.StoreNotifier using PostgreSQL LISTEN/NOTIFY. After each event append, NOTIFY is sent on the configured channel with the latest global sequence. Listeners wake instantly instead of polling.

Package pgstore provides a PostgreSQL-backed EventStore implementation using pgx/v5.

Events are stored in a single table with JSONB data, using transaction-based optimistic concurrency control. The UNIQUE(stream_id, version) constraint acts as a final safety net against duplicate versions.

Index

Constants

View Source
const (
	// DefaultLeaseTimeout is how long a partition lease is valid without renewal.
	DefaultLeaseTimeout = 30 * time.Second

	// DefaultRenewInterval is how often to renew the lease.
	DefaultRenewInterval = 10 * time.Second

	// DefaultPartitionCount is the default number of partitions.
	DefaultPartitionCount = 8
)
View Source
const (
	// DefaultChangeChannel is the default PG NOTIFY channel for projection changes.
	DefaultChangeChannel = "eskit_changes"
)
View Source
const (
	// DefaultChannel is the default NOTIFY channel name.
	DefaultChannel = "eskit_events"
)

Variables

This section is empty.

Functions

func NotifyAppend

func NotifyAppend(ctx context.Context, pool *pgxpool.Pool, channel string, sequence int64) error

NotifyAppend sends a NOTIFY with the given global sequence. Call this after appending events (inside or outside the transaction).

Types

type CheckpointStore

type CheckpointStore struct {
	// contains filtered or unexported fields
}

CheckpointStore is a PostgreSQL-backed checkpoint for event subscriptions. Tracks each consumer's last processed global sequence number. Thread-safe: PostgreSQL handles concurrent access.

func NewCheckpointStore

func NewCheckpointStore(ctx context.Context, pool *pgxpool.Pool) (*CheckpointStore, error)

NewCheckpointStore creates a PostgreSQL-backed checkpoint store. Creates the checkpoints table if it doesn't exist.

func (*CheckpointStore) Load

func (c *CheckpointStore) Load(ctx context.Context, consumerID string) (uint64, error)

Load returns the last processed sequence for a consumer. Returns 0 if new.

func (*CheckpointStore) Save

func (c *CheckpointStore) Save(ctx context.Context, consumerID string, sequence uint64) error

Save persists the consumer's position atomically via upsert.

type ClusterOption

type ClusterOption func(*clusterConfig)

ClusterOption configures a ClusterStore.

func WithPool

func WithPool(pool *pgxpool.Pool) ClusterOption

WithPool sets a single pool for both reads and writes. Convenience for simple deployments without read replicas.

func WithReadPool

func WithReadPool(pool *pgxpool.Pool) ClusterOption

WithReadPool sets the replica pool for read operations. If not set, reads use the write pool.

func WithWritePool

func WithWritePool(pool *pgxpool.Pool) ClusterOption

WithWritePool sets the primary pool for write operations.

type ClusterStore

type ClusterStore[E any] struct {
	// contains filtered or unexported fields
}

ClusterStore is a PostgreSQL event store with read/write pool splitting. Writes go to the primary, reads can go to replicas for horizontal read scaling.

Compatible with:

  • Standard PostgreSQL (primary + replicas)
  • Citus (distributed Postgres)
  • Neon (serverless Postgres)
  • YugabyteDB (Postgres wire protocol)
  • CockroachDB (Postgres wire protocol — see advisory lock notes)

For CockroachDB: advisory locks are not supported. Use the CAS-based optimistic concurrency (the UNIQUE constraint) as the sole safety net, or use a separate locks table.

func NewClusterStore

func NewClusterStore[E any](ctx context.Context, opts ...ClusterOption) (*ClusterStore[E], error)

NewClusterStore creates a PostgreSQL event store with read/write pool splitting. Runs migration on the write pool.

func (*ClusterStore[E]) Append

func (s *ClusterStore[E]) Append(ctx context.Context, streamID string, expectedVersion int, events []E, metadata ...eskit.Metadata) ([]eskit.Event[E], error)

Append persists events using the WRITE pool (primary).

func (*ClusterStore[E]) Close

func (s *ClusterStore[E]) Close()

Close closes both connection pools.

func (*ClusterStore[E]) Load

func (s *ClusterStore[E]) Load(ctx context.Context, streamID string) ([]eskit.Event[E], error)

Load returns all events using the READ pool (replica).

func (*ClusterStore[E]) LoadFrom

func (s *ClusterStore[E]) LoadFrom(ctx context.Context, streamID string, fromVersion int) ([]eskit.Event[E], error)

LoadFrom returns events from a version using the READ pool (replica).

func (*ClusterStore[E]) Ping

func (s *ClusterStore[E]) Ping(ctx context.Context) error

Ping checks connectivity to both write and read pools.

type GlobalEvent

type GlobalEvent[E any] struct {
	GlobalSequence uint64
	StreamID       string
	EventType      string
	Version        int
	Data           E
	Timestamp      time.Time
}

GlobalEvent is a store-wide event with a global sequence number. Compatible with subscription.GlobalEvent but decoupled to avoid import cycles.

type GlobalReaderOption

type GlobalReaderOption[E any] func(*PGGlobalReader[E])

GlobalReaderOption configures a PGGlobalReader.

func WithGlobalReaderCodecRegistry

func WithGlobalReaderCodecRegistry[E any](reg *codec.Registry) GlobalReaderOption[E]

WithGlobalReaderCodecRegistry sets the codec registry for reading events with different codecs.

func WithGlobalReaderRegistry

func WithGlobalReaderRegistry[E any](reg *eskit.EventRegistry) GlobalReaderOption[E]

WithGlobalReaderRegistry enables type registry for heterogeneous global reads.

type PGChangeRelay

type PGChangeRelay struct {
	// contains filtered or unexported fields
}

PGChangeRelay broadcasts projection changes across server instances using PostgreSQL LISTEN/NOTIFY. Use for shared-consumer projections where only one server processes each event and other servers need to be notified.

Flow:

  1. Server A processes event → OnChange fires → Broadcast() sends PG NOTIFY
  2. All servers (including A) receive the NOTIFY via Start()
  3. Each server forwards the change to its local ChangeNotifier
  4. SSE handlers on all servers get notified

Safe for concurrent use.

func NewPGChangeRelay

func NewPGChangeRelay(pool *pgxpool.Pool, channel string, notifier *eskit.ChangeNotifier, opts ...PGChangeRelayOption) *PGChangeRelay

NewPGChangeRelay creates a relay that bridges ChangeNotifier across servers using PostgreSQL LISTEN/NOTIFY.

The notifier receives all changes detected by this relay (from any server). Call Start() to begin listening.

func (*PGChangeRelay) Broadcast

func (r *PGChangeRelay) Broadcast(change eskit.Change)

Broadcast sends a change notification to all servers via PG NOTIFY. Also notifies the local ChangeNotifier directly for immediate delivery to SSE handlers on this server.

Non-blocking: the PG NOTIFY is fire-and-forget. If the database connection is temporarily lost, the notification is dropped (SSE handlers will catch up on the next successful notification).

func (*PGChangeRelay) Close

func (r *PGChangeRelay) Close() error

Close stops the relay.

func (*PGChangeRelay) Start

func (r *PGChangeRelay) Start(ctx context.Context)

Start begins listening for PG NOTIFY messages and forwarding them to the local ChangeNotifier. Blocks until ctx is cancelled or Close is called. Auto-reconnects on connection loss.

func (*PGChangeRelay) Wait

func (r *PGChangeRelay) Wait()

Wait blocks until the relay's listen loop has fully stopped.

type PGChangeRelayOption

type PGChangeRelayOption func(*PGChangeRelay)

PGChangeRelayOption configures a PGChangeRelay.

func WithBroadcastTimeout

func WithBroadcastTimeout(d time.Duration) PGChangeRelayOption

WithBroadcastTimeout sets the timeout for each PG NOTIFY broadcast. Default: 2s.

func WithReconnectDelay

func WithReconnectDelay(d time.Duration) PGChangeRelayOption

WithReconnectDelay sets the delay between reconnection attempts when the PostgreSQL LISTEN connection is lost. Default: 1s.

type PGCheckpoint

type PGCheckpoint struct {
	// contains filtered or unexported fields
}

PGCheckpoint implements subscription.Checkpoint backed by PostgreSQL.

func NewPGCheckpoint

func NewPGCheckpoint(ctx context.Context, pool *pgxpool.Pool) (*PGCheckpoint, error)

NewPGCheckpoint creates a PostgreSQL-backed checkpoint store.

func (*PGCheckpoint) Load

func (c *PGCheckpoint) Load(ctx context.Context, consumerID string) (uint64, error)

Load returns the last processed sequence for a consumer. Returns 0 if new.

func (*PGCheckpoint) Save

func (c *PGCheckpoint) Save(ctx context.Context, consumerID string, sequence uint64) error

Save persists the consumer's position atomically.

type PGGlobalReader

type PGGlobalReader[E any] struct {
	// contains filtered or unexported fields
}

PGGlobalReader implements subscription.GlobalReader for PostgreSQL. Reads events by global sequence (the events.id column).

func NewPGGlobalReader

func NewPGGlobalReader[E any](pool *pgxpool.Pool, opts ...GlobalReaderOption[E]) *PGGlobalReader[E]

NewPGGlobalReader creates a global reader backed by a pgxpool.

func (*PGGlobalReader[E]) LatestSequence

func (r *PGGlobalReader[E]) LatestSequence(ctx context.Context) (uint64, error)

LatestSequence returns the highest global sequence in the store, or 0 if empty.

func (*PGGlobalReader[E]) ReadFrom

func (r *PGGlobalReader[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]GlobalEvent[E], error)

ReadFrom returns events starting from the given global sequence (inclusive), up to limit.

type PGNotifier

type PGNotifier struct {
	// contains filtered or unexported fields
}

PGNotifier implements subscription.StoreNotifier using PostgreSQL LISTEN/NOTIFY. Auto-reconnects on connection loss. Safe for concurrent use.

func NewPGNotifier

func NewPGNotifier(pool *pgxpool.Pool, channel string) *PGNotifier

NewPGNotifier creates a PGNotifier that listens on the given channel. Call Start() to begin listening, or pass to subscription as a StoreNotifier.

func (*PGNotifier) Close

func (n *PGNotifier) Close() error

Close stops the notifier and closes all listener channels.

func (*PGNotifier) Notify

func (n *PGNotifier) Notify(ctx context.Context) <-chan uint64

Notify returns a channel that receives the latest global sequence when new events are appended. Implements subscription.StoreNotifier.

func (*PGNotifier) Start

func (n *PGNotifier) Start(ctx context.Context)

Start begins listening for NOTIFY messages. Blocks until ctx is cancelled or Close is called. Auto-reconnects on connection loss.

func (*PGNotifier) Wait

func (n *PGNotifier) Wait()

Wait blocks until the notifier's listen loop has fully stopped.

type PartitionManager

type PartitionManager struct {
	// contains filtered or unexported fields
}

PartitionManager manages competing partition assignment using PostgreSQL. Instances claim partitions with SELECT ... FOR UPDATE SKIP LOCKED and renew leases periodically. Dead instances' partitions are auto-reclaimed.

func NewPartitionManager

func NewPartitionManager(ctx context.Context, pool *pgxpool.Pool, instanceID string, opts ...PartitionOption) (*PartitionManager, error)

NewPartitionManager creates a partition manager. Runs migration to create the partitions table and seed initial partition rows.

func (*PartitionManager) Close

func (pm *PartitionManager) Close() error

Close stops the partition manager and releases all partitions.

func (*PartitionManager) OwnedPartitions

func (pm *PartitionManager) OwnedPartitions() []int

OwnedPartitions returns the partitions currently owned by this instance.

func (*PartitionManager) OwnsPartition

func (pm *PartitionManager) OwnsPartition(partitionID int) bool

OwnsPartition returns true if this instance owns the given partition.

func (*PartitionManager) PartitionForStream

func (pm *PartitionManager) PartitionForStream(streamID string) int

PartitionForStream maps a stream ID to a partition using consistent hashing.

func (*PartitionManager) Start

func (pm *PartitionManager) Start(ctx context.Context)

Start begins claiming and renewing partitions. Blocks until ctx is cancelled or Close.

func (*PartitionManager) Wait

func (pm *PartitionManager) Wait()

Wait blocks until the partition manager has stopped.

type PartitionOption

type PartitionOption func(*PartitionManager)

PartitionOption configures a PartitionManager.

func WithLeaseTimeout

func WithLeaseTimeout(d time.Duration) PartitionOption

WithLeaseTimeout sets the lease timeout.

func WithPartitionCount

func WithPartitionCount(n int) PartitionOption

WithPartitionCount sets the number of partitions.

func WithRenewInterval

func WithRenewInterval(d time.Duration) PartitionOption

WithRenewInterval sets the lease renewal interval.

type PgLockRegistry

type PgLockRegistry struct {
	// contains filtered or unexported fields
}

PgLockRegistry is a distributed LockRegistry backed by PostgreSQL advisory locks. Uses pg_advisory_lock which is session-scoped and automatically released on disconnect.

NOTE: CockroachDB does not support advisory locks. For CockroachDB, use natslock.NATSLockRegistry (from git.nullsoft.is/ash/eskit/natslock) or implement a table-based lock with CAS semantics.

func NewPgLockRegistry

func NewPgLockRegistry(pool *pgxpool.Pool) *PgLockRegistry

NewPgLockRegistry creates a PostgreSQL advisory lock registry.

func (*PgLockRegistry) Acquire

func (r *PgLockRegistry) Acquire(ctx context.Context, streamID string) (func(), error)

Acquire obtains an advisory lock for the given stream ID. Blocks until acquired. The lock is held for the duration of the returned connection. Call release to return the connection to the pool and release the lock.

func (*PgLockRegistry) TryAcquire

func (r *PgLockRegistry) TryAcquire(streamID string) (func(), bool)

TryAcquire attempts to obtain the advisory lock without blocking.

type SnapshotStore

type SnapshotStore[S any] struct {
	// contains filtered or unexported fields
}

SnapshotStore is a PostgreSQL-backed snapshot store. Implements eskit.SnapshotStore with schema versioning and timestamps.

func NewSnapshotStore

func NewSnapshotStore[S any](ctx context.Context, pool *pgxpool.Pool) (*SnapshotStore[S], error)

NewSnapshotStore creates a PostgreSQL-backed snapshot store. Runs migration on creation.

func (*SnapshotStore[S]) Invalidate

func (s *SnapshotStore[S]) Invalidate(ctx context.Context, streamID string) error

Invalidate deletes the snapshot for a single stream.

func (*SnapshotStore[S]) InvalidateAll

func (s *SnapshotStore[S]) InvalidateAll(ctx context.Context) error

InvalidateAll deletes all snapshots.

func (*SnapshotStore[S]) LoadSnapshot

func (s *SnapshotStore[S]) LoadSnapshot(ctx context.Context, streamID string) (*eskit.Snapshot[S], error)

LoadSnapshot loads the latest snapshot for a stream. Returns nil, nil if not found.

func (*SnapshotStore[S]) SaveSnapshot

func (s *SnapshotStore[S]) SaveSnapshot(ctx context.Context, snapshot eskit.Snapshot[S]) error

SaveSnapshot persists a snapshot of decider state. Upserts on stream_id.

type Store

type Store[E any] struct {
	// contains filtered or unexported fields
}

Store is a PostgreSQL-backed event store using pgxpool for connection pooling. Events are serialized as JSONB for queryability and debuggability.

func New

func New[E any](ctx context.Context, connString string, opts ...StoreOption) (*Store[E], error)

New creates a new PostgreSQL event store from a connection string. It also runs the migration to ensure the events table exists.

func NewFromPool

func NewFromPool[E any](ctx context.Context, pool *pgxpool.Pool, opts ...StoreOption) (*Store[E], error)

NewFromPool wraps an existing pgxpool.Pool. Runs migration on creation.

func (*Store[E]) Append

func (s *Store[E]) Append(ctx context.Context, streamID string, expectedVersion int, events []E, metadata ...eskit.Metadata) ([]eskit.Event[E], error)

func (*Store[E]) AppendWithOptions

func (s *Store[E]) AppendWithOptions(ctx context.Context, streamID string, expectedVersion int, events []E, opts eskit.AppendOptions) ([]eskit.Event[E], error)

AppendWithOptions persists events with idempotency and custom timestamp support.

func (*Store[E]) Archive

func (s *Store[E]) Archive(ctx context.Context, streamID string, target eskit.EventStore[E]) error

Archive moves a stream to the target store and tombstones the primary.

func (*Store[E]) ArchiveStream

func (s *Store[E]) ArchiveStream(ctx context.Context, streamID string) error

ArchiveStream marks a stream as archived. Future appends are rejected.

func (*Store[E]) Close

func (s *Store[E]) Close() error

Close closes the underlying connection pool.

func (*Store[E]) Delete

func (s *Store[E]) Delete(ctx context.Context, streamID string) error

Delete permanently removes all events for a stream. Returns ErrStreamNotFound if stream does not exist.

func (*Store[E]) DeleteStream

func (s *Store[E]) DeleteStream(ctx context.Context, streamID string) error

DeleteStream permanently removes all events in a stream.

func (*Store[E]) IsTombstoned

func (s *Store[E]) IsTombstoned(ctx context.Context, streamID string) (*eskit.Tombstone, error)

IsTombstoned checks if a stream has been tombstoned.

func (*Store[E]) LatestSequence

func (s *Store[E]) LatestSequence(ctx context.Context) (uint64, error)

LatestSequence returns the highest global sequence in the store, or 0 if empty.

func (*Store[E]) Load

func (s *Store[E]) Load(ctx context.Context, streamID string) ([]eskit.Event[E], error)

func (*Store[E]) LoadFrom

func (s *Store[E]) LoadFrom(ctx context.Context, streamID string, fromVersion int) ([]eskit.Event[E], error)

func (*Store[E]) LoadRaw

func (s *Store[E]) LoadRaw(ctx context.Context, streamID string) ([]*eskit.RawEvent, error)

LoadRaw loads events without deserializing the Data field.

func (*Store[E]) LoadRawWithOptions

func (s *Store[E]) LoadRawWithOptions(ctx context.Context, streamID string, opts eskit.LoadOptions) ([]*eskit.RawEvent, error)

LoadRawWithOptions loads raw events with optional filtering.

func (*Store[E]) LoadWithOptions

func (s *Store[E]) LoadWithOptions(ctx context.Context, streamID string, opts eskit.LoadOptions) ([]eskit.Event[E], error)

LoadWithOptions loads events with server-side filtering (event types, version range, limit).

func (*Store[E]) Pool

func (s *Store[E]) Pool() *pgxpool.Pool

Pool returns the underlying connection pool for advanced use cases (e.g. creating a PGNotifier or PGGlobalReader).

func (*Store[E]) ReadFrom

func (s *Store[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]eskit.Event[E], error)

ReadFrom reads events across all streams by global sequence. Implements subscription.StoreReader for direct use with StoreAdapter.

func (*Store[E]) ReadFromWithOptions

func (s *Store[E]) ReadFromWithOptions(ctx context.Context, fromSequence uint64, limit int, opts eskit.LoadOptions) ([]eskit.Event[E], error)

ReadFromWithOptions reads global events with optional event type filtering.

func (*Store[E]) Restore

func (s *Store[E]) Restore(ctx context.Context, streamID string, source eskit.EventStore[E]) error

Restore moves an archived stream back from the source store.

func (*Store[E]) RestoreStream

func (s *Store[E]) RestoreStream(ctx context.Context, streamID string) error

RestoreStream brings an archived stream back to active state.

func (*Store[E]) StreamStatus

func (s *Store[E]) StreamStatus(ctx context.Context, streamID string) (eskit.StreamState, error)

StreamStatus returns the current lifecycle state of a stream.

func (*Store[E]) Tombstone

func (s *Store[E]) Tombstone(ctx context.Context, streamID string, reason string) error

Tombstone marks a stream as deleted. Future Append calls return ErrStreamDeleted.

func (*Store[E]) TombstoneStream

func (s *Store[E]) TombstoneStream(ctx context.Context, streamID string) error

TombstoneStream marks a stream as deleted. Future appends are rejected.

func (*Store[E]) Truncate

func (s *Store[E]) Truncate(ctx context.Context) error

Truncate removes all events from the store. Intended for test isolation.

type StoreOption

type StoreOption func(*storeOptions)

StoreOption configures a Store.

func WithNotifyChannel

func WithNotifyChannel(channel string) StoreOption

WithNotifyChannel enables NOTIFY after each Append with the given channel name. Use with PGNotifier for instant subscription wake-up.

func WithPGCodec

func WithPGCodec(c codec.Codec) StoreOption

WithPGUpcasters enables event upcasting for schema evolution during Load. WithPGCodec sets a custom codec for event serialization and registers it for reads. By default, events are serialized as JSON using encoding/json. For multi-codec migration, use WithPGWriteCodec and WithPGCodecRegistry instead.

func WithPGCodecRegistry

func WithPGCodecRegistry(r *codec.Registry) StoreOption

WithPGCodecRegistry sets the registry used to look up codecs when reading events. If not set, a default registry with JSON, JSONiter, and CBOR is used.

func WithPGRegistry

func WithPGRegistry(reg *eskit.EventRegistry) StoreOption

WithPGRegistry enables type registry for heterogeneous event deserialization.

func WithPGUpcasters

func WithPGUpcasters(u *eskit.UpcasterRegistry) StoreOption

func WithPGWriteCodec

func WithPGWriteCodec(c codec.Codec) StoreOption

WithPGWriteCodec sets the codec used for writing NEW events. Existing events are read using the codec stored in each event's codec column.

type SubscriptionAdapter

type SubscriptionAdapter[E any] struct {
	// contains filtered or unexported fields
}

SubscriptionAdapter adapts PGGlobalReader into a subscription.GlobalReader. This bridges the type gap between pgstore.GlobalEvent and subscription.GlobalEvent.

func NewSubscriptionAdapter

func NewSubscriptionAdapter[E any](reader *PGGlobalReader[E]) *SubscriptionAdapter[E]

NewSubscriptionAdapter wraps a PGGlobalReader for use with the subscription system.

func (*SubscriptionAdapter[E]) LatestSequence

func (a *SubscriptionAdapter[E]) LatestSequence(ctx context.Context) (uint64, error)

LatestSequence delegates to the underlying reader.

func (*SubscriptionAdapter[E]) ReadFrom

func (a *SubscriptionAdapter[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]subscription.GlobalEvent[E], error)

ReadFrom reads events and converts pgstore.GlobalEvent to subscription.GlobalEvent.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL