drpc

package
v0.0.0-...-92aad25 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 27, 2026 License: BSD-2-Clause-Patent Imports: 17 Imported by: 20

README

dRPC

dRPC is a means of communication between processes local to the same physical system, via a Unix Domain Socket. At any given time a process may act as a client, a server, or both, though each listening dRPC server needs its own Unix Domain Socket.

The server will fail to create the socket if something already exists at that location in the filesystem, even if it is an older incarnation of the socket. Optionally, your application may wish to unlink that filesystem location before creating the socket.

dRPC calls are defined by module and method identifiers. The dRPC module can be thought of as a package of related functions. The dRPC method indicates a specific function to be executed by the server. If the method requires an input, it should be marshalled in the body of the dRPC call. The server will respond with a dRPC response structure, which may include a method-specific response in the body.

The DAOS dRPC implementation is dependent on Protocol Buffers to define the structures passed over the dRPC channel. Any structure to be sent via dRPC as part of a call or response must be defined in a .proto file.

Go API

In Go, the drpc package includes both client and server functionality, which is outlined below. For documentation of the C API, see here.

The dRPC call and response are represented by the Protobuf-generated drpc.Call and drpc.Response structures.

Go Client

The dRPC client is represented by the drpc.ClientConnection object.

Basic Client Workflow
  1. Create a new client connection with the path to the dRPC server's Unix Domain Socket:
    conn := drpc.NewClientConnection("/var/run/my_socket.sock")
    
  2. Connect to the dRPC server:
    err := conn.Connect()
    
  3. Create your drpc.Call and send it to the server:
    call := drpc.Call{}
    // Set up the Call with module, method, and body
    resp, err := drpc.SendMsg(call)
    
    An error indicates that the drpc.Call couldn't be sent, or an invalid drpc.Response was received. If there is no error returned, the content of the drpc.Response should still be checked for errors reported by the server.
  4. Send as many calls as desired.
  5. Close the connection when finished:
    conn.Close()
    
Go Server

The dRPC server is represented by the drpc.DomainSocketServer object.

Individual dRPC modules must be registered with the server in order to handle incoming dRPC calls for that module. To create a dRPC module, create an object that implements the drpc.Module interface. The module ID must be unique.

Basic Server Workflow
  1. Create the new DomainSocketServer with the server's Unix Domain Socket (file permissions 0600):
    drpcServer, err := drpc.NewDomainSocketServer(log, "/var/run/my_socket.sock", 0600)
    
  2. Register the dRPC modules that the server needs to handle:
    drpcServer.RegisterRPCModule(&MyExampleModule{})
    drpcServer.RegisterRPCModule(&AnotherExampleModule{})
    
  3. Start the server to kick off the Goroutine to start listening for and handling incoming connections:
    err = drpc.Start()
    
  4. When it is time to shut down the server, close down the listening Goroutine:
    drpcServer.Shutdown()
    

Documentation

Index

Constants

View Source
const (
	// MaxChunkSize is the maximum drpc message chunk size that may be sent.
	// Using a packetsocket over the unix domain socket means that we receive
	// a whole message at a time without knowing its size.
	MaxChunkSize = C.UNIXCOMM_MAXMSGSIZE
)

Variables

View Source
var (
	Status_name = map[int32]string{
		0: "SUCCESS",
		1: "SUBMITTED",
		2: "FAILURE",
		3: "UNKNOWN_MODULE",
		4: "UNKNOWN_METHOD",
		5: "FAILED_UNMARSHAL_CALL",
		6: "FAILED_UNMARSHAL_PAYLOAD",
		7: "FAILED_MARSHAL",
	}
	Status_value = map[string]int32{
		"SUCCESS":                  0,
		"SUBMITTED":                1,
		"FAILURE":                  2,
		"UNKNOWN_MODULE":           3,
		"UNKNOWN_METHOD":           4,
		"FAILED_UNMARSHAL_CALL":    5,
		"FAILED_UNMARSHAL_PAYLOAD": 6,
		"FAILED_MARSHAL":           7,
	}
)

Enum value maps for Status.

Functions

func FaultSocketFileInUse

func FaultSocketFileInUse(path string) *fault.Fault

FaultSocketFileInUse indicates that the dRPC socket file was already in use when we tried to start the dRPC server.

func Marshal

func Marshal(message proto.Message) ([]byte, error)

Marshal is a utility function that can be used by dRPC method handlers to marshal their method-specific response to be passed back to the ModuleService.

Types

type Call

type Call struct {
	Module   int32  `protobuf:"varint,1,opt,name=module,proto3" json:"module,omitempty"`     // ID of the module to process the call.
	Method   int32  `protobuf:"varint,2,opt,name=method,proto3" json:"method,omitempty"`     // ID of the method to be executed.
	Sequence int64  `protobuf:"varint,3,opt,name=sequence,proto3" json:"sequence,omitempty"` // Sequence number for matching a response to this call.
	Body     []byte `protobuf:"bytes,4,opt,name=body,proto3" json:"body,omitempty"`          // Input payload to be used by the method.
	// contains filtered or unexported fields
}

