Documentation
¶
Overview ¶
Package framework provides orchestration for running analysis pipelines.
Index ¶
- Constants
- Variables
- func BackoffDuration(attempt int) time.Duration
- func CanResumeWithCheckpoint(totalAnalyzers, checkpointableCount int) bool
- func DefaultMemoryBudget() int64
- func MaybeStartCPUProfile(path string) (func(), error)
- func MaybeWriteHeapProfile(path string, logger *slog.Logger)
- func ParseOptionalSize(sizeValue string) (int64, error)
- func RunStreaming(ctx context.Context, runner *Runner, commits []*gitlib.Commit, ...) (map[analyze.HistoryAnalyzer]analyze.Report, error)
- func SafeInt(v uint64) int
- func SafeInt64(v uint64) int64
- func StallError(reqType string, retries int) error
- type BlobData
- type BlobPipeline
- type BudgetSolver
- type CacheStats
- type CheckpointParams
- type CommitBatch
- type CommitData
- type CommitStreamer
- func (s *CommitStreamer) Stream(ctx context.Context, commits []*gitlib.Commit) <-chan CommitBatch
- func (s *CommitStreamer) StreamFromIterator(ctx context.Context, iter *gitlib.CommitIter, limit int) <-chan CommitBatch
- func (s *CommitStreamer) StreamSingle(ctx context.Context, commits []*gitlib.Commit) <-chan CommitBatch
- type ConfigParams
- type Coordinator
- type CoordinatorConfig
- type DiffCache
- type DiffCacheStats
- type DiffKey
- type DiffPipeline
- type GlobalBlobCache
- func (c *GlobalBlobCache) CacheHits() int64
- func (c *GlobalBlobCache) CacheMisses() int64
- func (c *GlobalBlobCache) Clear()
- func (c *GlobalBlobCache) Get(hash gitlib.Hash) *gitlib.CachedBlob
- func (c *GlobalBlobCache) GetMulti(hashes []gitlib.Hash) (found map[gitlib.Hash]*gitlib.CachedBlob, missing []gitlib.Hash)
- func (c *GlobalBlobCache) Put(hash gitlib.Hash, blob *gitlib.CachedBlob)
- func (c *GlobalBlobCache) PutMulti(blobs map[gitlib.Hash]*gitlib.CachedBlob)
- func (c *GlobalBlobCache) Stats() CacheStats
- type PipelineStats
- type Runner
- func (runner *Runner) Finalize() (map[analyze.HistoryAnalyzer]analyze.Report, error)
- func (runner *Runner) Initialize() error
- func (runner *Runner) ProcessChunk(ctx context.Context, commits []*gitlib.Commit, indexOffset, chunkIndex int) (PipelineStats, error)
- func (runner *Runner) ProcessChunkFromData(ctx context.Context, data []CommitData, indexOffset, chunkIndex int) (PipelineStats, error)
- func (runner *Runner) Run(ctx context.Context, commits []*gitlib.Commit) (map[analyze.HistoryAnalyzer]analyze.Report, error)
- type StreamingConfig
- type UASTPipeline
- type Watchdog
- func (wd *Watchdog) StalledCount() int
- func (wd *Watchdog) WaitForDiffResponse(ch <-chan gitlib.DiffBatchResponse) (gitlib.DiffBatchResponse, bool)
- func (wd *Watchdog) WaitForResponse(ch <-chan gitlib.BlobBatchResponse) (gitlib.BlobBatchResponse, bool)
- func (wd *Watchdog) WaitForTreeDiffResponse(ch <-chan gitlib.TreeDiffResponse) (gitlib.TreeDiffResponse, bool)
- type WatchdogConfig
Constants ¶
const ( FileModeCommit = 0o160000 FileModeTree = 0o040000 FileModeBlob = 0o100644 FileModeExec = 0o100755 FileModeLink = 0o120000 )
File mode constants for git tree entries.
const DefaultBlobBatchArenaSize = 4 * 1024 * 1024
DefaultBlobBatchArenaSize is the default size of the memory arena for blob loading (4 MiB).
const DefaultDiffCacheSize = 10000
DefaultDiffCacheSize is the default maximum number of diff entries to cache.
const DefaultGlobalCacheSize = 128 * 1024 * 1024
DefaultGlobalCacheSize is the default maximum memory size for the global blob cache (128 MiB).
const (
// MaxStallRetries is the maximum number of retry attempts before giving up.
MaxStallRetries = 3
)
Watchdog constants.
Variables ¶
var ( ErrInvalidSizeFormat = errors.New("invalid size format") ErrInvalidGCPercent = errors.New("invalid GC percent") )
Sentinel errors for configuration.
var ErrNotParallelizable = errors.New("leaf does not implement Parallelizable")
ErrNotParallelizable is returned when a leaf analyzer does not implement analyze.Parallelizable.
var ErrWorkerStalled = errors.New("worker stalled: CGO call did not return within timeout")
ErrWorkerStalled is returned when a worker does not respond within the timeout and all retry attempts with exponential backoff are exhausted.
Functions ¶
func BackoffDuration ¶
BackoffDuration returns the backoff duration for the given retry attempt (0-indexed). Sequence: 0s, 1s, 4s.
func CanResumeWithCheckpoint ¶
CanResumeWithCheckpoint reports whether all analyzers support checkpointing, i.e. checkpointableCount equals totalAnalyzers.
func DefaultMemoryBudget ¶
func DefaultMemoryBudget() int64
DefaultMemoryBudget returns a sensible memory budget based on available system memory. Returns min(50% of total RAM, 2 GiB), or 0 if detection fails.
func MaybeStartCPUProfile ¶
MaybeStartCPUProfile starts CPU profiling to the given file. Returns a stop function that must be deferred. Returns a no-op if path is empty.
func MaybeWriteHeapProfile ¶
MaybeWriteHeapProfile writes a heap profile to the given file. No-op if path is empty. Uses the provided logger for error reporting.
func ParseOptionalSize ¶
ParseOptionalSize parses a human-readable size string, returning 0 for empty or "0".
func RunStreaming ¶
func RunStreaming( ctx context.Context, runner *Runner, commits []*gitlib.Commit, analyzers []analyze.HistoryAnalyzer, config StreamingConfig, ) (map[analyze.HistoryAnalyzer]analyze.Report, error)
RunStreaming executes the pipeline in streaming chunks with optional checkpoint support. When the memory budget is sufficient and multiple chunks are needed, it enables double-buffered chunk pipelining to overlap pipeline execution with analyzer consumption.
func StallError ¶
StallError creates a descriptive error for a stalled worker.
Types ¶
type BlobData ¶
type BlobData struct {
Commit *gitlib.Commit
Index int
Changes gitlib.Changes
BlobCache map[gitlib.Hash]*gitlib.CachedBlob
Error error
}
BlobData holds loaded blob data for a commit.
type BlobPipeline ¶
type BlobPipeline struct {
SeqWorkerChan chan<- gitlib.WorkerRequest
PoolWorkerChan chan<- gitlib.WorkerRequest
BufferSize int
WorkerCount int
BlobCache *GlobalBlobCache
ArenaSize int
}
BlobPipeline processes commit batches to load blobs.
func NewBlobPipeline ¶
func NewBlobPipeline( seqChan chan<- gitlib.WorkerRequest, poolChan chan<- gitlib.WorkerRequest, bufferSize int, workerCount int, ) *BlobPipeline
NewBlobPipeline creates a new blob pipeline.
func NewBlobPipelineWithCache ¶
func NewBlobPipelineWithCache( seqChan chan<- gitlib.WorkerRequest, poolChan chan<- gitlib.WorkerRequest, bufferSize int, workerCount int, cache *GlobalBlobCache, ) *BlobPipeline
NewBlobPipelineWithCache creates a new blob pipeline with an optional global blob cache.
func (*BlobPipeline) Process ¶
func (p *BlobPipeline) Process(ctx context.Context, commits <-chan CommitBatch) <-chan BlobData
Process receives commit batches and outputs blob data.
type BudgetSolver ¶
type BudgetSolver func(budgetBytes int64) (CoordinatorConfig, error)
BudgetSolver resolves a memory budget (in bytes) to a CoordinatorConfig.
type CacheStats ¶
CacheStats holds cache performance metrics.
func (CacheStats) HitRate ¶
func (s CacheStats) HitRate() float64
HitRate returns the cache hit rate (0.0 to 1.0).
type CheckpointParams ¶
CheckpointParams holds checkpoint-related configuration.
type CommitBatch ¶
type CommitBatch struct {
// Commits in this batch.
Commits []*gitlib.Commit
// StartIndex is the index of the first commit in the full sequence.
StartIndex int
// BatchID identifies this batch for ordering.
BatchID int
}
CommitBatch represents a batch of commits for processing.
type CommitData ¶
type CommitData struct {
Commit *gitlib.Commit
Index int
Changes gitlib.Changes
BlobCache map[gitlib.Hash]*gitlib.CachedBlob
FileDiffs map[string]plumbing.FileDiffData
UASTChanges []uast.Change // Pre-computed UAST changes (nil if not computed).
Error error
}
CommitData holds all processed data for a commit.
type CommitStreamer ¶
type CommitStreamer struct {
// BatchSize is the number of commits per batch.
BatchSize int
// Lookahead is the number of batches to prefetch.
Lookahead int
}
CommitStreamer iterates commits and groups them into batches for efficient processing.
func NewCommitStreamer ¶
func NewCommitStreamer() *CommitStreamer
NewCommitStreamer creates a new commit streamer with default settings.
func (*CommitStreamer) Stream ¶
func (s *CommitStreamer) Stream(ctx context.Context, commits []*gitlib.Commit) <-chan CommitBatch
Stream takes a slice of commits and streams them as batches. The output channel is closed when all commits have been sent.
func (*CommitStreamer) StreamFromIterator ¶
func (s *CommitStreamer) StreamFromIterator(ctx context.Context, iter *gitlib.CommitIter, limit int) <-chan CommitBatch
StreamFromIterator streams commits from a commit iterator. This is more memory-efficient for large repositories.
func (*CommitStreamer) StreamSingle ¶
func (s *CommitStreamer) StreamSingle(ctx context.Context, commits []*gitlib.Commit) <-chan CommitBatch
StreamSingle streams commits one at a time (batch size = 1). This is compatible with the existing sequential processing model.
type ConfigParams ¶
type ConfigParams struct {
Workers int
BufferSize int
CommitBatchSize int
BlobCacheSize string
DiffCacheSize int
BlobArenaSize string
MemoryBudget string
GCPercent int
BallastSize string
}
ConfigParams holds raw CLI parameter values for building a CoordinatorConfig. All size strings use humanize format (e.g. "256MB", "1GiB").
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
Coordinator orchestrates the full data processing pipeline.
func NewCoordinator ¶
func NewCoordinator(repo *gitlib.Repository, config CoordinatorConfig) *Coordinator
NewCoordinator creates a new coordinator for the repository.
func (*Coordinator) Config ¶
func (c *Coordinator) Config() CoordinatorConfig
Config returns the coordinator configuration.
func (*Coordinator) Process ¶
func (c *Coordinator) Process(ctx context.Context, commits []*gitlib.Commit) <-chan CommitData
Process runs the full pipeline on a slice of commits. After the returned channel is fully drained, call Stats() to retrieve pipeline timing and cache metrics.
func (*Coordinator) ProcessSingle ¶
func (c *Coordinator) ProcessSingle(ctx context.Context, commit *gitlib.Commit, _ int) CommitData
ProcessSingle processes a single commit.
func (*Coordinator) Stats ¶
func (c *Coordinator) Stats() PipelineStats
Stats returns the pipeline stats collected during Process(). Only valid after the channel returned by Process() is fully drained.
type CoordinatorConfig ¶
type CoordinatorConfig struct {
// BatchConfig configures batch sizes for blob and diff operations.
BatchConfig gitlib.BatchConfig
// CommitBatchSize is the number of commits to process in each batch.
CommitBatchSize int
// Workers is the number of parallel workers for processing.
Workers int
// BufferSize is the size of internal channels.
BufferSize int
// BlobCacheSize is the maximum size of the global blob cache in bytes.
// Set to 0 to disable caching.
BlobCacheSize int64
// DiffCacheSize is the maximum number of diff results to cache.
// Set to 0 to disable caching.
DiffCacheSize int
// BlobArenaSize is the size of the memory arena for blob loading.
// Defaults to 16MB if 0.
// NOTE(review): this conflicts with DefaultBlobBatchArenaSize (4 MiB) documented
// above — confirm which default the implementation actually applies when 0.
// UASTPipelineWorkers is the number of goroutines for parallel UAST parsing
// in the pipeline stage. Set to 0 to disable the UAST pipeline stage.
UASTPipelineWorkers int
// LeafWorkers is the number of goroutines for parallel leaf analyzer consumption.
// Each worker processes a disjoint subset of commits via Fork/Merge.
// Set to 0 to disable parallel leaf consumption (serial path).
LeafWorkers int
// GCPercent controls Go's GC aggressiveness.
// Set to 0 to use auto mode (200 when system memory > 32 GiB).
GCPercent int
// BallastSize reserves bytes in a long-lived slice to smooth GC behavior.
// Set to 0 to disable ballast allocation.
BallastSize int64
// WorkerTimeout is the maximum time to wait for a worker response before
// considering it stalled. Set to 0 to disable the watchdog.
WorkerTimeout time.Duration
}
CoordinatorConfig configures the pipeline coordinator.
func BuildConfigFromParams ¶
func BuildConfigFromParams(params ConfigParams, budgetSolver BudgetSolver) (CoordinatorConfig, int64, error)
BuildConfigFromParams builds a CoordinatorConfig from raw parameters. Returns the config and the memory budget in bytes (0 if not set). The budgetSolver is called when params.MemoryBudget is set; pass nil if memory-budget is not supported.
func DefaultCoordinatorConfig ¶
func DefaultCoordinatorConfig() CoordinatorConfig
DefaultCoordinatorConfig returns the default coordinator configuration.
func (CoordinatorConfig) EstimatedOverhead ¶
func (c CoordinatorConfig) EstimatedOverhead() int64
EstimatedOverhead returns the estimated memory consumed by the pipeline infrastructure (runtime, workers, caches, buffers) — everything except analyzer state. This allows the streaming planner to accurately compute how much memory remains for analyzer state growth.
type DiffCache ¶
type DiffCache struct {
// contains filtered or unexported fields
}
DiffCache provides an LRU cache for diff results. It caches computed diffs to avoid redundant diff computations.
func NewDiffCache ¶
NewDiffCache creates a new diff cache with the specified maximum entries.
func (*DiffCache) CacheMisses ¶
CacheMisses returns the total cache miss count (atomic, lock-free).
func (*DiffCache) Get ¶
func (c *DiffCache) Get(key DiffKey) (plumbing.FileDiffData, bool)
Get retrieves a cached diff result.
func (*DiffCache) Put ¶
func (c *DiffCache) Put(key DiffKey, diff plumbing.FileDiffData)
Put adds a diff result to the cache.
func (*DiffCache) Stats ¶
func (c *DiffCache) Stats() DiffCacheStats
Stats returns current cache statistics.
type DiffCacheStats ¶
DiffCacheStats holds statistics about diff cache usage.
func (DiffCacheStats) HitRate ¶
func (s DiffCacheStats) HitRate() float64
HitRate returns the cache hit rate as a fraction.
type DiffPipeline ¶
type DiffPipeline struct {
PoolWorkerChan chan<- gitlib.WorkerRequest
BufferSize int
DiffCache *DiffCache
}
DiffPipeline processes blob data to compute file diffs.
func NewDiffPipeline ¶
func NewDiffPipeline(workerChan chan<- gitlib.WorkerRequest, bufferSize int) *DiffPipeline
NewDiffPipeline creates a new diff pipeline.
func NewDiffPipelineWithCache ¶
func NewDiffPipelineWithCache(workerChan chan<- gitlib.WorkerRequest, bufferSize int, cache *DiffCache) *DiffPipeline
NewDiffPipelineWithCache creates a new diff pipeline with an optional diff cache.
func (*DiffPipeline) Process ¶
func (p *DiffPipeline) Process(ctx context.Context, blobs <-chan BlobData) <-chan CommitData
Process receives blob data and outputs commit data with computed diffs.
type GlobalBlobCache ¶
type GlobalBlobCache struct {
// contains filtered or unexported fields
}
GlobalBlobCache provides a cross-commit LRU cache for blob data. It tracks memory usage and evicts least recently used entries when the limit is exceeded.
func NewGlobalBlobCache ¶
func NewGlobalBlobCache(maxSize int64) *GlobalBlobCache
NewGlobalBlobCache creates a new global blob cache with the specified maximum size in bytes.
func (*GlobalBlobCache) CacheHits ¶
func (c *GlobalBlobCache) CacheHits() int64
CacheHits returns the total cache hit count (atomic, lock-free).
func (*GlobalBlobCache) CacheMisses ¶
func (c *GlobalBlobCache) CacheMisses() int64
CacheMisses returns the total cache miss count (atomic, lock-free).
func (*GlobalBlobCache) Clear ¶
func (c *GlobalBlobCache) Clear()
Clear removes all entries from the cache.
func (*GlobalBlobCache) Get ¶
func (c *GlobalBlobCache) Get(hash gitlib.Hash) *gitlib.CachedBlob
Get retrieves a blob from the cache. Returns nil if not found.
func (*GlobalBlobCache) GetMulti ¶
func (c *GlobalBlobCache) GetMulti(hashes []gitlib.Hash) (found map[gitlib.Hash]*gitlib.CachedBlob, missing []gitlib.Hash)
GetMulti retrieves multiple blobs from the cache. Returns a map of found blobs and a slice of missing hashes.
func (*GlobalBlobCache) Put ¶
func (c *GlobalBlobCache) Put(hash gitlib.Hash, blob *gitlib.CachedBlob)
Put adds a blob to the cache. If the cache exceeds maxSize, LRU entries are evicted.
func (*GlobalBlobCache) PutMulti ¶
func (c *GlobalBlobCache) PutMulti(blobs map[gitlib.Hash]*gitlib.CachedBlob)
PutMulti adds multiple blobs to the cache.
func (*GlobalBlobCache) Stats ¶
func (c *GlobalBlobCache) Stats() CacheStats
Stats returns cache statistics.
type PipelineStats ¶
type PipelineStats struct {
BlobDuration time.Duration
DiffDuration time.Duration
UASTDuration time.Duration
BlobCacheHits int64
BlobCacheMisses int64
DiffCacheHits int64
DiffCacheMisses int64
}
PipelineStats holds cumulative pipeline metrics for a single Coordinator run. Populated during Process(); valid after the returned channel is fully drained.
func (*PipelineStats) Add ¶
func (s *PipelineStats) Add(other PipelineStats)
Add accumulates another PipelineStats into this one (cross-chunk aggregation).
type Runner ¶
type Runner struct {
Repo *gitlib.Repository
RepoPath string
Analyzers []analyze.HistoryAnalyzer
Config CoordinatorConfig
// Tracer is the OTel tracer for creating pipeline spans.
// When nil, falls back to otel.Tracer("codefang").
Tracer trace.Tracer
// CoreCount is the number of leading analyzers in the Analyzers slice that are
// core (plumbing) analyzers. These run sequentially. Analyzers after CoreCount
// are leaf analyzers that can be parallelized via Fork/Merge.
// Set to 0 to disable parallel leaf consumption.
CoreCount int
// contains filtered or unexported fields
}
Runner orchestrates multiple HistoryAnalyzers over a commit sequence. It always uses the Coordinator pipeline (batch blob load + batch diff in C).
func NewRunner ¶
func NewRunner(repo *gitlib.Repository, repoPath string, analyzers ...analyze.HistoryAnalyzer) *Runner
NewRunner creates a new Runner for the given repository and analyzers. Uses DefaultCoordinatorConfig(). Use NewRunnerWithConfig for custom configuration.
func NewRunnerWithConfig ¶
func NewRunnerWithConfig( repo *gitlib.Repository, repoPath string, config CoordinatorConfig, analyzers ...analyze.HistoryAnalyzer, ) *Runner
NewRunnerWithConfig creates a new Runner with custom coordinator configuration.
func (*Runner) Initialize ¶
Initialize initializes all analyzers. Call once before processing chunks.
func (*Runner) ProcessChunk ¶
func (runner *Runner) ProcessChunk(ctx context.Context, commits []*gitlib.Commit, indexOffset, chunkIndex int) (PipelineStats, error)
ProcessChunk processes a chunk of commits without Initialize/Finalize. Use this for streaming mode where Initialize is called once at start and Finalize once at end. The indexOffset is added to the commit index to maintain correct ordering across chunks. chunkIndex is the zero-based chunk number used for span naming.
func (*Runner) ProcessChunkFromData ¶
func (runner *Runner) ProcessChunkFromData(ctx context.Context, data []CommitData, indexOffset, chunkIndex int) (PipelineStats, error)
ProcessChunkFromData consumes pre-fetched CommitData through analyzers, bypassing Coordinator creation. Used by double-buffered chunk pipelining where the pipeline has already run and collected data. Returns zero PipelineStats since the real stats come from the prefetch Coordinator.
type StreamingConfig ¶
type StreamingConfig struct {
MemBudget int64
Checkpoint CheckpointParams
RepoPath string
AnalyzerNames []string
// Logger is the structured logger for streaming operations.
// When nil, a discard logger is used.
Logger *slog.Logger
// DebugTrace enables 100% trace sampling for debugging.
DebugTrace bool
// AnalysisMetrics records analysis-specific OTel metrics (commits, chunks,
// cache stats). Nil-safe: when nil, no metrics are recorded.
AnalysisMetrics *observability.AnalysisMetrics
}
StreamingConfig holds configuration for streaming pipeline execution.
type UASTPipeline ¶
UASTPipeline pre-computes UAST changes for each commit in the pipeline, enabling cross-commit parallelism. It sits between DiffPipeline and the serial analyzer consumption loop.
func NewUASTPipeline ¶
func NewUASTPipeline(parser *uast.Parser, workers, bufferSize int) *UASTPipeline
NewUASTPipeline creates a new UAST pipeline stage.
func (*UASTPipeline) Process ¶
func (p *UASTPipeline) Process(ctx context.Context, diffs <-chan CommitData) <-chan CommitData
Process receives commit data with blobs and diffs, and adds pre-computed UAST changes. Multiple commits are processed concurrently by worker goroutines. Output order matches input order via a slot-based approach.
type Watchdog ¶
type Watchdog struct {
// contains filtered or unexported fields
}
Watchdog monitors worker pool health and recreates stalled workers. It wraps the pool worker channel with timeout-aware dispatch and provides exponential backoff retry on stall detection.
func NewWatchdog ¶
func NewWatchdog(cfg WatchdogConfig) *Watchdog
NewWatchdog creates a Watchdog that monitors the worker pool. Returns nil if timeout is zero (disabled).
func (*Watchdog) StalledCount ¶
StalledCount returns the total number of stall events observed.
func (*Watchdog) WaitForDiffResponse ¶
func (wd *Watchdog) WaitForDiffResponse(ch <-chan gitlib.DiffBatchResponse) (gitlib.DiffBatchResponse, bool)
WaitForDiffResponse waits for a diff response with timeout.
func (*Watchdog) WaitForResponse ¶
func (wd *Watchdog) WaitForResponse(ch <-chan gitlib.BlobBatchResponse) (gitlib.BlobBatchResponse, bool)
WaitForResponse waits for a response on the given channel with timeout. Returns true if the response was received, false if the worker stalled. On stall, it recreates one pool worker and returns false so the caller can retry.
func (*Watchdog) WaitForTreeDiffResponse ¶
func (wd *Watchdog) WaitForTreeDiffResponse(ch <-chan gitlib.TreeDiffResponse) (gitlib.TreeDiffResponse, bool)
WaitForTreeDiffResponse waits for a tree diff response with timeout.
type WatchdogConfig ¶
type WatchdogConfig struct {
Span trace.Span
RepoPath string
Config CoordinatorConfig
PoolRepos []*gitlib.Repository
PoolWorkers []*gitlib.Worker
PoolRequests chan gitlib.WorkerRequest
Logger *slog.Logger
}
WatchdogConfig holds parameters for creating a Watchdog.