funtask

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2026 License: MIT Imports: 25 Imported by: 0

README

funtask

Turn any Go function into an HTTP-callable task - with progress, callbacks, and a built-in dashboard.

CI Go Report Card Go Reference Codecov Go version

Works with n8n, Airflow, Temporal, Prefect, or any system that can POST JSON and receive callbacks. Zero dependencies - stdlib only.

How it works

sequenceDiagram
    participant O as Orchestrator<br>(n8n / Airflow / ...)
    participant F as funtask server

    O->>F: POST /run/my-task<br>{ callbackUrl, params }
    F-->>O: 202 Accepted { jobId }
    Note over F: Task runs...<br>GET /health shows progress
    F->>O: POST callbackUrl { result }

Install

go get github.com/funktionslust/funtask@latest

Quick start

package main

import (
    "log"
    "os"
    "time"

    "github.com/funktionslust/funtask"
)

func main() {
    s := funtask.New("my-server",
        funtask.Task("greet", greet).
            Description("Returns a greeting").
            Example(map[string]any{"name": "World"}),
        funtask.Task("countdown", countdown).
            Description("Counts down from start to 1").
            Example(map[string]any{"start": 5}),
        funtask.WithAuthToken(os.Getenv("FUNTASK_AUTH_TOKEN")),
        funtask.WithDeadLetterDir(os.Getenv("FUNTASK_DEAD_LETTER_DIR")),
        funtask.WithDashboard(),
    )
    log.Fatal(s.ListenAndServe(":8080"))
}

func greet(_ *funtask.Run, params funtask.Params) funtask.Result {
    name, _ := params.String("name")
    if name == "" {
        name = "World"
    }
    return funtask.OK("Hello, %s!", name).WithData("name", name)
}

func countdown(ctx *funtask.Run, params funtask.Params) funtask.Result {
    start, err := params.Int("start")
    if err != nil {
        return funtask.Fail("invalid_params", "%v", err)
    }
    for i := range start {
        ctx.Progress(i+1, start, "T-%d", start-i)
        select {
        case <-ctx.Done():
            return funtask.Fail("cancelled", "stopped at T-%d", start-i)
        case <-time.After(1 * time.Second):
        }
    }
    return funtask.OK("Countdown complete.").WithData("start", start)
}
export FUNTASK_AUTH_TOKEN=secret
export FUNTASK_DEAD_LETTER_DIR=/tmp/dead-letters
mkdir -p $FUNTASK_DEAD_LETTER_DIR
go run .

# Open the dashboard:
open http://localhost:8080/dashboard

# Sync call (blocks until done):
curl -H "Authorization: Bearer secret" \
  -d '{"params":{"name":"World"}}' localhost:8080/run/greet

# Async call (returns immediately, posts result to callback):
curl -H "Authorization: Bearer secret" \
  -d '{"callbackUrl":"https://example.com/webhook","params":{"start":5}}' \
  localhost:8080/run/countdown

Task definition

Register tasks with Task(). Chain Description() and Example() to add metadata visible in /health and the dashboard:

funtask.Task("sync-orders", syncOrders).
    Description("Syncs open orders from Shopify to SAP").
    Example(map[string]any{"since": "2025-01-01", "limit": 100})

Description() shows as a subtitle on the dashboard card. Example() pre-fills the params editor and appears in the /health JSON.

Dashboard

Enable the built-in developer dashboard with WithDashboard():

funtask.WithDashboard()

Open /dashboard in a browser. It provides:

  • Live task status via SSE (no polling)
  • Progress bars and step descriptions
  • Error details for failed tasks
  • Params editor with pre-filled examples
  • Run / Stop buttons per task
  • cURL snippet generator

Dashboard screenshot

The dashboard page is served without bearer-token auth. Authentication is handled client-side - the browser prompts for the token and calls /health before showing any data.

API

Endpoint Method Auth Description
/run/{task} POST Bearer Run a task (sync or async with callbackUrl)
/stop/{task} POST Bearer Cancel a running task
/result/{jobId} GET Bearer Fetch a stored result
/health GET Bearer Server status and per-task progress
/events GET Bearer / ?token= SSE stream of health snapshots (query param because EventSource cannot send headers)
/dashboard GET None (client-side) Developer dashboard UI
/livez GET None Liveness probe
/readyz GET None Readiness probe