Call describes a function call to be executed over the dRPC channel.

func (*Call) Descriptor deprecated

func (*Call) Descriptor() ([]byte, []int)

Deprecated: Use Call.ProtoReflect.Descriptor instead.

func (*Call) GetBody

func (x *Call) GetBody() []byte

func (*Call) GetMethod

func (x *Call) GetMethod() int32

func (*Call) GetModule

func (x *Call) GetModule() int32

func (*Call) GetSequence

func (x *Call) GetSequence() int64

func (*Call) ProtoMessage

func (*Call) ProtoMessage()

func (*Call) ProtoReflect

func (x *Call) ProtoReflect() protoreflect.Message

func (*Call) Reset

func (x *Call) Reset()

func (*Call) String

func (x *Call) String() string

type ClientConnection

type ClientConnection struct {
	sync.Mutex
	// contains filtered or unexported fields
}

ClientConnection represents a client connection to a dRPC server

func (*ClientConnection) Close

func (c *ClientConnection) Close() error

Close shuts down the connection to the Unix Domain Socket

func (*ClientConnection) Connect

func (c *ClientConnection) Connect(ctx context.Context) error

Connect opens a connection to the internal Unix Domain Socket path

func (*ClientConnection) GetSocketPath

func (c *ClientConnection) GetSocketPath() string

GetSocketPath returns client dRPC socket file path.

func (*ClientConnection) IsConnected

func (c *ClientConnection) IsConnected() bool

IsConnected indicates whether the client connection is currently active

func (*ClientConnection) SendMsg

func (c *ClientConnection) SendMsg(ctx context.Context, msg *Call) (*Response, error)

SendMsg sends a message to the connected dRPC server, and returns the response to the caller.

type DomainSocketClient

type DomainSocketClient interface {
	sync.Locker
	IsConnected() bool
	Connect(context.Context) error
	Close() error
	SendMsg(context.Context, *Call) (*Response, error)
	GetSocketPath() string
}

DomainSocketClient is the interface to a dRPC client communicating over a Unix Domain Socket

func NewClientConnection

func NewClientConnection(socket string) DomainSocketClient

NewClientConnection creates a new dRPC client

type DomainSocketServer

type DomainSocketServer struct {
	// contains filtered or unexported fields
}

DomainSocketServer is the object that listens for incoming dRPC connections, maintains the connections for sessions, and manages the message processing.

func NewDomainSocketServer

func NewDomainSocketServer(log logging.Logger, sock string, sockMode os.FileMode) (*DomainSocketServer, error)

NewDomainSocketServer returns a new unstarted instance of a DomainSocketServer for the specified unix domain socket path.

func (*DomainSocketServer) Listen

func (d *DomainSocketServer) Listen(ctx context.Context)

Listen listens for incoming connections on the UNIX domain socket and creates individual sessions for each one.

func (*DomainSocketServer) RegisterRPCModule

func (d *DomainSocketServer) RegisterRPCModule(mod Module)

RegisterRPCModule takes a Module and associates it with the given DomainSocketServer so it can be used to process incoming dRPC calls.

func (*DomainSocketServer) Start

func (d *DomainSocketServer) Start(ctx context.Context) error

Start sets up the dRPC server socket and kicks off the listener goroutine.

type Failure

type Failure struct {
	// contains filtered or unexported fields
}

Failure represents a dRPC protocol failure.

func MarshalingFailure

func MarshalingFailure() Failure

MarshalingFailure creates a Failure for a failed attempt at marshaling a response.

func NewFailure

func NewFailure(status Status) Failure

NewFailure returns a Failure with the given status and a corresponding message.

func NewFailureWithMessage

func NewFailureWithMessage(message string) Failure

NewFailureWithMessage returns a generic failure with a custom message

func UnknownMethodFailure

func UnknownMethodFailure() Failure

UnknownMethodFailure creates a Failure for unknown dRPC method.

func UnknownModuleFailure

func UnknownModuleFailure() Failure

UnknownModuleFailure creates a Failure for unknown dRPC module.

func UnmarshalingCallFailure

func UnmarshalingCallFailure() Failure

UnmarshalingCallFailure creates a Failure for a failed attempt to unmarshal an incoming call.

func UnmarshalingPayloadFailure

func UnmarshalingPayloadFailure() Failure

UnmarshalingPayloadFailure creates a Failure for a failed attempt to unmarshal a call payload.

func (Failure) Error

func (e Failure) Error() string

Error provides a descriptive string associated with the failure.

func (Failure) GetStatus

func (e Failure) GetStatus() Status

GetStatus provides a dRPC status code associated with the failure.

type Method

type Method interface {
	ID() int32
	Module() int32
	String() string
}

Method is an interface that allows methods to describe themselves.

type Module

type Module interface {
	HandleCall(context.Context, *Session, Method, []byte) ([]byte, error)
	ID() int32
	GetMethod(int32) (Method, error)
	String() string
}

Module is an interface that describes a dRPC module's capabilities.

type ModuleService

type ModuleService struct {
	// contains filtered or unexported fields
}

ModuleService is the collection of Modules used by DomainSocketServer to be used to process messages.

func NewModuleService

func NewModuleService(log logging.Logger) *ModuleService

NewModuleService creates an initialized ModuleService instance

func (*ModuleService) GetModule

func (r *ModuleService) GetModule(id int32) (Module, bool)

GetModule fetches the module for the given ID. Returns true if found, false otherwise.

func (*ModuleService) ProcessMessage

func (r *ModuleService) ProcessMessage(ctx context.Context, session *Session, msgBytes []byte) ([]byte, error)

ProcessMessage is the main entry point into the ModuleService. It accepts a marshaled drpc.Call instance, processes it, calls the handler in the appropriate Module, and marshals the result into the body of a drpc.Response.

func (*ModuleService) RegisterModule

func (r *ModuleService) RegisterModule(mod Module)

RegisterModule will take in a type that implements the Module interface and ensure that no other module is already registered with that module identifier.

type Response

type Response struct {
	Sequence int64  `protobuf:"varint,1,opt,name=sequence,proto3" json:"sequence,omitempty"`              // Sequence number of the Call that triggered this response.
	Status   Status `protobuf:"varint,2,opt,name=status,proto3,enum=drpc.Status" json:"status,omitempty"` // High-level status of the RPC. If SUCCESS, method-specific status may be included in the body.
	Body     []byte `protobuf:"bytes,3,opt,name=body,proto3" json:"body,omitempty"`                       // Output payload produced by the method.
	// contains filtered or unexported fields
}

Response describes the result of a dRPC call.

func (*Response) Descriptor deprecated

func (*Response) Descriptor() ([]byte, []int)

Deprecated: Use Response.ProtoReflect.Descriptor instead.

func (*Response) GetBody

func (x *Response) GetBody() []byte

func (*Response) GetSequence

func (x *Response) GetSequence() int64

func (*Response) GetStatus

func (x *Response) GetStatus() Status

func (*Response) ProtoMessage

func (*Response) ProtoMessage()

func (*Response) ProtoReflect

func (x *Response) ProtoReflect() protoreflect.Message

func (*Response) Reset

func (x *Response) Reset()

func (*Response) String

func (x *Response) String() string

type Session

type Session struct {
	Conn net.Conn
	// contains filtered or unexported fields
}

Session represents an individual client connection to the Domain Socket Server.

func NewSession

func NewSession(conn net.Conn, svc *ModuleService) *Session

NewSession creates a new dRPC Session object

func (*Session) Close

func (s *Session) Close()

Close closes the session

func (*Session) ProcessIncomingMessage

func (s *Session) ProcessIncomingMessage(ctx context.Context) error

ProcessIncomingMessage listens for an incoming message on the session, calls its handler, and sends the response.

type Status

type Status int32

Status represents the valid values for a response status.

const (
	Status_SUCCESS                  Status = 0 // The method executed and provided a response payload, if needed. Otherwise, the method simply succeeded.
	Status_SUBMITTED                Status = 1 // The method has been queued for asynchronous execution.
	Status_FAILURE                  Status = 2 // The method has failed and did not provide a response payload.
	Status_UNKNOWN_MODULE           Status = 3 // The requested module does not exist.
	Status_UNKNOWN_METHOD           Status = 4 // The requested method does not exist.
	Status_FAILED_UNMARSHAL_CALL    Status = 5 // Could not unmarshal the incoming call.
	Status_FAILED_UNMARSHAL_PAYLOAD Status = 6 // Could not unmarshal the method-specific payload of the incoming call.
	Status_FAILED_MARSHAL           Status = 7 // Generated a response payload, but couldn't marshal it into the response.
)

func ErrorToStatus

func ErrorToStatus(err error) Status

ErrorToStatus translates an error to a dRPC Status. In practice it checks to see if it was a dRPC Failure error, and uses the Status if so. Otherwise it is assumed to be a generic failure.

func (Status) Descriptor

func (Status) Descriptor() protoreflect.EnumDescriptor

func (Status) Enum

func (x Status) Enum() *Status

func (Status) EnumDescriptor deprecated

func (Status) EnumDescriptor() ([]byte, []int)

Deprecated: Use Status.Descriptor instead.

func (Status) Number

func (x Status) Number() protoreflect.EnumNumber

func (Status) String

func (x Status) String() string

func (Status) Type

func (Status) Type() protoreflect.EnumType

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL