Documentation
¶
Overview ¶
Package pgstore provides PostgreSQL-backed stores for events, snapshots, and checkpoints.
PGNotifier implements subscription.StoreNotifier using PostgreSQL LISTEN/NOTIFY. After each event append, NOTIFY is sent on the configured channel with the latest global sequence. Listeners wake instantly instead of polling.
Package pgstore provides a PostgreSQL-backed EventStore implementation using pgx/v5.
Events are stored in a single table with JSONB data, using transaction-based optimistic concurrency control. The UNIQUE(stream_id, version) constraint acts as a final safety net against duplicate versions.
Index ¶
- Constants
- func NotifyAppend(ctx context.Context, pool *pgxpool.Pool, channel string, sequence int64) error
- type CheckpointStore
- type ClusterOption
- type ClusterStore
- func (s *ClusterStore[E]) Append(ctx context.Context, streamID string, expectedVersion int, events []E, ...) ([]eskit.Event[E], error)
- func (s *ClusterStore[E]) Close()
- func (s *ClusterStore[E]) Load(ctx context.Context, streamID string) ([]eskit.Event[E], error)
- func (s *ClusterStore[E]) LoadFrom(ctx context.Context, streamID string, fromVersion int) ([]eskit.Event[E], error)
- func (s *ClusterStore[E]) Ping(ctx context.Context) error
- type GlobalEvent
- type GlobalReaderOption
- type PGChangeRelay
- type PGChangeRelayOption
- type PGCheckpoint
- type PGGlobalReader
- type PGNotifier
- type PartitionManager
- func (pm *PartitionManager) Close() error
- func (pm *PartitionManager) OwnedPartitions() []int
- func (pm *PartitionManager) OwnsPartition(partitionID int) bool
- func (pm *PartitionManager) PartitionForStream(streamID string) int
- func (pm *PartitionManager) Start(ctx context.Context)
- func (pm *PartitionManager) Wait()
- type PartitionOption
- type PgLockRegistry
- type SnapshotStore
- func (s *SnapshotStore[S]) Invalidate(ctx context.Context, streamID string) error
- func (s *SnapshotStore[S]) InvalidateAll(ctx context.Context) error
- func (s *SnapshotStore[S]) LoadSnapshot(ctx context.Context, streamID string) (*eskit.Snapshot[S], error)
- func (s *SnapshotStore[S]) SaveSnapshot(ctx context.Context, snapshot eskit.Snapshot[S]) error
- type Store
- func (s *Store[E]) Append(ctx context.Context, streamID string, expectedVersion int, events []E, ...) ([]eskit.Event[E], error)
- func (s *Store[E]) AppendWithOptions(ctx context.Context, streamID string, expectedVersion int, events []E, ...) ([]eskit.Event[E], error)
- func (s *Store[E]) Archive(ctx context.Context, streamID string, target eskit.EventStore[E]) error
- func (s *Store[E]) ArchiveStream(ctx context.Context, streamID string) error
- func (s *Store[E]) Close() error
- func (s *Store[E]) Delete(ctx context.Context, streamID string) error
- func (s *Store[E]) DeleteStream(ctx context.Context, streamID string) error
- func (s *Store[E]) IsTombstoned(ctx context.Context, streamID string) (*eskit.Tombstone, error)
- func (s *Store[E]) LatestSequence(ctx context.Context) (uint64, error)
- func (s *Store[E]) Load(ctx context.Context, streamID string) ([]eskit.Event[E], error)
- func (s *Store[E]) LoadFrom(ctx context.Context, streamID string, fromVersion int) ([]eskit.Event[E], error)
- func (s *Store[E]) LoadRaw(ctx context.Context, streamID string) ([]*eskit.RawEvent, error)
- func (s *Store[E]) LoadRawWithOptions(ctx context.Context, streamID string, opts eskit.LoadOptions) ([]*eskit.RawEvent, error)
- func (s *Store[E]) LoadWithOptions(ctx context.Context, streamID string, opts eskit.LoadOptions) ([]eskit.Event[E], error)
- func (s *Store[E]) Pool() *pgxpool.Pool
- func (s *Store[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]eskit.Event[E], error)
- func (s *Store[E]) ReadFromWithOptions(ctx context.Context, fromSequence uint64, limit int, opts eskit.LoadOptions) ([]eskit.Event[E], error)
- func (s *Store[E]) Restore(ctx context.Context, streamID string, source eskit.EventStore[E]) error
- func (s *Store[E]) RestoreStream(ctx context.Context, streamID string) error
- func (s *Store[E]) StreamStatus(ctx context.Context, streamID string) (eskit.StreamState, error)
- func (s *Store[E]) Tombstone(ctx context.Context, streamID string, reason string) error
- func (s *Store[E]) TombstoneStream(ctx context.Context, streamID string) error
- func (s *Store[E]) Truncate(ctx context.Context) error
- type StoreOption
- func WithNotifyChannel(channel string) StoreOption
- func WithPGCodec(c codec.Codec) StoreOption
- func WithPGCodecRegistry(r *codec.Registry) StoreOption
- func WithPGRegistry(reg *eskit.EventRegistry) StoreOption
- func WithPGUpcasters(u *eskit.UpcasterRegistry) StoreOption
- func WithPGWriteCodec(c codec.Codec) StoreOption
- type SubscriptionAdapter
Constants ¶
const (
// DefaultLeaseTimeout is how long a partition lease is valid without renewal.
DefaultLeaseTimeout = 30 * time.Second
// DefaultRenewInterval is how often to renew the lease.
DefaultRenewInterval = 10 * time.Second
// DefaultPartitionCount is the default number of partitions.
DefaultPartitionCount = 8
)
const (
// DefaultChangeChannel is the default PG NOTIFY channel for projection changes.
DefaultChangeChannel = "eskit_changes"
)
const (
// DefaultChannel is the default NOTIFY channel name.
DefaultChannel = "eskit_events"
)
Variables ¶
This section is empty.
Functions ¶
Types ¶
type CheckpointStore ¶
type CheckpointStore struct {
// contains filtered or unexported fields
}
CheckpointStore is a PostgreSQL-backed checkpoint for event subscriptions. Tracks each consumer's last processed global sequence number. Thread-safe: PostgreSQL handles concurrent access.
func NewCheckpointStore ¶
NewCheckpointStore creates a PostgreSQL-backed checkpoint store. Creates the checkpoints table if it doesn't exist.
type ClusterOption ¶
type ClusterOption func(*clusterConfig)
ClusterOption configures a ClusterStore.
func WithPool ¶
func WithPool(pool *pgxpool.Pool) ClusterOption
WithPool sets a single pool for both reads and writes. Convenience for simple deployments without read replicas.
func WithReadPool ¶
func WithReadPool(pool *pgxpool.Pool) ClusterOption
WithReadPool sets the replica pool for read operations. If not set, reads use the write pool.
func WithWritePool ¶
func WithWritePool(pool *pgxpool.Pool) ClusterOption
WithWritePool sets the primary pool for write operations.
type ClusterStore ¶
type ClusterStore[E any] struct {
// contains filtered or unexported fields
}
ClusterStore is a PostgreSQL event store with read/write pool splitting. Writes go to the primary, reads can go to replicas for horizontal read scaling.
Compatible with:
- Standard PostgreSQL (primary + replicas)
- Citus (distributed Postgres)
- Neon (serverless Postgres)
- YugabyteDB (Postgres wire protocol)
- CockroachDB (Postgres wire protocol — see advisory lock notes)
For CockroachDB: advisory locks are not supported. Use the CAS-based optimistic concurrency (the UNIQUE constraint) as the sole safety net, or use a separate locks table.
func NewClusterStore ¶
func NewClusterStore[E any](ctx context.Context, opts ...ClusterOption) (*ClusterStore[E], error)
NewClusterStore creates a PostgreSQL event store with read/write pool splitting. Runs migration on the write pool.
func (*ClusterStore[E]) Append ¶
func (s *ClusterStore[E]) Append(ctx context.Context, streamID string, expectedVersion int, events []E, metadata ...eskit.Metadata) ([]eskit.Event[E], error)
Append persists events using the WRITE pool (primary).
func (*ClusterStore[E]) Close ¶
func (s *ClusterStore[E]) Close()
Close closes both connection pools.
type GlobalEvent ¶
type GlobalEvent[E any] struct {
GlobalSequence uint64
StreamID string
EventType string
Version int
Data E
Timestamp time.Time
}
GlobalEvent is a store-wide event with a global sequence number. Compatible with subscription.GlobalEvent but decoupled to avoid import cycles.
type GlobalReaderOption ¶
type GlobalReaderOption[E any] func(*PGGlobalReader[E])
GlobalReaderOption configures a PGGlobalReader.
func WithGlobalReaderCodecRegistry ¶
func WithGlobalReaderCodecRegistry[E any](reg *codec.Registry) GlobalReaderOption[E]
WithGlobalReaderCodecRegistry sets the codec registry for reading events with different codecs.
func WithGlobalReaderRegistry ¶
func WithGlobalReaderRegistry[E any](reg *eskit.EventRegistry) GlobalReaderOption[E]
WithGlobalReaderRegistry enables type registry for heterogeneous global reads.
type PGChangeRelay ¶
type PGChangeRelay struct {
// contains filtered or unexported fields
}
PGChangeRelay broadcasts projection changes across server instances using PostgreSQL LISTEN/NOTIFY. Use for shared-consumer projections where only one server processes each event and other servers need to be notified.
Flow:
- Server A processes event → OnChange fires → Broadcast() sends PG NOTIFY
- All servers (including A) receive the NOTIFY via Start()
- Each server forwards the change to its local ChangeNotifier
- SSE handlers on all servers get notified
Safe for concurrent use.
func NewPGChangeRelay ¶
func NewPGChangeRelay(pool *pgxpool.Pool, channel string, notifier *eskit.ChangeNotifier, opts ...PGChangeRelayOption) *PGChangeRelay
NewPGChangeRelay creates a relay that bridges ChangeNotifier across servers using PostgreSQL LISTEN/NOTIFY.
The notifier receives all changes detected by this relay (from any server). Call Start() to begin listening.
func (*PGChangeRelay) Broadcast ¶
func (r *PGChangeRelay) Broadcast(change eskit.Change)
Broadcast sends a change notification to all servers via PG NOTIFY. Also notifies the local ChangeNotifier directly for immediate delivery to SSE handlers on this server.
Non-blocking: the PG NOTIFY is fire-and-forget. If the database connection is temporarily lost, the notification is dropped (SSE handlers will catch up on the next successful notification).
func (*PGChangeRelay) Start ¶
func (r *PGChangeRelay) Start(ctx context.Context)
Start begins listening for PG NOTIFY messages and forwarding them to the local ChangeNotifier. Blocks until ctx is cancelled or Close is called. Auto-reconnects on connection loss.
func (*PGChangeRelay) Wait ¶
func (r *PGChangeRelay) Wait()
Wait blocks until the relay's listen loop has fully stopped.
type PGChangeRelayOption ¶
type PGChangeRelayOption func(*PGChangeRelay)
PGChangeRelayOption configures a PGChangeRelay.
func WithBroadcastTimeout ¶
func WithBroadcastTimeout(d time.Duration) PGChangeRelayOption
WithBroadcastTimeout sets the timeout for each PG NOTIFY broadcast. Default: 2s.
func WithReconnectDelay ¶
func WithReconnectDelay(d time.Duration) PGChangeRelayOption
WithReconnectDelay sets the delay between reconnection attempts when the PostgreSQL LISTEN connection is lost. Default: 1s.
type PGCheckpoint ¶
type PGCheckpoint struct {
// contains filtered or unexported fields
}
PGCheckpoint implements subscription.Checkpoint backed by PostgreSQL.
func NewPGCheckpoint ¶
NewPGCheckpoint creates a PostgreSQL-backed checkpoint store.
type PGGlobalReader ¶
type PGGlobalReader[E any] struct {
// contains filtered or unexported fields
}
PGGlobalReader implements subscription.GlobalReader for PostgreSQL. Reads events by global sequence (the events.id column).
func NewPGGlobalReader ¶
func NewPGGlobalReader[E any](pool *pgxpool.Pool, opts ...GlobalReaderOption[E]) *PGGlobalReader[E]
NewPGGlobalReader creates a global reader backed by a pgxpool.
func (*PGGlobalReader[E]) LatestSequence ¶
func (r *PGGlobalReader[E]) LatestSequence(ctx context.Context) (uint64, error)
LatestSequence returns the highest global sequence in the store, or 0 if empty.
func (*PGGlobalReader[E]) ReadFrom ¶
func (r *PGGlobalReader[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]GlobalEvent[E], error)
ReadFrom returns events starting from the given global sequence (inclusive), up to limit.
type PGNotifier ¶
type PGNotifier struct {
// contains filtered or unexported fields
}
PGNotifier implements subscription.StoreNotifier using PostgreSQL LISTEN/NOTIFY. Auto-reconnects on connection loss. Safe for concurrent use.
func NewPGNotifier ¶
func NewPGNotifier(pool *pgxpool.Pool, channel string) *PGNotifier
NewPGNotifier creates a PGNotifier that listens on the given channel. Call Start() to begin listening, or pass it to a subscription as a StoreNotifier.
func (*PGNotifier) Close ¶
func (n *PGNotifier) Close() error
Close stops the notifier and closes all listener channels.
func (*PGNotifier) Notify ¶
func (n *PGNotifier) Notify(ctx context.Context) <-chan uint64
Notify returns a channel that receives the latest global sequence when new events are appended. Implements subscription.StoreNotifier.
func (*PGNotifier) Start ¶
func (n *PGNotifier) Start(ctx context.Context)
Start begins listening for NOTIFY messages. Blocks until ctx is cancelled or Close is called. Auto-reconnects on connection loss.
func (*PGNotifier) Wait ¶
func (n *PGNotifier) Wait()
Wait blocks until the notifier's listen loop has fully stopped.
type PartitionManager ¶
type PartitionManager struct {
// contains filtered or unexported fields
}
PartitionManager manages competing partition assignment using PostgreSQL. Instances claim partitions with SELECT ... FOR UPDATE SKIP LOCKED and renew leases periodically. Dead instances' partitions are auto-reclaimed.
func NewPartitionManager ¶
func NewPartitionManager(ctx context.Context, pool *pgxpool.Pool, instanceID string, opts ...PartitionOption) (*PartitionManager, error)
NewPartitionManager creates a partition manager. Runs migration to create the partitions table and seed initial partition rows.
func (*PartitionManager) Close ¶
func (pm *PartitionManager) Close() error
Close stops the partition manager and releases all partitions.
func (*PartitionManager) OwnedPartitions ¶
func (pm *PartitionManager) OwnedPartitions() []int
OwnedPartitions returns the partitions currently owned by this instance.
func (*PartitionManager) OwnsPartition ¶
func (pm *PartitionManager) OwnsPartition(partitionID int) bool
OwnsPartition returns true if this instance owns the given partition.
func (*PartitionManager) PartitionForStream ¶
func (pm *PartitionManager) PartitionForStream(streamID string) int
PartitionForStream maps a stream ID to a partition using consistent hashing.
func (*PartitionManager) Start ¶
func (pm *PartitionManager) Start(ctx context.Context)
Start begins claiming and renewing partitions. Blocks until ctx is cancelled or Close is called.
func (*PartitionManager) Wait ¶
func (pm *PartitionManager) Wait()
Wait blocks until the partition manager has stopped.
type PartitionOption ¶
type PartitionOption func(*PartitionManager)
PartitionOption configures a PartitionManager.
func WithLeaseTimeout ¶
func WithLeaseTimeout(d time.Duration) PartitionOption
WithLeaseTimeout sets the lease timeout.
func WithPartitionCount ¶
func WithPartitionCount(n int) PartitionOption
WithPartitionCount sets the number of partitions.
func WithRenewInterval ¶
func WithRenewInterval(d time.Duration) PartitionOption
WithRenewInterval sets the lease renewal interval.
type PgLockRegistry ¶
type PgLockRegistry struct {
// contains filtered or unexported fields
}
PgLockRegistry is a distributed LockRegistry backed by PostgreSQL advisory locks. Uses pg_advisory_lock which is session-scoped and automatically released on disconnect.
NOTE: CockroachDB does not support advisory locks. For CockroachDB, use natslock.NATSLockRegistry (from git.nullsoft.is/ash/eskit/natslock) or implement a table-based lock with CAS semantics.
func NewPgLockRegistry ¶
func NewPgLockRegistry(pool *pgxpool.Pool) *PgLockRegistry
NewPgLockRegistry creates a PostgreSQL advisory lock registry.
func (*PgLockRegistry) Acquire ¶
func (r *PgLockRegistry) Acquire(ctx context.Context, streamID string) (func(), error)
Acquire obtains an advisory lock for the given stream ID, blocking until the lock is acquired. The lock is held on a pooled connection until the returned release function is called; calling it releases the lock and returns the connection to the pool.
func (*PgLockRegistry) TryAcquire ¶
func (r *PgLockRegistry) TryAcquire(streamID string) (func(), bool)
TryAcquire attempts to obtain the advisory lock without blocking.
type SnapshotStore ¶
type SnapshotStore[S any] struct {
// contains filtered or unexported fields
}
SnapshotStore is a PostgreSQL-backed snapshot store. Implements eskit.SnapshotStore with schema versioning and timestamps.
func NewSnapshotStore ¶
NewSnapshotStore creates a PostgreSQL-backed snapshot store. Runs migration on creation.
func (*SnapshotStore[S]) Invalidate ¶
func (s *SnapshotStore[S]) Invalidate(ctx context.Context, streamID string) error
Invalidate deletes the snapshot for a single stream.
func (*SnapshotStore[S]) InvalidateAll ¶
func (s *SnapshotStore[S]) InvalidateAll(ctx context.Context) error
InvalidateAll deletes all snapshots.
func (*SnapshotStore[S]) LoadSnapshot ¶
func (s *SnapshotStore[S]) LoadSnapshot(ctx context.Context, streamID string) (*eskit.Snapshot[S], error)
LoadSnapshot loads the latest snapshot for a stream. Returns nil, nil if not found.
func (*SnapshotStore[S]) SaveSnapshot ¶
SaveSnapshot persists a snapshot of decider state. Upserts on stream_id.
type Store ¶
type Store[E any] struct {
// contains filtered or unexported fields
}
Store is a PostgreSQL-backed event store using pgxpool for connection pooling. Events are serialized as JSONB for queryability and debuggability.
func New ¶
New creates a new PostgreSQL event store from a connection string. It also runs the migration to ensure the events table exists.
func NewFromPool ¶
func NewFromPool[E any](ctx context.Context, pool *pgxpool.Pool, opts ...StoreOption) (*Store[E], error)
NewFromPool wraps an existing pgxpool.Pool. Runs migration on creation.
func (*Store[E]) AppendWithOptions ¶
func (s *Store[E]) AppendWithOptions(ctx context.Context, streamID string, expectedVersion int, events []E, opts eskit.AppendOptions) ([]eskit.Event[E], error)
AppendWithOptions persists events with idempotency and custom timestamp support.
func (*Store[E]) ArchiveStream ¶
ArchiveStream marks a stream as archived. Future appends are rejected.
func (*Store[E]) Delete ¶
Delete permanently removes all events for a stream. Returns ErrStreamNotFound if stream does not exist.
func (*Store[E]) DeleteStream ¶
DeleteStream permanently removes all events in a stream.
func (*Store[E]) IsTombstoned ¶
IsTombstoned checks if a stream has been tombstoned.
func (*Store[E]) LatestSequence ¶
LatestSequence returns the highest global sequence in the store, or 0 if empty.
func (*Store[E]) LoadRawWithOptions ¶
func (s *Store[E]) LoadRawWithOptions(ctx context.Context, streamID string, opts eskit.LoadOptions) ([]*eskit.RawEvent, error)
LoadRawWithOptions loads raw events with optional filtering.
func (*Store[E]) LoadWithOptions ¶
func (s *Store[E]) LoadWithOptions(ctx context.Context, streamID string, opts eskit.LoadOptions) ([]eskit.Event[E], error)
LoadWithOptions loads events with server-side filtering (event types, version range, limit).
func (*Store[E]) Pool ¶
Pool returns the underlying connection pool for advanced use cases (e.g. creating a PGNotifier or PGGlobalReader).
func (*Store[E]) ReadFrom ¶
func (s *Store[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]eskit.Event[E], error)
ReadFrom reads events across all streams by global sequence. Implements subscription.StoreReader for direct use with StoreAdapter.
func (*Store[E]) ReadFromWithOptions ¶
func (s *Store[E]) ReadFromWithOptions(ctx context.Context, fromSequence uint64, limit int, opts eskit.LoadOptions) ([]eskit.Event[E], error)
ReadFromWithOptions reads global events with optional event type filtering.
func (*Store[E]) RestoreStream ¶
RestoreStream brings an archived stream back to active state.
func (*Store[E]) StreamStatus ¶
StreamStatus returns the current lifecycle state of a stream.
func (*Store[E]) Tombstone ¶
Tombstone marks a stream as deleted. Future Append calls return ErrStreamDeleted.
func (*Store[E]) TombstoneStream ¶
TombstoneStream marks a stream as deleted. Future appends are rejected.
type StoreOption ¶
type StoreOption func(*storeOptions)
StoreOption configures a Store.
func WithNotifyChannel ¶
func WithNotifyChannel(channel string) StoreOption
WithNotifyChannel enables NOTIFY after each Append with the given channel name. Use with PGNotifier for instant subscription wake-up.
func WithPGCodec ¶
func WithPGCodec(c codec.Codec) StoreOption
WithPGCodec sets a custom codec for event serialization and registers it for reads. By default, events are serialized as JSON using encoding/json. For multi-codec migration, use WithPGWriteCodec and WithPGCodecRegistry instead.
func WithPGCodecRegistry ¶
func WithPGCodecRegistry(r *codec.Registry) StoreOption
WithPGCodecRegistry sets the registry used to look up codecs when reading events. If not set, a default registry with JSON, JSONiter, and CBOR is used.
func WithPGRegistry ¶
func WithPGRegistry(reg *eskit.EventRegistry) StoreOption
WithPGRegistry enables type registry for heterogeneous event deserialization.
func WithPGUpcasters ¶
func WithPGUpcasters(u *eskit.UpcasterRegistry) StoreOption
WithPGUpcasters enables event upcasting for schema evolution during Load.
func WithPGWriteCodec ¶
func WithPGWriteCodec(c codec.Codec) StoreOption
WithPGWriteCodec sets the codec used for writing NEW events. Existing events are read using the codec stored in each event's codec column.
type SubscriptionAdapter ¶
type SubscriptionAdapter[E any] struct {
// contains filtered or unexported fields
}
SubscriptionAdapter adapts PGGlobalReader into a subscription.GlobalReader. This bridges the type gap between pgstore.GlobalEvent and subscription.GlobalEvent.
func NewSubscriptionAdapter ¶
func NewSubscriptionAdapter[E any](reader *PGGlobalReader[E]) *SubscriptionAdapter[E]
NewSubscriptionAdapter wraps a PGGlobalReader for use with the subscription system.
func (*SubscriptionAdapter[E]) LatestSequence ¶
func (a *SubscriptionAdapter[E]) LatestSequence(ctx context.Context) (uint64, error)
LatestSequence delegates to the underlying reader.
func (*SubscriptionAdapter[E]) ReadFrom ¶
func (a *SubscriptionAdapter[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]subscription.GlobalEvent[E], error)
ReadFrom reads events and converts pgstore.GlobalEvent to subscription.GlobalEvent.