physalis

package module
v0.0.0-...-05c1de5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 27, 2026 License: MIT Imports: 23 Imported by: 0

README

Physalis

An experimental state store built on an append-only event log.

Motivation (Why)

The core idea is simple: we persist an immutable event log, and build the application state by reducing (replaying) those events.

Defining events is usually straightforward: you can describe what happened and what data it must carry. Defining the state model up-front is harder: it evolves continuously as requirements change.

With a relational database this typically translates into schema migrations and refactoring existing tables/queries as the domain model shifts. With an append-only event log, the source of truth stays stable: when you change how state should look, you can rebuild it by replaying the already recorded events.

Another motivation is decoupling application state from SQL-centric thinking. In many systems—regardless of the language the app is written in—you end up modeling your domain around an SQL database (e.g., PostgreSQL): you constantly keep in mind how data must fit tables and queries, map language types to SQL types, and shape your data around the database.

Physalis is, in a way, an attempt to answer the question: how could we store and evolve application state if SQL had never been invented?

This approach is useful when you want:

  • High flexibility under changing requirements
  • The ability to re-interpret history with a new state model
  • Less lock-in to early architectural decisions
  • Confidence that you can “redo it properly” without losing data
  • Less coupling between your domain model and SQL/storage schemas

Status: Early-stage research. Expect rapid iteration and breaking changes.

Warning: Not ready for production—do not use in your products yet.

Optimizations

Physalis processes events in groups (chunks).

Within a group, events are reduced in-memory; the reducer state and its KV datasets are written to the DB only after the whole group has been processed.

This is especially useful when rebuilding a reducer from scratch (e.g. after bumping its version to reflect a logic change, or when adding a new reducer).

Example: Basic reducer + per-group KV

Simple example: examples/basic/basic.go

package main

import (
	"fmt"
	"iter"

	bolt "go.etcd.io/bbolt"

	"github.com/lymar/physalis"
)

// AppEvent is the application's event envelope (a union of all event kinds).
//
// AppReducer inspects Win first and then Loss; its Prepare method panics when
// both fields are nil.
type AppEvent struct {
	Win  *Win  // non-nil when the event is a win
	Loss *Loss // non-nil when the event is a loss
}

// Win represents a "player won points" event.
type Win struct {
	Player string // player name; AppReducer uses it as the group key
	Points int    // points gained; AppReducer adds them to the player's total
}

// Loss represents a "player lost points" event.
type Loss struct {
	Player string // player name; AppReducer uses it as the group key
	Points int    // points lost; AppReducer subtracts them from the player's total
}

// AppReducerState is the reducer state for a single group (player).
//
// Reducers are the core building block of Physalis: they build state by replaying
// (reducing) an append-only stream of events.
type AppReducerState struct {
	Points int // running total: points won minus points lost
}

// AppReducer is a reducer that maintains per-player points.
type AppReducer struct {
	version string // value reported by Version
}

// Version returns the reducer version (the value of the version field).
//
// When the version changes, Physalis rebuilds the reducer state from scratch by
// replaying the event log. This is how you can evolve your state model safely:
// change reducer logic, bump the version, and let Physalis recompute.
func (rd *AppReducer) Version() string {
	return rd.version
}

// Prepare picks the group key for an event.
//
// The key is the name of the player the event refers to, so reducer state is
// maintained independently per player. The second result is always nil: this
// reducer never substitutes a transformed event.
//
// An empty player name is equal to physalis.SkipEvent, so such an event is
// skipped for this reducer. Prepare panics on an event that is neither a Win
// nor a Loss.
func (rd *AppReducer) Prepare(
	ev *physalis.Event[AppEvent],
) (string, *physalis.Event[AppEvent]) {
	win, loss := ev.Payload.Win, ev.Payload.Loss
	switch {
	case win != nil:
		return win.Player, nil
	case loss != nil:
		return loss.Player, nil
	default:
		panic("invalid event")
	}
}

