api

package
v0.0.0-...-0597b97
Warning

This package is not in the latest version of its module.
Published: Jan 17, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Copied from: https://github.com/ThreeDotsLabs/watermill/blob/master/log.go

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func HashPartitioner

func HashPartitioner(key string, numPartitions int) int

HashPartitioner is the default Partitioner implementation, using FNV-1a hashing. For a fixed numPartitions, the same key always ends up on the same partition.

Algorithm: FNV-1a_32(key) % numPartitions
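The formula above can be sketched with the standard library's hash/fnv package. This is a minimal illustration, not the package's actual source: the name hashPartition is hypothetical, and the sketch assumes numPartitions is positive (a zero value would panic with a division by zero).

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashPartition is a hypothetical re-implementation of the documented
// formula: FNV-1a_32(key) % numPartitions.
// It assumes numPartitions > 0.
func hashPartition(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	// The same key always lands on the same partition for a fixed count.
	for _, key := range []string{"apple", "banana", "apple"} {
		fmt.Printf("%s -> partition %d\n", key, hashPartition(key, 4))
	}
}
```

Because the result depends on numPartitions, changing the number of Reduce tasks between runs will generally move keys to different partitions.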

Types

type CaptureLoggerAdapter

type CaptureLoggerAdapter struct {
	// contains filtered or unexported fields
}

CaptureLoggerAdapter is a logger that captures all logs. It is mostly useful for testing logging.

func NewCaptureLogger

func NewCaptureLogger() *CaptureLoggerAdapter

func (*CaptureLoggerAdapter) Captured

func (c *CaptureLoggerAdapter) Captured() map[LogLevel][]CapturedMessage

func (*CaptureLoggerAdapter) Debug

func (c *CaptureLoggerAdapter) Debug(msg string, fields LogFields)

func (*CaptureLoggerAdapter) Error

func (c *CaptureLoggerAdapter) Error(msg string, err error, fields LogFields)

func (*CaptureLoggerAdapter) Has

func (*CaptureLoggerAdapter) HasError

func (c *CaptureLoggerAdapter) HasError(err error) bool

func (*CaptureLoggerAdapter) Info

func (c *CaptureLoggerAdapter) Info(msg string, fields LogFields)

func (*CaptureLoggerAdapter) PrintCaptured

func (c *CaptureLoggerAdapter) PrintCaptured(t Logfer)

func (*CaptureLoggerAdapter) Trace

func (c *CaptureLoggerAdapter) Trace(msg string, fields LogFields)

func (*CaptureLoggerAdapter) With

type CapturedMessage

type CapturedMessage struct {
	Level  LogLevel
	Time   time.Time
	Fields LogFields
	Msg    string
	Err    error
}

func (CapturedMessage) ContentEquals

func (c CapturedMessage) ContentEquals(other CapturedMessage) bool

type Combiner

type Combiner func(key string, values Iterator, emit Emitter) error

Combiner is a function that aggregates values for a specific key.

Arguments:

  • key: The intermediate key (e.g., a word in WordCount).
  • values: An Iterator allowing sequential access to all values for this key.
  • emit: A callback to output 0 or more key-value pairs.

Returns:

  • error: Any error encountered during aggregation.

type Emitter

type Emitter func(key, value string) error

Emitter is the callback function passed to the Mapper, Combiner, and Reducer.

It allows each of them to "emit" key-value pairs, such as the Mapper's intermediate pairs.
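Since an Emitter is an ordinary function value, a test can pass a closure that collects pairs in memory. The sketch below redeclares Emitter locally so it compiles on its own; pair, newCollector, and collectDemo are hypothetical helpers, not part of this package.

```go
package main

import "fmt"

// Emitter mirrors the documented signature.
type Emitter func(key, value string) error

// pair is a hypothetical record type used only in this sketch.
type pair struct{ Key, Value string }

// newCollector returns an Emitter that appends every emitted pair to out.
func newCollector(out *[]pair) Emitter {
	return func(key, value string) error {
		*out = append(*out, pair{Key: key, Value: value})
		return nil
	}
}

// collectDemo emits two pairs and returns what the collector recorded.
func collectDemo() []pair {
	var got []pair
	emit := newCollector(&got)
	_ = emit("hello", "1")
	_ = emit("world", "1")
	return got
}

func main() {
	fmt.Println(collectDemo()) // prints [{hello 1} {world 1}]
}
```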

type Filesystem

type Filesystem interface {
	// Glob returns a list of file paths matching the specified pattern.
	//
	// The pattern syntax is implementation-specific but generally follows shell
	// globbing rules (e.g., "input/*.txt" or "s3://bucket/data/2023-*").
	Glob(pattern string) ([]string, error)

	// Size returns the size of the file in bytes.
	//
	// This is primarily used by the Master to calculate Input Splits (assigning
	// byte ranges to workers) without downloading the file content.
	Size(filename string) (int64, error)

	// Open opens the named file for reading.
	//
	// It returns an io.ReadSeekCloser, allowing the caller to efficiently Seek()
	// to a specific offset. This is crucial for workers to read only their
	// assigned partition (chunk) of a large input file.
	Open(filename string) (io.ReadSeekCloser, error)

	// Create opens the named file for writing.
	//
	// If the file already exists, it should be truncated. This is used by
	// workers to write intermediate shuffle data and final MapReduce outputs.
	Create(filename string) (io.WriteCloser, error)

	// Delete removes the named file from the storage system.
	//
	// This is used for cleaning up intermediate files after the Reduce phase
	// completes or for removing temporary artifacts upon job failure.
	Delete(filename string) error

	// Abs returns an absolute representation of the path.
	Abs(path string) (string, error)
}

Filesystem abstracts the underlying storage layer, providing a unified API for interacting with local disks, distributed file systems (like HDFS), or object storage (like S3).

Implementations of this interface must be safe for concurrent use by multiple goroutines.
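As an illustration, a local-disk implementation can be a thin wrapper over os and path/filepath. This is a sketch under assumptions, not the package's own implementation: localFS, readRange, and demo are hypothetical names. The type holds no state, so concurrent use is as safe as the underlying OS calls.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
)

// localFS is a hypothetical local-disk type whose method set matches the
// documented Filesystem interface.
type localFS struct{}

func (localFS) Glob(pattern string) ([]string, error) { return filepath.Glob(pattern) }

func (localFS) Size(filename string) (int64, error) {
	info, err := os.Stat(filename)
	if err != nil {
		return 0, err
	}
	return info.Size(), nil
}

func (localFS) Open(filename string) (io.ReadSeekCloser, error) {
	f, err := os.Open(filename)
	if err != nil {
		return nil, err // avoid wrapping a nil *os.File in the interface
	}
	return f, nil
}

func (localFS) Create(filename string) (io.WriteCloser, error) {
	f, err := os.Create(filename) // truncates an existing file
	if err != nil {
		return nil, err
	}
	return f, nil
}

func (localFS) Delete(filename string) error { return os.Remove(filename) }

func (localFS) Abs(path string) (string, error) { return filepath.Abs(path) }

// readRange reads length bytes starting at offset, the way a worker might
// read only its assigned byte range of a large input file.
func readRange(fs localFS, filename string, offset, length int64) (string, error) {
	f, err := fs.Open(filename)
	if err != nil {
		return "", err
	}
	defer f.Close()
	if _, err := f.Seek(offset, io.SeekStart); err != nil {
		return "", err
	}
	buf := make([]byte, length)
	if _, err := io.ReadFull(f, buf); err != nil {
		return "", err
	}
	return string(buf), nil
}

// demo writes "hello world" to a temporary file, then returns the file size
// and the 5 bytes starting at offset 6.
func demo() (int64, string, error) {
	dir, err := os.MkdirTemp("", "localfs-demo")
	if err != nil {
		return 0, "", err
	}
	defer os.RemoveAll(dir)

	fs := localFS{}
	name := filepath.Join(dir, "input.txt")
	w, err := fs.Create(name)
	if err != nil {
		return 0, "", err
	}
	if _, err := io.WriteString(w, "hello world"); err != nil {
		w.Close()
		return 0, "", err
	}
	if err := w.Close(); err != nil {
		return 0, "", err
	}
	size, err := fs.Size(name)
	if err != nil {
		return 0, "", err
	}
	chunk, err := readRange(fs, name, 6, 5)
	return size, chunk, err
}

func main() {
	size, chunk, err := demo()
	fmt.Println(size, chunk, err) // prints 11 world <nil>
}
```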

type Iterator

type Iterator interface {
	// Next returns the next value in the stream.
	//
	// It returns false as the second return value when the stream is exhausted.
	Next() (string, bool)

	// Close cleans up any resources (file handles, network connections)
	// associated with the iterator.
	io.Closer
}

Iterator represents a stream of values associated with a specific key.

It allows the Reducer to process data larger than available RAM by strictly processing one item at a time.
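For tests, a slice-backed iterator is the simplest thing that satisfies the interface. The sketch below redeclares Iterator locally; sliceIterator and countValues are hypothetical names. Note that an in-memory slice gives up the larger-than-RAM property described above; a streaming implementation would read from a file or network source instead.

```go
package main

import (
	"fmt"
	"io"
)

// Iterator mirrors the documented interface.
type Iterator interface {
	Next() (string, bool)
	io.Closer
}

// sliceIterator is a hypothetical in-memory Iterator, intended for tests.
type sliceIterator struct {
	values []string
	pos    int
}

// Next returns the next value, or false once all values are consumed.
func (it *sliceIterator) Next() (string, bool) {
	if it.pos >= len(it.values) {
		return "", false
	}
	v := it.values[it.pos]
	it.pos++
	return v, true
}

// Close has nothing to release for an in-memory slice.
func (it *sliceIterator) Close() error { return nil }

// countValues consumes the iterator one item at a time and closes it.
func countValues(it Iterator) (int, error) {
	n := 0
	for {
		if _, ok := it.Next(); !ok {
			break
		}
		n++
	}
	return n, it.Close()
}

func main() {
	n, err := countValues(&sliceIterator{values: []string{"1", "1", "1"}})
	fmt.Println(n, err) // prints 3 <nil>
}
```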

type LogFields

type LogFields map[string]any

LogFields is the logger's key-value list of fields.

func (LogFields) Add

func (l LogFields) Add(newFields LogFields) LogFields

Add adds new fields to the list of LogFields.

func (LogFields) Copy

func (l LogFields) Copy() LogFields

Copy copies the LogFields.
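The signatures suggest that both methods return a fresh map rather than mutating the receiver. The sketch below assumes that behaviour, following the Watermill file named in the Overview; it redeclares LogFields locally and is not guaranteed to match this package's source line for line.

```go
package main

import "fmt"

// LogFields mirrors the documented type.
type LogFields map[string]any

// Add returns a new LogFields holding l's entries overlaid with newFields.
// Assumption: the receiver is left unmodified.
func (l LogFields) Add(newFields LogFields) LogFields {
	result := make(LogFields, len(l)+len(newFields))
	for k, v := range l {
		result[k] = v
	}
	for k, v := range newFields {
		result[k] = v
	}
	return result
}

// Copy returns a shallow copy of l.
func (l LogFields) Copy() LogFields {
	cpy := make(LogFields, len(l))
	for k, v := range l {
		cpy[k] = v
	}
	return cpy
}

func main() {
	base := LogFields{"job": "wordcount"}
	extended := base.Add(LogFields{"worker": 2})
	fmt.Println(len(base), len(extended)) // prints 1 2
}
```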

type LogLevel

type LogLevel uint
const (
	TraceLogLevel LogLevel = iota + 1
	DebugLogLevel
	InfoLogLevel
	ErrorLogLevel
)

type Logfer

type Logfer interface {
	Logf(format string, a ...any)
}

type LoggerAdapter

type LoggerAdapter interface {
	Error(msg string, err error, fields LogFields)
	Info(msg string, fields LogFields)
	Debug(msg string, fields LogFields)
	Trace(msg string, fields LogFields)
	With(fields LogFields) LoggerAdapter
}

LoggerAdapter is the interface you need to implement to support Shard logging.

You can use StdLoggerAdapter as a reference implementation.

func NewStdLogger

func NewStdLogger(debug, trace bool) LoggerAdapter

NewStdLogger creates a StdLoggerAdapter that sends all logs to stderr.

func NewStdLoggerWithOut

func NewStdLoggerWithOut(out io.Writer, debug bool, trace bool) LoggerAdapter

NewStdLoggerWithOut creates a StdLoggerAdapter that sends all logs to the provided io.Writer.

type Mapper

type Mapper func(key, value string, emit Emitter) error

Mapper is a function that transforms raw input into intermediate key-value pairs.

Arguments:

  • key: The identifier for the input data (e.g., filename, offset).
  • value: The raw data content (e.g., line of text, entire file).
  • emit: A callback to output 0 or more key-value pairs.

Returns:

  • error: Any error encountered during mapping.
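A WordCount-style Mapper illustrates the contract. The sketch redeclares Emitter and Mapper locally; wordCountMap and mapLine are hypothetical names, and lower-casing the words is a choice made for this example only.

```go
package main

import (
	"fmt"
	"strings"
)

// Emitter and Mapper mirror the documented signatures.
type Emitter func(key, value string) error
type Mapper func(key, value string, emit Emitter) error

// wordCountMap emits (word, "1") for every whitespace-separated word in
// value. The input key (e.g. a filename or offset) is not needed here.
func wordCountMap(_ string, value string, emit Emitter) error {
	for _, word := range strings.Fields(value) {
		if err := emit(strings.ToLower(word), "1"); err != nil {
			return err // stop on the first emit failure
		}
	}
	return nil
}

// Compile-time check that wordCountMap has the Mapper signature.
var _ Mapper = wordCountMap

// mapLine runs the mapper over one line and returns the emitted keys.
func mapLine(line string) ([]string, error) {
	var keys []string
	err := wordCountMap("input.txt", line, func(key, _ string) error {
		keys = append(keys, key)
		return nil
	})
	return keys, err
}

func main() {
	keys, err := mapLine("Go go gopher")
	fmt.Println(keys, err) // prints [go go gopher] <nil>
}
```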

type NopLogger

type NopLogger struct{}

NopLogger is a logger which discards all logs.

func (NopLogger) Debug

func (NopLogger) Debug(msg string, fields LogFields)

func (NopLogger) Error

func (NopLogger) Error(msg string, err error, fields LogFields)

func (NopLogger) Info

func (NopLogger) Info(msg string, fields LogFields)

func (NopLogger) Trace

func (NopLogger) Trace(msg string, fields LogFields)

func (NopLogger) With

func (l NopLogger) With(fields LogFields) LoggerAdapter

type Partitioner

type Partitioner func(key string, numPartitions int) int

Partitioner determines which Reduce task should process a specific key.

Arguments:

  • key: The intermediate key to hash.
  • numPartitions: The total number of available Reduce tasks.

Returns:

  • int: The partition index (0 to numPartitions-1).
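Any function with this signature can replace the default. As one hypothetical alternative, the sketch below partitions by the key's first byte, so keys in lower-numbered partitions sort before keys in higher-numbered ones. Partitioner is redeclared locally, firstBytePartitioner is an invented name, and the sketch assumes numPartitions is positive.

```go
package main

import "fmt"

// Partitioner mirrors the documented signature.
type Partitioner func(key string, numPartitions int) int

// firstBytePartitioner maps the first byte of key (0..255) proportionally
// onto 0..numPartitions-1. Empty keys go to partition 0.
func firstBytePartitioner(key string, numPartitions int) int {
	if key == "" {
		return 0
	}
	return int(key[0]) * numPartitions / 256
}

// Compile-time check that the function has the Partitioner signature.
var _ Partitioner = firstBytePartitioner

func main() {
	for _, key := range []string{"0day", "apple", "\xffmax"} {
		fmt.Printf("%q -> partition %d\n", key, firstBytePartitioner(key, 4))
	}
}
```

Unlike a hash, this scheme can skew badly: with ASCII text keys most first bytes fall in a narrow range, so a few partitions receive most of the data.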

type Reducer

type Reducer func(key string, values Iterator, emit Emitter) error

Reducer is a function that aggregates values for a specific key.

Arguments:

  • key: The intermediate key (e.g., a word in WordCount).
  • values: An Iterator allowing sequential access to all values for this key.
  • emit: A callback to output 0 or more key-value pairs.

Returns:

  • error: Any error encountered during aggregation.
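A summing Reducer for WordCount might look like the sketch below. The types are redeclared locally; sumReduce, sliceIterator, and reduceCounts are hypothetical names. The sketch does not call Close on the iterator, on the assumption that whoever created it is responsible for closing it; the documentation does not say which side owns that call.

```go
package main

import (
	"fmt"
	"io"
	"strconv"
)

// Iterator, Emitter, and Reducer mirror the documented declarations.
type Iterator interface {
	Next() (string, bool)
	io.Closer
}
type Emitter func(key, value string) error
type Reducer func(key string, values Iterator, emit Emitter) error

// sumReduce adds up the decimal counts for key and emits a single total.
// It holds only the running sum, never the full list of values.
func sumReduce(key string, values Iterator, emit Emitter) error {
	total := 0
	for {
		v, ok := values.Next()
		if !ok {
			break
		}
		n, err := strconv.Atoi(v)
		if err != nil {
			return fmt.Errorf("key %q: bad count %q: %w", key, v, err)
		}
		total += n
	}
	return emit(key, strconv.Itoa(total))
}

var _ Reducer = sumReduce

// sliceIterator is a hypothetical in-memory Iterator used to drive the demo.
type sliceIterator struct {
	values []string
	pos    int
}

func (it *sliceIterator) Next() (string, bool) {
	if it.pos >= len(it.values) {
		return "", false
	}
	it.pos++
	return it.values[it.pos-1], true
}

func (it *sliceIterator) Close() error { return nil }

// reduceCounts runs sumReduce over counts and returns the emitted value.
func reduceCounts(key string, counts ...string) (string, error) {
	it := &sliceIterator{values: counts}
	defer it.Close()
	var out string
	err := sumReduce(key, it, func(_, value string) error {
		out = value
		return nil
	})
	return out, err
}

func main() {
	total, err := reduceCounts("go", "1", "2", "3")
	fmt.Println(total, err) // prints 6 <nil>
}
```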

type StdLoggerAdapter

type StdLoggerAdapter struct {
	ErrorLogger *log.Logger
	InfoLogger  *log.Logger
	DebugLogger *log.Logger
	TraceLogger *log.Logger
	// contains filtered or unexported fields
}

StdLoggerAdapter is a logger implementation that sends all logs to the provided output.

func (*StdLoggerAdapter) Debug

func (l *StdLoggerAdapter) Debug(msg string, fields LogFields)

func (*StdLoggerAdapter) Error

func (l *StdLoggerAdapter) Error(msg string, err error, fields LogFields)

func (*StdLoggerAdapter) Info

func (l *StdLoggerAdapter) Info(msg string, fields LogFields)

func (*StdLoggerAdapter) Trace

func (l *StdLoggerAdapter) Trace(msg string, fields LogFields)

func (*StdLoggerAdapter) With

func (l *StdLoggerAdapter) With(fields LogFields) LoggerAdapter
