Documentation
¶
Index ¶
- Constants
- Variables
- func NewFakeTransport(failCnt int, rateLimitCnt int) *fakeTransport
- func Unwrap(err error) []error
- type BoolFunc
- type ErrFieldTypeMismatchType
- type Handler
- func NewEventTrackHandler(client *api.Events, logger logger.Logger) Handler
- func NewListSubscribeHandler(client *api.Lists, logger logger.Logger) Handler
- func NewListUnSubscribeBatchHandler(client *api.Lists, logger logger.Logger) Handler
- func NewSubscriptionUpdateHandler(client *api.Users, logger logger.Logger) Handler
- func NewUserEmailUpdateHandler(client *api.Users, logger logger.Logger) Handler
- func NewUserUpdateHandler(client *api.Users, logger logger.Logger) Handler
- type Message
- type ProcessBatchResponse
- type Processor
- type ProcessorConfig
- type Response
- type StatusCannotRetry
- type StatusPartialSuccess
- type StatusRetryBatch
- type StatusRetryIndividual
- type StatusSuccess
- type TestBatchHandler
Constants ¶
const ( ErrStrBatchError = "Batch error" ErrStrDisallowedEventName = "Disallowed Event Name" ErrStrConflictEmails = "Email conflicts" ErrStrConflictUserIds = "UserId conflicts" ErrStrForgottenEmails = "Email Forgotten" ErrStrForgottenUserIds = "UserId Forgotten" ErrStrInvalidDataEmails = "Invalid Data" ErrStrInvalidDataUserIds = "Invalid Data" ErrStrInvalidEmails = "Malformed Email" ErrStrInvalidUserIds = "Malformed UserId" ErrStrNotFoundEmails = "Email not found" ErrStrNotFoundUserIds = "UserId not found" ErrStrFieldTypeMismatch = iterable_errors.ITERABLE_FieldTypeMismatchErrStr ErrStrValidEmailFailures = "Internal Error with Email" ErrStrValidUserIdFailures = "Internal Error with UserId" ErrStrInvalidDataType = "Invalid data type in batch request" ErrStrInvalidListId = "List ID not valid for Iterable project" ErrStrInvalidBatchProcessorState = "InvalidBatchProcessorStateErr" ErrStrProcessOneNotAllowed = "ProcessOne is not allowed" ErrStrProcessBatchNotAllowed = "ProcessBatch is not allowed" )
const ( SuccessfulTestData = "success" IndividualSuccessTestData = "individual_success" RetryTestData = "retry" FailTestData = "fail" )
Variables ¶
var ( ErrApiError = &iterable_errors.ApiError{} ErrBatchError = errors.New(ErrStrBatchError) ErrInvalidListId = errors.New(ErrStrInvalidListId) ErrConflictEmails = errors.New(ErrStrConflictEmails) ErrConflictUserIds = errors.New(ErrStrConflictUserIds) ErrForgottenEmails = errors.New(ErrStrForgottenEmails) ErrForgottenUserIds = errors.New(ErrStrForgottenUserIds) ErrInvalidDataEmails = errors.New(ErrStrInvalidDataEmails) ErrInvalidDataUserIds = errors.New(ErrStrInvalidDataUserIds) ErrInvalidEmails = errors.New(ErrStrInvalidEmails) ErrInvalidUserIds = errors.New(ErrStrInvalidUserIds) ErrNotFoundEmails = errors.New(ErrStrNotFoundEmails) ErrNotFoundUserIds = errors.New(ErrStrNotFoundUserIds) ErrFieldTypeMismatch = &ErrFieldTypeMismatchType{} ErrInvalidDataType = errors.New(ErrStrInvalidDataType) ErrValidEmailFailures = errors.New(ErrStrValidEmailFailures) ErrValidUserIdFailures = errors.New(ErrStrValidUserIdFailures) ErrDisallowedEventName = errors.New(ErrStrDisallowedEventName) ErrInvalidBatchProcessorState = errors.New(ErrStrInvalidBatchProcessorState) ErrProcessOneNotAllowed = errors.New(ErrStrProcessOneNotAllowed) ErrProcessBatchNotAllowed = errors.New(ErrStrProcessBatchNotAllowed) // ErrClientValidationApiErr represents a client-side validation error that occurs before sending the request. // This error indicates that the request data failed validation checks on the client side. // It uses "fake" HttpStatusCode=400 to emphasize that retrying won't fix the issue. ErrClientValidationApiErr = &iterable_errors.ApiError{ Stage: iterable_errors.STAGE_BEFORE_REQUEST, Type: "data_validation_error", HttpStatusCode: 400, IterableCode: "iterable_client_data_validation_error", } // ErrServerValidationApiErr represents a server-side validation error that occurs after sending the request. // This error indicates that the request data failed validation checks on the server side with a 400 status code. // "Real" HttpStatusCode might be 200, but the error uses "fake" HttpStatusCode=400 // to emphasize that retrying won't fix the issue. ErrServerValidationApiErr = &iterable_errors.ApiError{ Stage: iterable_errors.STAGE_AFTER_REQUEST, Type: "data_validation_error", HttpStatusCode: 400, IterableCode: "iterable_server_data_validation_error", } // ErrClientMustRetryBatchApiErr indicates that the operation must be retried as a Batch request. // Usually, this happens when sending single (non-batch) requests has some negative consequences, // and we want to force the client of the library to send the same message as part of a Batch operation. ErrClientMustRetryBatchApiErr = &iterable_errors.ApiError{ Stage: iterable_errors.STAGE_BEFORE_REQUEST, Type: "must_retry", HttpStatusCode: 500, IterableCode: "iterable_client_must_retry_batch", } // ErrClientMustRetryOneApiErr indicates that the operation must be retried as an individual // request. Usually, this happens when sending a Batch request has some negative consequences, // and we want to force the client of the library to send the same request individually. ErrClientMustRetryOneApiErr = &iterable_errors.ApiError{ Stage: iterable_errors.STAGE_BEFORE_REQUEST, Type: "must_retry", HttpStatusCode: 500, IterableCode: "iterable_client_must_retry_one", } )
Functions ¶
func NewFakeTransport ¶
Types ¶
type ErrFieldTypeMismatchType ¶ added in v0.2.0
type ErrFieldTypeMismatchType struct {
// contains filtered or unexported fields
}
func NewErrFieldTypeMismatch ¶ added in v0.2.0
func NewErrFieldTypeMismatch(fields types.MismatchedFieldsParams) *ErrFieldTypeMismatchType
func (*ErrFieldTypeMismatchType) Error ¶ added in v0.2.0
func (e *ErrFieldTypeMismatchType) Error() string
func (*ErrFieldTypeMismatchType) Is ¶ added in v0.2.0
func (e *ErrFieldTypeMismatchType) Is(other error) bool
Is method is required by errors.Is() to properly distinguish between different types -vs- same pointer to the same type. Without it, errors.Is(err, ErrFieldTypeMismatch) returns false: ok := errors.Is(errors.Join(ErrFieldTypeMismatch), ErrFieldTypeMismatch) ^ would be false
func (*ErrFieldTypeMismatchType) MismatchedFields ¶ added in v0.2.0
func (e *ErrFieldTypeMismatchType) MismatchedFields() types.MismatchedFieldsParams
type Handler ¶
type Handler interface {
// ProcessBatch processes multiple messages in a single batch operation
// for efficiency. Returns a slice of Response objects (one per input message),
// an error if the entire batch failed, and a boolean indicating
// if the batch operation can be retried.
ProcessBatch(batch []Message) (ProcessBatchResponse, error)
// ProcessOne processes a single message individually, typically used
// for retry scenarios when batch processing fails or when individual messages
// need special handling.
// Returns a single Response object with the result of processing the message.
ProcessOne(message Message) Response
}
Handler defines the contract for implementing batch processing logic for specific API operations. Implementations should handle both batch processing (for efficiency) and individual processing (for retry scenarios). The handler is responsible for making actual API calls and converting results into Response objects.
Usage Example:
type MyHandler struct {
apiClient *api.Events
}
func (h *MyHandler) ProcessBatch(messages []Message) (ProcessBatchResponse, error) {
// Convert messages to types.EventTrackBulkRequest
results, err := h.apiClient.TrackBulk(batch)
if err != nil {
return batch.StatusRetryIndividual{results}, err // error, can retry
}
// Convert results to Response objects
return batch.StatusSuccess{results}, nil
}
func (h *MyHandler) ProcessOne(message Message) Response {
// Convert message to types.EventTrackRequest
result, err := h.apiClient.Track(request)
return Response{Data: result, OriginalReq: message, Error: err, Retry: err != nil}
}
func NewEventTrackHandler ¶
func NewListSubscribeHandler ¶
type Message ¶
type Message struct {
// Data contains the actual request payload to be sent to Iterable
// (e.g., user data, event data, etc.)
Data any
// MetaData holds optional contextual information that
// can be used for tracking, correlation, or response handling
MetaData any
}
Message represents a generic request to be processed in a batch operation. It serves as a wrapper that carries both the actual data to be sent to Iterable and optional metadata that can be used for tracking or response correlation.
Usage Example:
message := batch.Message{
Data: userUpdateRequest, // The actual API request data
MetaData: "user-123", // Optional tracking identifier
}
processor.Add(message)
type ProcessBatchResponse ¶ added in v0.2.0
type ProcessBatchResponse interface {
// contains filtered or unexported methods
}
ProcessBatchResponse is an interface that defines the contract for different batch processing outcomes. It provides a unified way to handle various response scenarios (success, retry, partial success, etc.) while encapsulating the actual response data. Each implementation represents a specific outcome state and contains the appropriate response data and any relevant error information. The interface method response() returns the slice of Response objects for the batch operation.
type Processor ¶
type Processor interface {
// Start begins the batch processing loop. The processor
// will start listening for messages and automatically flush batches
// when FlushQueueSize is reached or FlushInterval elapses.
// This method is idempotent - calling Start() multiple times
// has no effect if already running.
Start()
// Stop gracefully shuts down the processor. It closes the message channel,
// waits for all in-flight batches to complete (both sync and async),
// and prepares for potential restart.
// This method is idempotent - calling Stop() multiple times
// has no effect if already stopped.
Stop()
// Add queues a message for batch processing. Messages are accumulated
// until FlushQueueSize is reached or FlushInterval elapses,
// then processed as a batch by the configured Handler.
// This method is thread-safe and will block if the internal buffer is full.
Add(req Message)
}
Processor provides a batching mechanism for processing messages efficiently. It accumulates individual messages and processes them in batches based on size or time thresholds, with support for retries, async processing, and response handling.
Usage Example:
// Create a processor with a handler and configuration
processor := batch.NewProcessor(
myHandler, // Handler that implements ProcessBatch and ProcessOne
responseChan, // Optional channel to receive processing results
batch.ProcessorConfig{
FlushQueueSize: 100, // Process when 100 messages accumulate
FlushInterval: 5*time.Second, // Or process every 5 seconds
MaxRetries: 3, // Retry failed batches up to 3 times
Async: batch.Async, // Process batches asynchronously
},
)
// Start the processor (begins listening for messages)
processor.Start()
// Add messages for batch processing
processor.Add(message1)
processor.Add(message2)
// ... messages will be automatically batched and processed
// Stop the processor (waits for in-flight batches to complete)
processor.Stop()
func NewProcessor ¶
func NewProcessor( handler Handler, respChan chan<- Response, config ProcessorConfig, ) Processor
type ProcessorConfig ¶
type ProcessorConfig struct {
// FlushQueueSize defines the maximum number of messages
// to accumulate before triggering a batch flush
// default: 100
FlushQueueSize int
// FlushInterval specifies the maximum time to wait
// before flushing a batch, even if FlushQueueSize hasn't been reached
// default: 5 seconds
FlushInterval time.Duration
// MaxRetries sets the maximum number of retry attempts
// for failed batch operations
// default: 1
MaxRetries int
// Retry configures the retry strategy (exponential backoff, delays, etc.)
// for failed requests
// default: retry.NewExponentialRetry
Retry retry.Retry
// SendIndividual is a function that determines whether the processor
// needs to send messages one-by-one (individually) if batch request succeeds,
// but some messages in the batch have errors.
// This is useful when there's 1 "bad" message in the batch which fails
// the entire batch.
// default: true
SendIndividual BoolFunc
// NumOfIndividualGoroutines specifies the number of concurrent goroutines used
// when retrying failed messages individually after a partial batch failure.
// Only applies when SendIndividual is true and some messages in a batch fail.
// Values must be between 1 and 100.
// Higher values increase parallelism but also resource usage.
// default: 1 (processes retries sequentially)
NumOfIndividualGoroutines int
// MaxBufferSize determines the buffer size of the internal request channel
// to prevent blocking on Add() calls
// default: 2000
MaxBufferSize int
// Async is a function that determines whether batch processing
// should run asynchronously or synchronously
// default: true
Async BoolFunc
// MaxAsyncRequests limits the number of concurrent goroutines
// when processing batches asynchronously.
// default: 50
MaxAsyncRequests int
// Logger provides logging functionality for debugging
// and monitoring batch processing operations
// default: logger.Noop
Logger logger.Logger
// contains filtered or unexported fields
}
type Response ¶
type Response struct {
// Data contains the successful response data from the API operation
// or nil if error occurred
Data any
// OriginalReq holds a reference to the original Message that was processed
OriginalReq Message
// Error contains any error that occurred during processing
// or nil if successful
Error error
// Retry indicates whether this failed request should be retried
// (only relevant when Error is not nil)
Retry bool
}
Response represents the result of processing a batch request, containing both successful results and error information. It maintains a reference to the original request for correlation and includes retry information for error handling.
Usage Example:
// Successful response
response := batch.Response{
Data: apiResponseData,
OriginalReq: originalMessage,
Error: nil,
Retry: false,
}
// Failed response that should be retried
response := batch.Response{
Data: nil,
OriginalReq: originalMessage,
Error: networkError,
Retry: true,
}
type StatusCannotRetry ¶ added in v0.2.0
type StatusCannotRetry struct {
Response []Response
}
StatusCannotRetry represents a permanent failure state where retrying would not help. This occurs in cases like authentication failures, malformed requests or batch requests where All non-batch messages failed client validation.
type StatusPartialSuccess ¶ added in v0.2.0
type StatusPartialSuccess struct {
Response []Response
}
StatusPartialSuccess indicates that some messages in the batch were processed successfully while others failed. The Response slice contains a mix of successful and failed results, allowing for granular handling of the partial success scenario.
type StatusRetryBatch ¶ added in v0.2.0
StatusRetryBatch indicates that the entire batch operation should be retried as a single unit. This typically occurs due to temporary issues like network problems or rate limiting.
type StatusRetryIndividual ¶ added in v0.2.0
type StatusRetryIndividual struct {
Response []Response
}
StatusRetryIndividual indicates that the batch operation failed in a way that requires retrying each message individually. This typically occurs when the batch operation cannot determine the success/failure status of individual messages. For example - when Server returns 413:ContentTooLarge, we must retry sending the messages individually.
type StatusSuccess ¶ added in v0.2.0
type StatusSuccess struct {
Response []Response
}
StatusSuccess represents a completely successful batch operation where all messages
type TestBatchHandler ¶
type TestBatchHandler struct{}
func NewTestBatchHandler ¶
func NewTestBatchHandler() *TestBatchHandler
func (*TestBatchHandler) ProcessBatch ¶
func (h *TestBatchHandler) ProcessBatch(batch []Message) (ProcessBatchResponse, error)
func (*TestBatchHandler) ProcessOne ¶
func (h *TestBatchHandler) ProcessOne(message Message) Response