// Apply folds a batch of events for one group (player) into the reducer state.
//
// Parameters:
//   - runtime: handle used to open the per-group KV dataset "lucky_minutes".
//   - state: the state to update; nil is replaced with a zero AppReducerState.
//   - groupKey: the key chosen by Prepare (the player name); not used here.
//   - evs: the events to apply; only the event values are used, the uint64
//     keys of the sequence are ignored.
//
// It returns the updated state. For every Win the points are added to the
// total and also accumulated in "lucky_minutes" under the minute-of-hour
// (0-59, as returned by time.Time.Minute) of the event's timestamp. For every
// Loss the points are subtracted from the total.
func (rd *AppReducer) Apply(
	runtime *physalis.ReducerRuntime,
	state *AppReducerState,
	groupKey string,
	evs iter.Seq2[uint64, *physalis.Event[AppEvent]]) *AppReducerState {

	// In addition to the main reducer state, Physalis can persist per-group
	// key/value datasets (KV stores). Here we record how many points were
	// earned in each "lucky" minute.
	luckyMinutes := physalis.OpenKV[uint32, int](runtime, "lucky_minutes")

	if state == nil {
		state = &AppReducerState{}
	}

	for _, ev := range evs {
		switch {
		case ev.Payload.Win != nil:
			points := ev.Payload.Win.Points
			state.Points += points

			// Only wins are tracked per minute, so the timestamp is read here.
			minute := uint32(ev.ReadTimestamp().Minute())

			// Always store a pointer to a fresh local value. Events are shared
			// between reducers and must not be modified, so the KV store must
			// never hold a pointer into the event payload itself.
			total := points
			if prev := luckyMinutes.Get(minute); prev != nil {
				total += *prev
			}
			luckyMinutes.Put(minute, &total)
		case ev.Payload.Loss != nil:
			state.Points -= ev.Payload.Loss.Points
		}
	}

	return state
}

// main runs the basic example end to end: it registers the "points" reducer,
// opens (or creates) basic.db, writes three events in one transaction and then
// prints the resulting reducer state and Alice's "lucky_minutes" dataset.
// Any error aborts the program with a panic.
func main() {
	registry := physalis.NewReducerRegistry[AppEvent]()
	// Register a reducer in the registry (you can have many reducers).
	// AddReducer returns a reader used to query that reducer's state.
	reader, err := physalis.AddReducer(registry, "points",
		&AppReducer{version: "v1"})
	if err != nil {
		panic(err)
	}

	// Open a Physalis database (internally it's a bbolt DB).
	phs, err := physalis.Open("basic.db", registry)
	if err != nil {
		panic(err)
	}
	defer phs.Close()

	// Write events in a single transaction.
	// Reducer states are updated automatically and synchronously.
	err = phs.Write(physalis.Transaction[AppEvent]{
		Events: []*physalis.Event[AppEvent]{
			{Payload: AppEvent{Win: &Win{Player: "Alice", Points: 10}}},
			{Payload: AppEvent{Win: &Win{Player: "Bob", Points: 5}}},
			{Payload: AppEvent{Loss: &Loss{Player: "Alice", Points: 4}}},
		},
	})
	if err != nil {
		panic(err)
	}

	// Read reducer state. View has an error result; check it so that a failed
	// read is not silently ignored.
	err = phs.View(func(tx *bolt.Tx) error {
		alice, err := reader.State(tx, "Alice")
		if err != nil {
			return err
		}
		fmt.Println("Alice points:", alice.Points)

		bob, err := reader.State(tx, "Bob")
		if err != nil {
			return err
		}
		fmt.Println("Bob points:", bob.Points)

		// Open a KV view to read the per-group key/value dataset.
		aliceLuckyMinutes := physalis.OpenKVView[uint32, int](reader, tx,
			"Alice", "lucky_minutes")
		for k, v := range aliceLuckyMinutes.Ascend() {
			fmt.Printf("Alice lucky minute %d: %d points\n", k, *v)
		}

		return nil
	})
	if err != nil {
		panic(err)
	}
}

Documentation

Index

Constants

View Source
// GlobalGroup is the group key ".".
// NOTE(review): presumably the key to return from Reducer.Prepare when a
// reducer keeps a single state for all events — confirm against the
// implementation.
const GlobalGroup = "."
View Source
// SkipEvent is the empty group key. Per the Reducer.Prepare contract, an event
// whose group key is empty is skipped.
const SkipEvent = ""

Variables

View Source
// ErrReducerAlreadyExists reports a duplicate reducer.
// NOTE(review): presumably returned by AddReducer when the given name is
// already registered — confirm against the implementation.
var ErrReducerAlreadyExists = errors.New("reducer already exists")

Functions

func BlobView

func BlobView(tx *bolt.Tx, key string) []byte