Configuration

funtask.New("my-server",
    // Tasks (at least one required)
    funtask.Task("name", fn).
        Description("human-readable description").
        Example(map[string]any{"key": "value"}).
        KeepResults(20),  // per-task result history override

    // Security (required)
    funtask.WithAuthToken("token"),
    funtask.WithDeadLetterDir("/path/to/dead-letters"),

    // Dashboard
    funtask.WithDashboard(),

    // Timeouts
    funtask.WithMaxDuration(10 * time.Minute),
    funtask.WithSyncTimeout(2 * time.Minute),
    funtask.WithShutdownTimeout(30 * time.Second),

    // Callbacks
    funtask.WithCallbackAllowlist("https://hooks.example.com"),
    funtask.WithCallbackRetries(5),
    funtask.WithCallbackTimeout(30 * time.Second),

    // Result history
    funtask.WithResultHistory(10),   // server-wide default per task

    // Extensibility
    funtask.WithReadiness(func() error { return nil }),
    funtask.WithHandler("GET /custom", myHandler),
)

Result patterns

// Success
return funtask.OK("processed %d records", count).
    WithData("count", count)

// Failure with error code
return funtask.Fail("db_error", "connection refused to %s", host)

// Wrap a Go error
return funtask.FailFromError(err)

Progress reporting

ctx.Step("connecting to database")
ctx.Progress(i, total, "processing record %d", i)

Progress is visible in /health, the SSE event stream, and the dashboard.

License

MIT


Built by Wolfgang Stark, Funktionslust GmbH
Enterprise software development and consulting.

Documentation

Overview

Package funtask provides a Go library for building HTTP task workers.

A server hosts one or more named tasks that an orchestrator can trigger via HTTP. Task authors write functions that receive execution context and parameters, and return structured results.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ErrorCoder added in v0.2.0

type ErrorCoder interface {
	ErrorCode() string
}

ErrorCoder is implemented by errors that carry a machine-readable code. FailFromError uses this to extract the error code automatically.

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option configures a Server. Pass options to New.

func WithAuthToken

func WithAuthToken(token string) Option

WithAuthToken sets the bearer token for all endpoints.

func WithCallbackAllowlist

func WithCallbackAllowlist(origins ...string) Option

WithCallbackAllowlist sets allowed callback URL origins.

func WithCallbackRetries

func WithCallbackRetries(n int) Option

WithCallbackRetries sets the number of callback delivery attempts. Default: 5.

func WithCallbackTimeout

func WithCallbackTimeout(d time.Duration) Option

WithCallbackTimeout sets the per-attempt callback HTTP timeout. Default: 30s.

func WithDashboard added in v1.0.0

func WithDashboard() Option

WithDashboard enables the built-in developer dashboard at /dashboard. The dashboard shows task status, progress, errors, and allows triggering tasks. It uses Server-Sent Events via /events for live updates and handles authentication client-side by prompting for the bearer token. The /dashboard path becomes reserved when enabled and cannot be used by WithHandler.

func WithDeadLetterDir

func WithDeadLetterDir(path string) Option

WithDeadLetterDir sets the dead letter directory path. Required.

func WithHandler

func WithHandler(pattern string, handler http.Handler) Option

WithHandler registers a custom HTTP handler on the server's mux. The pattern follows net/http.ServeMux syntax (e.g. "GET /api/orders"). Custom handlers are not protected by the server's bearer-token auth; apply your own middleware as needed. Patterns must not conflict with built-in routes (/run, /stop, /result, /health, /livez, /readyz).

func WithMaxDuration

func WithMaxDuration(d time.Duration) Option

WithMaxDuration sets the maximum job duration. Default: no limit.

func WithReadiness

func WithReadiness(fn func() error) Option

WithReadiness sets a custom readiness check for /readyz.

func WithResultHistory added in v1.0.0

func WithResultHistory(n int) Option

WithResultHistory sets the server-wide default for how many recent results to keep per task. Default: 10. Individual tasks can override this with TaskDef.KeepResults.

func WithShutdownTimeout

func WithShutdownTimeout(d time.Duration) Option

WithShutdownTimeout sets the graceful shutdown timeout. Default: 30s.

func WithSyncTimeout

func WithSyncTimeout(d time.Duration) Option

WithSyncTimeout sets the sync-mode timeout. Default: 2m.

type ParamReader

type ParamReader struct {
	// contains filtered or unexported fields
}

ParamReader reads parameters and collects errors. After the first error, subsequent reads still execute and collect additional errors (returning zero values). This way Err reports all invalid parameters at once, not just the first one.

func (*ParamReader) Bool

func (r *ParamReader) Bool(key string) bool

Bool returns the boolean parameter for the given key. If the key is missing or the value is not a bool, the error is collected and false is returned.

func (*ParamReader) Err

func (r *ParamReader) Err() error

Err returns a combined error for all failed parameter reads, or nil if all reads succeeded. The error message lists all invalid parameters, e.g. "missing or invalid parameters: since (expected time), limit (expected int)".

func (*ParamReader) Float

func (r *ParamReader) Float(key string) float64

Float returns the float64 parameter for the given key. If the key is missing or the value is not a number, the error is collected and zero is returned.

func (*ParamReader) Int

func (r *ParamReader) Int(key string) int

Int returns the integer parameter for the given key. If the key is missing or the value is not an integer, the error is collected and zero is returned.

func (*ParamReader) String

func (r *ParamReader) String(key string) string

String returns the string parameter for the given key. If the key is missing or the value is not a string, the error is collected and an empty string is returned.

func (*ParamReader) Time

func (r *ParamReader) Time(key string) time.Time

Time returns the time parameter for the given key. If the key is missing or the value is not a valid RFC 3339 string, the error is collected and the zero time is returned.

type Params

type Params struct {
	// contains filtered or unexported fields
}

Params holds the input parameters from the orchestrator. Since params arrive as JSON, all numbers are float64 internally. The typed accessors handle conversion — e.g. Int("limit") accepts float64(500) from JSON and returns int(500). Non-integer floats like 3.7 are rejected by Int().

func TestParams

func TestParams(t testing.TB, m map[string]any) Params

TestParams creates a Params value from a map literal. Fails the test immediately if the map contains types that Params cannot represent (e.g., channels, functions).

func TestParamsRaw

func TestParamsRaw(m map[string]any) Params

TestParamsRaw creates a Params value without validation. Use this to test parameter validation paths with bad inputs (e.g., wrong types, missing keys).

func (Params) Bool

func (p Params) Bool(key string) (bool, error)

Bool returns the boolean parameter for the given key. It returns an error if the key is missing or the value is not a bool.

func (Params) Float

func (p Params) Float(key string) (float64, error)

Float returns the float64 parameter for the given key. It returns an error if the key is missing or the value is not a number.

func (Params) Int

func (p Params) Int(key string) (int, error)

Int returns the integer parameter for the given key. JSON numbers arrive as float64, so this method accepts whole-number floats and converts them. Non-integer floats like 3.7 are rejected.

func (Params) Raw

func (p Params) Raw() map[string]any

Raw returns the underlying parameter map. This is useful for custom parameter handling that goes beyond the typed accessors.

func (Params) Reader

func (p Params) Reader() *ParamReader

Reader returns a ParamReader that collects errors from multiple parameter reads, avoiding repetitive if-err-nil blocks.

func (Params) String

func (p Params) String(key string) (string, error)

String returns the string parameter for the given key. It returns an error if the key is missing or the value is not a string.

func (Params) Time

func (p Params) Time(key string) (time.Time, error)

Time returns the time parameter for the given key. The value must be a string in RFC 3339 format (e.g. "2026-01-01T00:00:00Z").

type Result

type Result struct {
	Success   bool
	Message   string
	ErrorCode string
	Data      map[string]any
	LastStep  string
	// contains filtered or unexported fields
}

Result represents the outcome of a task execution. It carries a success flag, a human-readable message, an optional machine-readable error code, optional key-value data, and the last reported step for diagnostic context.

func Fail

func Fail(code string, format string, args ...any) Result

Fail creates a failure result with a machine-readable error code and a formatted human-readable message.

func FailFromError added in v0.2.0

func FailFromError(err error, fallbackCode string) Result

FailFromError creates a failure result from a Go error. If the error implements ErrorCoder, its ErrorCode() is used; otherwise fallbackCode applies. If the error implements UserMessager, its UserMessage() becomes the result message; otherwise err.Error() is used. The original error is stored as the cause for server-side logging but never serialized.

func OK

func OK(format string, args ...any) Result

OK creates a success result with a formatted message.

func (Result) WithCause added in v0.2.0

func (r Result) WithCause(err error) Result

WithCause returns a copy of the result with the given error attached as the underlying cause. The cause is logged server-side but never included in callback payloads or JSON serialization.

func (Result) WithData

func (r Result) WithData(key string, value any) Result

WithData returns a copy of the result with the given key-value pair added to its data map. Multiple calls can be chained. The original result is never modified.

Note: after JSON round-trip (e.g. callback payloads), numeric values become float64 per encoding/json conventions.

type Run

type Run struct {
	context.Context
	// contains filtered or unexported fields
}

Run is the execution context for a single job. It embeds context.Context so it works anywhere a context.Context is expected — pass it directly to database drivers, HTTP clients, etc.

func TestRun

func TestRun(t testing.TB) *Run

TestRun returns a *Run backed by the test's context. Cancelled when the test's deadline expires (if set via -timeout). Steps are recorded and logged via t.Log for debugging. Call TestRun(...).Steps() to retrieve the recorded steps for assertions.

func TestRunWithContext

func TestRunWithContext(t testing.TB, ctx context.Context) *Run

TestRunWithContext returns a *Run backed by a custom context. Use this to test cancellation handling.

func (*Run) Progress

func (r *Run) Progress(current, total int, msg string, args ...any)

Progress reports structured progress for countable iterations. It updates the step description and records current/total/percent, all visible via GET /health. Use this instead of Step when iterating over a countable set of items.

func (*Run) Step

func (r *Run) Step(msg string, args ...any)

Step reports what the task is currently doing. The step description is visible via GET /health and included in error results for diagnostic context. Only the latest step is retained.

func (*Run) Steps

func (r *Run) Steps() []string

Steps returns all step descriptions recorded by Step() and Progress() calls, in order. Use this in tests to assert that a task function reported expected progress.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server hosts one or more named tasks that an orchestrator can trigger via HTTP. Configure it with New and start it with ListenAndServe.

func New

func New(name string, opts ...Option) *Server

New creates a server. At least one Task is required.

func (*Server) ListenAndServe

func (f *Server) ListenAndServe(addr string) error

ListenAndServe validates configuration, starts the HTTP server, and blocks until SIGTERM is received or the server fails. On SIGTERM the server enters draining state, cancels in-flight jobs, and shuts down the HTTP server gracefully.

type TaskDef added in v1.0.0

type TaskDef struct {
	// contains filtered or unexported fields
}

TaskDef configures a named task. Create with Task, pass to New.

func Task added in v1.0.0

func Task(name string, fn TaskFunc) *TaskDef

Task creates a named task definition. Pass the result to New.

func (*TaskDef) Description added in v1.0.0

func (td *TaskDef) Description(desc string) *TaskDef

Description sets a human-readable description for the task.

func (*TaskDef) Example added in v1.0.0

func (td *TaskDef) Example(params map[string]any) *TaskDef

Example sets example parameters for the task. The dashboard uses this to pre-fill the Params textarea and show a Params button. The map is copied immediately; later mutations to the original are safe.

func (*TaskDef) KeepResults added in v1.0.0

func (td *TaskDef) KeepResults(n int) *TaskDef

KeepResults sets the number of recent results to retain for this task. Overrides the server-wide default set by WithResultHistory.

type TaskFunc

type TaskFunc func(ctx *Run, params Params) Result

TaskFunc is the function signature for task implementations. The function receives a Run context for cancellation and progress reporting, and Params for typed parameter access. It must respect ctx cancellation — when ctx is cancelled (via /stop or shutdown), the function should return promptly.

type UserMessager added in v0.2.0

type UserMessager interface {
	UserMessage() string
}

UserMessager is implemented by errors that carry a user-facing message separate from the technical error string. FailFromError uses this to extract a safe message for the result.

Directories

Path Synopsis
Counter is a reference example for funtask.
Counter is a reference example for funtask.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL