work

package module
v0.0.1
Published: Feb 19, 2026 License: MIT Imports: 16 Imported by: 0

README

work

A fork of gocraft/work — a background job processing library for Go, backed by Redis.

What's New

This fork adds several production-grade features on top of the original gocraft/work:

  • Enhanced Retry & Backoff — per-job retry configuration with exponential, linear, and fixed backoff strategies
  • Runtime Scheduled Tasks — manage cron-based recurring jobs at runtime via API and Web UI (powered by robfig/cron/v3)
  • Modern Web UI — rebuilt with React 19, TypeScript, Vite, and Tailwind CSS
  • Job History & Search — track completed/failed jobs with paginated history and job ID lookup
  • Docker & CI/CD — Dockerfile, Docker Compose, and GitHub Actions pipeline for automated builds
  • Redis URL Support — connect using redis://:password@host:port URLs alongside plain host:port

Features

  • Fast and efficient, durable job processing backed by Redis
  • Middleware on jobs for metrics, logging, etc.
  • Configurable retries with backoff strategies (exponential, linear, fixed)
  • Schedule jobs to run in the future
  • Enqueue unique jobs (only one with a given name/arguments at a time)
  • Periodically enqueue jobs on a cron schedule (manageable at runtime)
  • Job history tracking with search by job ID
  • Web UI to monitor workers, queues, retry/dead jobs, schedules, and history
  • Pause/unpause jobs and control concurrency within and across processes

Installation

go get github.com/LarnTechKe/work

Requires Go 1.24+ and Redis.

Enqueue Jobs

package main

import (
	"log"

	"github.com/gomodule/redigo/redis"
	"github.com/LarnTechKe/work"
)

var redisPool = &redis.Pool{
	MaxActive: 5,
	MaxIdle:   5,
	Wait:      true,
	Dial: func() (redis.Conn, error) {
		return redis.Dial("tcp", ":6379")
	},
}

var enqueuer = work.NewEnqueuer("my_app_namespace", redisPool)

func main() {
	_, err := enqueuer.Enqueue("send_email", work.Q{"address": "[email protected]", "subject": "hello world"})
	if err != nil {
		log.Fatal(err)
	}
}

Enqueue with Retry Options

Override retry behaviour per job at enqueue time:

_, err := enqueuer.EnqueueWithOptions("send_webhook", work.Q{"url": "https://example.com/hook"}, work.EnqueueOptions{
	Retry: &work.RetryOptions{
		MaxRetries: 5,
		Strategy:   work.BackoffExponential, // or BackoffLinear, BackoffFixed
		BaseDelay:  3,                       // seconds
	},
})

Enqueue with Delay

_, err := enqueuer.EnqueueWithOptions("send_report", work.Q{"format": "pdf"}, work.EnqueueOptions{
	Delay: 5 * time.Minute,
})

Process Jobs

package main

import (
	"fmt"
	"os"
	"os/signal"

	"github.com/gomodule/redigo/redis"
	"github.com/LarnTechKe/work"
)

var redisPool = &redis.Pool{
	MaxActive: 5,
	MaxIdle:   5,
	Wait:      true,
	Dial: func() (redis.Conn, error) {
		return redis.Dial("tcp", ":6379")
	},
}

type Context struct {
	customerID int64
}

func main() {
	pool := work.NewWorkerPool(Context{}, 10, "my_app_namespace", redisPool)

	pool.Middleware((*Context).Log)
	pool.Job("send_email", (*Context).SendEmail)
	pool.JobWithOptions("export", work.JobOptions{Priority: 10, MaxFails: 1}, (*Context).Export)

	// Periodic jobs (cron schedule)
	pool.PeriodicallyEnqueue("0 0 * * * *", "calculate_caches") // every hour

	pool.Start()

	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, os.Interrupt)
	<-signalChan

	pool.Stop()
}

func (c *Context) Log(job *work.Job, next work.NextMiddlewareFunc) error {
	fmt.Println("Starting job:", job.Name)
	return next()
}

func (c *Context) SendEmail(job *work.Job) error {
	addr := job.ArgString("address")
	if err := job.ArgError(); err != nil {
		return err
	}
	fmt.Println("Sending email to", addr)
	return nil
}

func (c *Context) Export(job *work.Job) error {
	return nil
}

Backoff Strategies

Strategy            Behaviour
BackoffExponential  Quartic polynomial with jitter (default)
BackoffLinear       baseDelay * failCount
BackoffFixed        Constant delay between retries

You can also provide a custom BackoffCalculator:

pool.JobWithOptions("custom_job", work.JobOptions{
	MaxFails: 10,
	Backoff: func(job *work.Job) int64 {
		return int64(job.Fails) * 60 // 60s per failure
	},
}, (*Context).CustomJob)

Scheduled Tasks (Runtime Cron)

In addition to code-defined periodic jobs (PeriodicallyEnqueue), you can manage schedules at runtime via the Client API:

client := work.NewClient("my_app_namespace", redisPool)

// Add a schedule
client.AddPeriodicSchedule(&work.PeriodicSchedule{
	Name:    "nightly_cleanup",
	JobName: "cleanup",
	Spec:    "0 0 0 * * *", // midnight daily
	Enabled: true,
})

// List all schedules
schedules, _ := client.PeriodicSchedules()

// Disable / enable / delete
client.DisablePeriodicSchedule("nightly_cleanup")
client.EnablePeriodicSchedule("nightly_cleanup")
client.DeletePeriodicSchedule("nightly_cleanup")

Schedules are stored in Redis and can also be managed via the Web UI.

Job History

Completed and failed jobs are recorded in a history log. Query via the Client:

client := work.NewClient("my_app_namespace", redisPool)

// Paginated history (page 1, 20 items per page)
jobs, count, _ := client.HistoryJobs(1, "")

// Filter by job name
jobs, count, _ = client.HistoryJobs(1, "send_email")

// Lookup by job ID
job, _ := client.HistoryJobByID("abc123")

// Total history count
total, _ := client.HistoryCount()

History is also accessible from the Web UI dashboard.

Unique Jobs

enqueuer := work.NewEnqueuer("my_app_namespace", redisPool)
job, err := enqueuer.EnqueueUnique("clear_cache", work.Q{"object_id": "123"})
job, err = enqueuer.EnqueueUnique("clear_cache", work.Q{"object_id": "123"}) // job == nil (duplicate)

Job Concurrency

Control how many jobs of a given type can run concurrently (across all worker pools sharing the same Redis):

pool.JobWithOptions("export", work.JobOptions{MaxConcurrency: 1}, (*Context).Export) // single-threaded

Run the Web UI

From Source
go run ./cmd/workwebui -redis "redis://:password@host:6379" -ns "my_namespace" -listen ":5040"

Flags:

Flag       Default  Description
-redis     :6379    Redis address: host:port or redis://:pass@host:port
-database  0        Redis DB number (ignored for redis:// URLs)
-ns        work     Redis namespace
-listen    :5040    HTTP listen address

Navigate to http://localhost:5040/.

With Docker
# Build
docker build -t work-webui .

# Run
docker run -p 5040:5040 work-webui \
  -redis "redis://:password@host:6379" \
  -ns "my_namespace"

With Docker Compose
# Configure .env with your Redis URL and namespace, then:
docker compose up -d

# Open http://localhost:5040

See docker-compose.yml and .env for configuration.

Redis Cluster

If using Redis Cluster, use Hash Tags to force keys onto a single node:

pool := work.NewWorkerPool(Context{}, 10, "{my_app_namespace}", redisPool)

This is not needed for Redis Sentinel deployments.

Design and Concepts

Enqueueing

Jobs are serialized to JSON and added to a Redis list (LPUSH). Each job name gets its own queue automatically.

Scheduling Algorithm

Each queue has a priority (1–10000, matching JobOptions.Priority). Workers pick queues probabilistically based on relative priority. Empty queues are skipped.

Processing a Job
  1. A Lua script atomically moves a job from its queue to an in-progress queue (checking pause state and concurrency limits)
  2. The worker runs the job
  3. On success, the job is removed from in-progress and recorded in history
  4. On failure, it's either retried (with backoff) or moved to the dead queue

Retry & Dead Jobs

Failed jobs go to a retry z-set (scored by when to retry). After exhausting retries, they move to the dead queue. Both can be managed via the Web UI or Client API.
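
Dead jobs can also be inspected and requeued from code. A sketch using the Client API documented below (assumes the redisPool from earlier and a running Redis; error handling elided):

```go
client := work.NewClient("my_app_namespace", redisPool)

// Page through dead jobs (page 1, 20 items per page) and requeue each one.
deadJobs, total, _ := client.DeadJobs(1)
fmt.Println("dead jobs:", total)
for _, dj := range deadJobs {
	client.RetryDeadJob(dj.DiedAt, dj.ID)
}

// Or requeue everything in one call.
client.RetryAllDeadJobs()
```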

The Reaper

If a process crashes, its in-progress jobs are recovered by the reaper, which monitors heartbeats and requeues orphaned work.

Job History

Completed and failed jobs are recorded in Redis with three data structures: a main z-set (sorted by timestamp), per-name z-sets (for filtering), and an ID hash (for direct lookup). History is automatically cleaned up after 15 days.

CI/CD

The repository includes a GitHub Actions workflow (.github/workflows/webui.yml) that:

  1. Builds and tests the Go code
  2. Builds the Docker image (frontend + backend)
  3. Pushes to Docker Hub with a timestamped tag and latest
  4. Creates a GitHub release

Required secrets: DOCKERHUB_USERNAME, DOCKERHUB_TOKEN.

License

MIT. See LICENSE.

Documentation

Index

Constants

This section is empty.

Variables

var ErrNotDeleted = fmt.Errorf("nothing deleted")

ErrNotDeleted is returned by functions that delete jobs to indicate that although the redis commands were successful, no object was actually deleted by those commands.

var ErrNotRetried = fmt.Errorf("nothing retried")

ErrNotRetried is returned by functions that retry jobs to indicate that although the redis commands were successful, no object was actually retried by those commands.

Functions

This section is empty.

Types

type BackoffCalculator

type BackoffCalculator func(job *Job) int64

You may provide your own backoff function for retrying failed jobs or use the builtin one. Returns the number of seconds to wait until the next attempt.

The builtin backoff calculator provides an exponentially increasing wait function.

func ExponentialBackoff

func ExponentialBackoff() BackoffCalculator

ExponentialBackoff returns the default exponential backoff calculator.

func FixedBackoff

func FixedBackoff(seconds int64) BackoffCalculator

FixedBackoff returns a BackoffCalculator that always waits the specified number of seconds.

func LinearBackoff

func LinearBackoff(baseSeconds int64) BackoffCalculator

LinearBackoff returns a BackoffCalculator that waits baseSeconds * failCount.

type BackoffStrategy

type BackoffStrategy int

BackoffStrategy defines predefined backoff strategies for retrying failed jobs.

const (
	// BackoffExponential uses the default quartic polynomial backoff with jitter.
	BackoffExponential BackoffStrategy = iota
	// BackoffLinear uses a linear backoff: baseDelay * failCount.
	BackoffLinear
	// BackoffFixed uses a constant delay between retries.
	BackoffFixed
)

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client implements all of the functionality of the web UI. It can be used to inspect the status of a running cluster and retry dead jobs.

func NewClient

func NewClient(namespace string, pool *redis.Pool) *Client

NewClient creates a new Client with the specified redis namespace and connection pool.

func (*Client) AddPeriodicSchedule

func (c *Client) AddPeriodicSchedule(schedule *PeriodicSchedule) error

AddPeriodicSchedule adds or updates a named periodic schedule in Redis. The schedule will be picked up by the periodic enqueuer on its next cycle.

func (*Client) DeadJobs

func (c *Client) DeadJobs(page uint) ([]*DeadJob, int64, error)

DeadJobs returns a list of DeadJob's. The page param is 1-based; each page is 20 items. The total number of items (not pages) in the list of dead jobs is also returned.

func (*Client) DeleteAllDeadJobs

func (c *Client) DeleteAllDeadJobs() error

DeleteAllDeadJobs deletes all dead jobs.

func (*Client) DeleteDeadJob

func (c *Client) DeleteDeadJob(diedAt int64, jobID string) error

DeleteDeadJob deletes a dead job from Redis.

func (*Client) DeletePeriodicSchedule

func (c *Client) DeletePeriodicSchedule(name string) error

DeletePeriodicSchedule removes a named periodic schedule from Redis.

func (*Client) DeleteRetryJob

func (c *Client) DeleteRetryJob(retryAt int64, jobID string) error

DeleteRetryJob deletes a job in the retry queue.

func (*Client) DeleteScheduledJob

func (c *Client) DeleteScheduledJob(scheduledFor int64, jobID string) error

DeleteScheduledJob deletes a job in the scheduled queue.

func (*Client) DisablePeriodicSchedule

func (c *Client) DisablePeriodicSchedule(name string) error

DisablePeriodicSchedule disables a schedule without removing it.

func (*Client) EnablePeriodicSchedule

func (c *Client) EnablePeriodicSchedule(name string) error

EnablePeriodicSchedule enables a previously disabled schedule.

func (*Client) HistoryCount

func (c *Client) HistoryCount() (int64, error)

HistoryCount returns the total number of entries in the history.

func (*Client) HistoryJobByID

func (c *Client) HistoryJobByID(jobID string) (*HistoryJob, error)

HistoryJobByID looks up a single history entry by job ID using the index HASH.

func (*Client) HistoryJobs

func (c *Client) HistoryJobs(page uint, jobName string) ([]*HistoryJob, int64, error)

HistoryJobs returns a paginated list of history entries, newest first. If jobName is non-empty, it filters to only that job name using the per-name secondary index.

func (*Client) PeriodicSchedules

func (c *Client) PeriodicSchedules() ([]*PeriodicSchedule, error)

PeriodicSchedules returns all periodic schedules stored in Redis.

func (*Client) Queues

func (c *Client) Queues() ([]*Queue, error)

Queues returns the Queue's it finds.

func (*Client) RedisInfo

func (c *Client) RedisInfo() (*RedisStats, error)

RedisInfo executes the Redis INFO command and returns parsed server metrics.

func (*Client) RetryAllDeadJobs

func (c *Client) RetryAllDeadJobs() error

RetryAllDeadJobs requeues all dead jobs. In other words, it puts them all back on the normal work queue for workers to pull from and process.

func (*Client) RetryDeadJob

func (c *Client) RetryDeadJob(diedAt int64, jobID string) error

RetryDeadJob retries a dead job. The job will be re-queued on the normal work queue for eventual processing by a worker.

func (*Client) RetryJobs

func (c *Client) RetryJobs(page uint) ([]*RetryJob, int64, error)

RetryJobs returns a list of RetryJob's. The page param is 1-based; each page is 20 items. The total number of items (not pages) in the list of retry jobs is also returned.

func (*Client) ScheduledJobs

func (c *Client) ScheduledJobs(page uint) ([]*ScheduledJob, int64, error)

ScheduledJobs returns a list of ScheduledJob's. The page param is 1-based; each page is 20 items. The total number of items (not pages) in the list of scheduled jobs is also returned.

func (*Client) WorkerObservations

func (c *Client) WorkerObservations() ([]*WorkerObservation, error)

WorkerObservations returns all of the WorkerObservation's it finds for all worker pools' workers.

func (*Client) WorkerPoolHeartbeats

func (c *Client) WorkerPoolHeartbeats() ([]*WorkerPoolHeartbeat, error)

WorkerPoolHeartbeats queries Redis and returns all WorkerPoolHeartbeat's it finds (even for those worker pools which don't have a current heartbeat).
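
Together, heartbeats and observations give a live view of a running cluster. A sketch (assumes the redisPool from the README; error handling elided):

```go
client := work.NewClient("my_app_namespace", redisPool)

// One heartbeat per worker pool process.
heartbeats, _ := client.WorkerPoolHeartbeats()
for _, hb := range heartbeats {
	fmt.Printf("pool %s on %s (pid %d): %d workers\n",
		hb.WorkerPoolID, hb.Host, hb.Pid, hb.Concurrency)
}

// One observation per worker; busy workers report their current job.
observations, _ := client.WorkerObservations()
for _, o := range observations {
	if o.IsBusy {
		fmt.Printf("worker %s is running %s (job %s)\n", o.WorkerID, o.JobName, o.JobID)
	}
}
```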

type DeadJob

type DeadJob struct {
	DiedAt int64 `json:"died_at"`
	*Job
}

DeadJob represents a job in the dead queue.

type EnqueueOptions

type EnqueueOptions struct {
	// Delay specifies how long to wait before the job becomes eligible for processing.
	// Zero means the job is processed immediately.
	Delay time.Duration

	// Retry configures per-job retry behavior. If nil, the job type defaults are used.
	Retry *RetryOptions

	// Unique ensures only one job with this name and arguments is enqueued at a time.
	Unique bool

	// UniqueKey provides a custom key map for uniqueness checks.
	// If nil and Unique is true, the job arguments are used as the key.
	UniqueKey map[string]interface{}
}

EnqueueOptions combines all options for enqueuing a job.
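
The fields compose; for example, a delayed, unique job with a per-job retry policy (hypothetical job name and arguments; assumes an Enqueuer as in the README):

```go
_, err := enqueuer.EnqueueWithOptions("sync_account", work.Q{"account_id": 42}, work.EnqueueOptions{
	Delay:  10 * time.Minute, // eligible for processing in 10 minutes
	Unique: true,             // skip if an identical job is already enqueued
	Retry: &work.RetryOptions{
		MaxRetries: 3,
		Strategy:   work.BackoffLinear,
		BaseDelay:  30, // seconds
	},
})
```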

type Enqueuer

type Enqueuer struct {
	Namespace string // eg, "myapp-work"
	Pool      *redis.Pool
	// contains filtered or unexported fields
}

Enqueuer can enqueue jobs.

func NewEnqueuer

func NewEnqueuer(namespace string, pool *redis.Pool) *Enqueuer

NewEnqueuer creates a new enqueuer with the specified Redis namespace and Redis pool.

func (*Enqueuer) Enqueue

func (e *Enqueuer) Enqueue(jobName string, args map[string]interface{}) (*Job, error)

Enqueue will enqueue the specified job name and arguments. The args param can be nil if no args are needed. Example: e.Enqueue("send_email", work.Q{"addr": "[email protected]"})

func (*Enqueuer) EnqueueIn

func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error)

EnqueueIn enqueues a job in the scheduled job queue for execution in secondsFromNow seconds.

func (*Enqueuer) EnqueueUnique

func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (*Job, error)

EnqueueUnique enqueues a job unless a job is already enqueued with the same name and arguments. The already-enqueued job can be in the normal work queue or in the scheduled job queue. Once a worker begins processing a job, another job with the same name and arguments can be enqueued again. Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once. In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs. EnqueueUnique returns the job if it was enqueued and nil if it wasn't.

func (*Enqueuer) EnqueueUniqueByKey

func (e *Enqueuer) EnqueueUniqueByKey(jobName string, args map[string]interface{}, keyMap map[string]interface{}) (*Job, error)

EnqueueUniqueByKey enqueues a job unless a job is already enqueued with the same name and key; if one is, its arguments are updated. The already-enqueued job can be in the normal work queue or in the scheduled job queue. Once a worker begins processing a job, another job with the same name and key can be enqueued again. Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once. In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs. EnqueueUniqueByKey returns the job if it was enqueued and nil if it wasn't.

func (*Enqueuer) EnqueueUniqueIn

func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error)

EnqueueUniqueIn enqueues a unique job in the scheduled job queue for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs.

func (*Enqueuer) EnqueueUniqueInByKey

func (e *Enqueuer) EnqueueUniqueInByKey(jobName string, secondsFromNow int64, args map[string]interface{}, keyMap map[string]interface{}) (*ScheduledJob, error)

EnqueueUniqueInByKey enqueues a job in the scheduled job queue that is unique on the specified key, for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs. Subsequent calls with the same key will update the arguments.

func (*Enqueuer) EnqueueWithOptions

func (e *Enqueuer) EnqueueWithOptions(jobName string, args map[string]interface{}, opts EnqueueOptions) (*Job, error)

EnqueueWithOptions enqueues a job with full configuration control. It supports delayed execution, per-job retry configuration, and uniqueness.

type GenericHandler

type GenericHandler func(*Job) error

GenericHandler is a job handler without any custom context.

type GenericMiddlewareHandler

type GenericMiddlewareHandler func(*Job, NextMiddlewareFunc) error

GenericMiddlewareHandler is a middleware without any custom context.

type HistoryJob

type HistoryJob struct {
	CompletedAt int64  `json:"completed_at"`
	Status      string `json:"status"` // "success", "retry", "dead"
	*Job
}

HistoryJob represents a completed job in the history log.

type Job

type Job struct {
	// Inputs when making a new job
	Name       string                 `json:"name,omitempty"`
	ID         string                 `json:"id"`
	EnqueuedAt int64                  `json:"t"`
	Args       map[string]interface{} `json:"args"`
	Unique     bool                   `json:"unique,omitempty"`
	UniqueKey  string                 `json:"unique_key,omitempty"`

	// Inputs when retrying
	Fails    int64  `json:"fails,omitempty"` // number of times this job has failed
	LastErr  string `json:"err,omitempty"`
	FailedAt int64  `json:"failed_at,omitempty"`

	// Per-job retry configuration (set at enqueue time, optional).
	// When nil, the job type defaults from the WorkerPool are used.
	MaxRetries  *int64 `json:"max_retries,omitempty"`
	BackoffBase *int64 `json:"backoff_base,omitempty"`
	BackoffType *int   `json:"backoff_type,omitempty"` // 0=exponential, 1=linear, 2=fixed
	// contains filtered or unexported fields
}

Job represents a job.

func (*Job) ArgBool

func (j *Job) ArgBool(key string) bool

ArgBool returns j.Args[key] typed to a bool. If the key is missing or of the wrong type, it sets an argument error on the job. This function is meant to be used in the body of a job handling function while extracting arguments, followed by a single call to j.ArgError().

func (*Job) ArgError

func (j *Job) ArgError() error

ArgError returns the last error generated when extracting typed params. Returns nil if extracting the args went fine.

func (*Job) ArgFloat64

func (j *Job) ArgFloat64(key string) float64

ArgFloat64 returns j.Args[key] typed to a float64. If the key is missing or of the wrong type, it sets an argument error on the job. This function is meant to be used in the body of a job handling function while extracting arguments, followed by a single call to j.ArgError().

func (*Job) ArgInt64

func (j *Job) ArgInt64(key string) int64

ArgInt64 returns j.Args[key] typed to an int64. If the key is missing or of the wrong type, it sets an argument error on the job. This function is meant to be used in the body of a job handling function while extracting arguments, followed by a single call to j.ArgError().

func (*Job) ArgString

func (j *Job) ArgString(key string) string

ArgString returns j.Args[key] typed to a string. If the key is missing or of the wrong type, it sets an argument error on the job. This function is meant to be used in the body of a job handling function while extracting arguments, followed by a single call to j.ArgError().

func (*Job) Checkin

func (j *Job) Checkin(msg string)

Checkin will update the status of the executing job to the specified messages. This message is visible within the web UI. This is useful for indicating some sort of progress on very long running jobs. For instance, on a job that has to process a million records over the course of an hour, the job could call Checkin with the current job number every 10k jobs.
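
For example, a long-running handler might check in every 10k records (a sketch with a hypothetical handler and record count):

```go
func (c *Context) ProcessRecords(job *work.Job) error {
	const totalRecords = 1_000_000
	for i := 0; i < totalRecords; i++ {
		// ... process record i ...
		if i%10_000 == 0 {
			// Progress shows up in the Web UI next to the running job.
			job.Checkin(fmt.Sprintf("processed %d of %d records", i, totalRecords))
		}
	}
	return nil
}
```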

type JobOptions

type JobOptions struct {
	Priority       uint              // Priority from 1 to 10000
	MaxFails       uint              // 1: send straight to dead (unless SkipDead)
	SkipDead       bool              // If true, don't send failed jobs to the dead queue when retries are exhausted.
	MaxConcurrency uint              // Max number of jobs to keep in flight (default is 0, meaning no max)
	Backoff        BackoffCalculator // If not set, uses the default backoff algorithm
}

JobOptions can be passed to JobWithOptions.

type NextMiddlewareFunc

type NextMiddlewareFunc func() error

NextMiddlewareFunc is a function type (whose instances are named 'next') that you call to advance to the next middleware.

type PeriodicSchedule

type PeriodicSchedule struct {
	Name      string                 `json:"name"`
	JobName   string                 `json:"job_name"`
	Spec      string                 `json:"spec"`
	Args      map[string]interface{} `json:"args,omitempty"`
	Enabled   bool                   `json:"enabled"`
	CreatedAt int64                  `json:"created_at"`
	UpdatedAt int64                  `json:"updated_at"`
}

PeriodicSchedule represents a named recurring job schedule stored in Redis. Unlike code-defined schedules via PeriodicallyEnqueue, these can be created, modified, and deleted at runtime through the Client API.

type Q

type Q map[string]interface{}

Q is a shortcut to easily specify arguments for jobs when enqueueing them. Example: e.Enqueue("send_email", work.Q{"addr": "[email protected]", "track": true})

type Queue

type Queue struct {
	JobName string `json:"job_name"`
	Count   int64  `json:"count"`
	Latency int64  `json:"latency"`
}

Queue represents a queue that holds jobs with the same name. It indicates their name, count, and latency (in seconds). Latency is a measurement of how long ago the next job to be processed was enqueued.

type RedisStats

type RedisStats struct {
	Version          string `json:"version"`
	UptimeSeconds    int64  `json:"uptime_seconds"`
	ConnectedClients int64  `json:"connected_clients"`
	UsedMemory       int64  `json:"used_memory"`
	UsedMemoryHuman  string `json:"used_memory_human"`
	UsedMemoryPeak   string `json:"used_memory_peak_human"`
	TotalCommands    int64  `json:"total_commands_processed"`
	OpsPerSec        int64  `json:"instantaneous_ops_per_sec"`
	KeyspaceHits     int64  `json:"keyspace_hits"`
	KeyspaceMisses   int64  `json:"keyspace_misses"`
	DBKeys           int64  `json:"db_keys"`
}

RedisStats contains key metrics parsed from the Redis INFO command.

type RetryJob

type RetryJob struct {
	RetryAt int64 `json:"retry_at"`
	*Job
}

RetryJob represents a job in the retry queue.

type RetryOptions

type RetryOptions struct {
	// MaxRetries is the maximum number of times to retry the job.
	// 0 means use the job type default.
	MaxRetries int64

	// Strategy selects a predefined backoff strategy.
	// Only used when Backoff is nil.
	Strategy BackoffStrategy

	// Backoff is a custom backoff function. If set, it takes precedence over Strategy.
	Backoff BackoffCalculator

	// BaseDelay is the base delay in seconds for linear and fixed strategies.
	// Defaults to 60 seconds if not set.
	BaseDelay int64
}

RetryOptions configures retry behavior for a job at enqueue time. When set, these override the job type defaults configured on the WorkerPool.

type ScheduledJob

type ScheduledJob struct {
	RunAt int64 `json:"run_at"`
	*Job
}

ScheduledJob represents a job in the scheduled queue.

type WorkerObservation

type WorkerObservation struct {
	WorkerID string `json:"worker_id"`
	IsBusy   bool   `json:"is_busy"`

	// If IsBusy:
	JobName   string `json:"job_name"`
	JobID     string `json:"job_id"`
	StartedAt int64  `json:"started_at"`
	ArgsJSON  string `json:"args_json"`
	Checkin   string `json:"checkin"`
	CheckinAt int64  `json:"checkin_at"`
}

WorkerObservation represents the latest observation taken from a worker. The observation indicates whether the worker is busy processing a job, and if so, information about that job.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool represents a pool of workers and forms the primary public API of gocraft/work. You can attach jobs and middleware to a pool, and start and stop it. Based on their concurrency setting, pools spin up N worker goroutines.

func NewWorkerPool

func NewWorkerPool(ctx interface{}, concurrency uint, namespace string, pool *redis.Pool) *WorkerPool

NewWorkerPool creates a new worker pool. ctx should be a struct literal whose type will be used for middleware and handlers. concurrency specifies how many workers to spin up - each worker can process jobs concurrently.

func NewWorkerPoolWithOptions

func NewWorkerPoolWithOptions(ctx interface{}, concurrency uint, namespace string, pool *redis.Pool, workerPoolOpts WorkerPoolOptions) *WorkerPool

NewWorkerPoolWithOptions creates a new worker pool as per the NewWorkerPool function, but permits you to specify additional options such as sleep backoffs.

func (*WorkerPool) Daily

func (wp *WorkerPool) Daily(jobName string, hour, minute int) *WorkerPool

Daily enqueues jobName at the specified hour and minute (UTC) every day.

func (*WorkerPool) Drain

func (wp *WorkerPool) Drain()

Drain drains all jobs in the queue before returning. Note that if jobs are added faster than they can be processed, this function may never return.

func (*WorkerPool) Every

func (wp *WorkerPool) Every(jobName string, d time.Duration) *WorkerPool

Every enqueues jobName at the given duration interval. The duration must be >= 1 minute.

func (*WorkerPool) Hourly

func (wp *WorkerPool) Hourly(jobName string, minute int) *WorkerPool

Hourly enqueues jobName at the specified minute of every hour.
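
Daily, Hourly, and Every all return the pool, so schedules can be chained (hypothetical job names; Every requires a duration of at least one minute):

```go
pool.
	Daily("nightly_report", 2, 30).       // 02:30 UTC every day
	Hourly("refresh_stats", 15).          // minute 15 of every hour
	Every("poll_upstream", 5*time.Minute) // every 5 minutes
```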

func (*WorkerPool) Job

func (wp *WorkerPool) Job(name string, fn interface{}) *WorkerPool

Job registers the job name to the specified handler fn. When workers pull jobs from the name's queue, they'll be processed by fn. fn can take one of these forms:

  • (*ContextType).func(*Job) error, where ContextType matches the type of the ctx specified when creating the pool
  • func(*Job) error, the generic handler format

func (*WorkerPool) JobWithOptions

func (wp *WorkerPool) JobWithOptions(name string, jobOpts JobOptions, fn interface{}) *WorkerPool

JobWithOptions adds a handler for 'name' jobs as per the Job function, but permits you to specify additional options such as a job's priority, retry count, and whether to send dead jobs to the dead job queue or trash them.

func (*WorkerPool) Middleware

func (wp *WorkerPool) Middleware(fn interface{}) *WorkerPool

Middleware appends the specified function to the middleware chain. fn can take one of these forms:

  • (*ContextType).func(*Job, NextMiddlewareFunc) error, where ContextType matches the type of the ctx specified when creating the pool
  • func(*Job, NextMiddlewareFunc) error, the generic middleware format

func (*WorkerPool) PeriodicallyEnqueue

func (wp *WorkerPool) PeriodicallyEnqueue(spec string, jobName string) *WorkerPool

PeriodicallyEnqueue will periodically enqueue jobName according to the cron-based spec. The spec format is based on https://godoc.org/github.com/robfig/cron, which is a relatively standard cron format. Note that the first value is the seconds! If you have multiple worker pools on different machines, they'll all coordinate and only enqueue your job once.

func (*WorkerPool) PeriodicallyEnqueueWithArgs

func (wp *WorkerPool) PeriodicallyEnqueueWithArgs(spec string, jobName string, args map[string]interface{}) *WorkerPool

PeriodicallyEnqueueWithArgs works like PeriodicallyEnqueue but also sets the provided args on each enqueued job instance.

func (*WorkerPool) Start

func (wp *WorkerPool) Start()

Start starts the workers and associated processes.

func (*WorkerPool) Stop

func (wp *WorkerPool) Stop()

Stop stops the workers and associated processes.

type WorkerPoolHeartbeat

type WorkerPoolHeartbeat struct {
	WorkerPoolID string   `json:"worker_pool_id"`
	StartedAt    int64    `json:"started_at"`
	HeartbeatAt  int64    `json:"heartbeat_at"`
	JobNames     []string `json:"job_names"`
	Concurrency  uint     `json:"concurrency"`
	Host         string   `json:"host"`
	Pid          int      `json:"pid"`
	WorkerIDs    []string `json:"worker_ids"`
}

WorkerPoolHeartbeat represents the heartbeat from a worker pool. WorkerPools write a heartbeat every 5 seconds so we know they're alive; the heartbeat includes config information.

type WorkerPoolOptions

type WorkerPoolOptions struct {
	SleepBackoffs []int64 // Sleep backoffs in milliseconds
}

WorkerPoolOptions can be passed to NewWorkerPoolWithOptions.

Directories

Path            Synopsis
cmd
  workenqueue   command
  workfakedata  command
  workwebui     command