The slice returned by BlobView is valid only for the lifetime of the transaction.

func CBORMarshal

func CBORMarshal(v any) ([]byte, error)

func CBORUnmarshal

func CBORUnmarshal(data []byte, v any) error

func RestoreDatabase

func RestoreDatabase(
	ctx context.Context,
	dbFile string,
	r io.Reader,
) error

func VersionFromFS

func VersionFromFS(fsys fs.FS) string

VersionFromFS returns a deterministic "version" string for the contents of the provided filesystem (fs.FS). The version is derived from every file's bytes together with its relative path, so any change to file contents or to the set of files (add/remove/rename) results in a different version.

Intended for use with embed.FS, but works with any fs.FS.

On any traversal or read error, the function panics.

Types

type Event

// Event wraps an application-defined payload of type EV together with a
// timestamp. The struct tags select integer keys for the CBOR encoding.
type Event[EV any] struct {
	// Payload is the application-defined event data (cbor key 1).
	Payload   EV    `cbor:"1,keyasint"`
	// Timestamp is the event time as a raw integer (cbor key 2).
	// NOTE(review): unit and epoch are not visible here; read it through
	// ReadTimestamp instead of interpreting the integer directly.
	Timestamp int64 `cbor:"2,keyasint"`
}

func (*Event[EV]) ReadTimestamp

func (ev *Event[EV]) ReadTimestamp() time.Time

type KType

// KType is the type constraint for keys of SortedKV and SortedKVView: any type
// whose underlying type is int32, int64, uint32, uint64 or string.
type KType interface {
	~int32 | ~int64 | ~uint32 | ~uint64 | ~string
}

type Physalis

type Physalis[EV any] struct {
	// contains filtered or unexported fields
}

func Open

func Open[EV any](dbFile string, registry *ReducerRegistry[EV]) (*Physalis[EV], error)

func (*Physalis[EV]) BackupTo

func (phs *Physalis[EV]) BackupTo(
	ctx context.Context,
	zstdCompressionLevel int,
	w io.Writer,
) error

func (*Physalis[EV]) Close

func (phs *Physalis[EV]) Close() error

func (*Physalis[EV]) View

func (phs *Physalis[EV]) View(vf func(tx *bolt.Tx) error) error

func (*Physalis[EV]) Write

func (phs *Physalis[EV]) Write(transaction Transaction[EV]) error

type Reducer

// Reducer builds a state of type ST from events with payload type EV.
// Implementations are registered with AddReducer.
type Reducer[ST any, EV any] interface {
	// Version returns the reducer version. According to the README, changing
	// it makes Physalis rebuild the reducer state by replaying the event log.
	Version() string
	// Prepare returns:
	//  1. the group key of the event; if it is empty, the event is skipped
	//     (SkipEvent);
	//  2. optionally, a different event that will be used instead of the
	//     current one. The original event must NOT be modified: it is shared
	//     between all reducers!
	Prepare(*Event[EV]) (string, *Event[EV])
	// Apply applies the events in evs to the state of group groupKey and
	// returns the resulting state. The basic example treats a nil state as
	// "no state yet".
	// NOTE(review): the meaning of the uint64 key in evs is not visible here
	// (presumably the event's position in the log) — confirm.
	Apply(runtime *ReducerRuntime, state *ST, groupKey string,
		evs iter.Seq2[uint64, *Event[EV]]) *ST
}

type ReducerReader

type ReducerReader[ST any] struct {
	// contains filtered or unexported fields
}

func AddReducer

func AddReducer[ST any, EV any](
	reg *ReducerRegistry[EV],
	name string,
	reducer Reducer[ST, EV],
) (*ReducerReader[ST], error)

func (*ReducerReader[ST]) AllStates

func (rr *ReducerReader[ST]) AllStates(tx *bolt.Tx) SortedKVView[string, ST]

func (*ReducerReader[ST]) State

func (rr *ReducerReader[ST]) State(
	tx *bolt.Tx,
	groupKey string,
) (*ST, error)

func (*ReducerReader[ST]) Subscribe

func (rr *ReducerReader[ST]) Subscribe(ctx context.Context) <-chan string

type ReducerRegistry

type ReducerRegistry[EV any] struct {
	// contains filtered or unexported fields
}

func NewReducerRegistry

func NewReducerRegistry[EV any]() *ReducerRegistry[EV]

type ReducerRuntime

type ReducerRuntime struct {
	// contains filtered or unexported fields
}

type SortedKV

// SortedKV is a read-write key/value dataset with keys of type K and values of
// type V. OpenKV returns one from a ReducerRuntime.
//
// NOTE(review): the method descriptions below are derived from the method and
// parameter names only (plus the basic example for Get/Put); confirm ordering
// and bound semantics against the implementation.
type SortedKV[K KType, V any] interface {
	// Get returns the value for key; the basic example treats nil as "absent".
	Get(key K) *V
	// Delete removes key.
	Delete(key K)
	// Put stores data under key.
	Put(key K, data *V)
	// AscendRange iterates keys k with greaterOrEqual <= k < lessThan.
	AscendRange(greaterOrEqual, lessThan K) iter.Seq2[K, *V]
	// Ascend iterates all entries in ascending key order.
	Ascend() iter.Seq2[K, *V]
	// Descend iterates all entries in descending key order.
	Descend() iter.Seq2[K, *V]
	// AscendGreaterOrEqual iterates keys k >= greaterOrEqual, ascending.
	AscendGreaterOrEqual(greaterOrEqual K) iter.Seq2[K, *V]
	// AscendLessThan iterates keys k < lessThan, ascending.
	AscendLessThan(lessThan K) iter.Seq2[K, *V]
	// DescendRange iterates keys k with greaterThan < k <= lessOrEqual, descending.
	DescendRange(lessOrEqual, greaterThan K) iter.Seq2[K, *V]
	// DescendGreaterThan iterates keys k > greaterThan, descending.
	DescendGreaterThan(greaterThan K) iter.Seq2[K, *V]
	// DescendLessOrEqual iterates keys k <= lessOrEqual, descending.
	DescendLessOrEqual(lessOrEqual K) iter.Seq2[K, *V]
}

func OpenKV

func OpenKV[K KType, V any](rr *ReducerRuntime, kvName string) SortedKV[K, V]

type SortedKVView

// SortedKVView is the read-only counterpart of SortedKV: it has the same
// lookup and iteration methods but no Put or Delete. OpenKVView and
// ReducerReader.AllStates return values of this type.
//
// NOTE(review): the method descriptions below are derived from the method and
// parameter names only; confirm ordering and bound semantics against the
// implementation.
type SortedKVView[K KType, V any] interface {
	// Get returns the value for key.
	// NOTE(review): presumably nil when the key is absent — confirm.
	Get(key K) *V
	// AscendRange iterates keys k with greaterOrEqual <= k < lessThan.
	AscendRange(greaterOrEqual, lessThan K) iter.Seq2[K, *V]
	// Ascend iterates all entries in ascending key order.
	Ascend() iter.Seq2[K, *V]
	// Descend iterates all entries in descending key order.
	Descend() iter.Seq2[K, *V]
	// AscendGreaterOrEqual iterates keys k >= greaterOrEqual, ascending.
	AscendGreaterOrEqual(greaterOrEqual K) iter.Seq2[K, *V]
	// AscendLessThan iterates keys k < lessThan, ascending.
	AscendLessThan(lessThan K) iter.Seq2[K, *V]
	// DescendRange iterates keys k with greaterThan < k <= lessOrEqual, descending.
	DescendRange(lessOrEqual, greaterThan K) iter.Seq2[K, *V]
	// DescendGreaterThan iterates keys k > greaterThan, descending.
	DescendGreaterThan(greaterThan K) iter.Seq2[K, *V]
	// DescendLessOrEqual iterates keys k <= lessOrEqual, descending.
	DescendLessOrEqual(lessOrEqual K) iter.Seq2[K, *V]
}

func OpenKVView

func OpenKVView[K KType, V any, ST any](
	reader *ReducerReader[ST],
	tx *bolt.Tx,
	groupKey string,
	kvName string,
) SortedKVView[K, V]

type Transaction

// Transaction is the argument of Physalis.Write: a set of events and blob
// changes submitted together.
type Transaction[EV any] struct {
	// Events are the events to write.
	Events      []*Event[EV]
	// BlobStorage maps blob keys to their contents.
	// NOTE(review): presumably these are the blobs read back with BlobView — confirm.
	BlobStorage map[string][]byte // nil value: delete
}

Directories

Path Synopsis
examples
basic command
log

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL