Skip to content

API Reference

Full Go documentation is available on pkg.go.dev.

Registration Functions

These generic functions register RPC methods on a Server:

Function Description
Unary[P, R](s, name, handler) Register a unary method returning a result
UnaryVoid[P](s, name, handler) Register a unary method with no result
Producer[P](s, name, outputSchema, handler) Register a producer stream
ProducerWithHeader[P](s, name, outputSchema, headerSchema, handler) Register a producer with a header
Exchange[P](s, name, outputSchema, inputSchema, handler) Register an exchange stream
ExchangeWithHeader[P](s, name, outputSchema, inputSchema, headerSchema, handler) Register an exchange with a header

Handler signatures:

// Unary
func(ctx context.Context, callCtx *CallContext, params P) (R, error)

// UnaryVoid
func(ctx context.Context, callCtx *CallContext, params P) error

// Producer / Exchange
func(ctx context.Context, callCtx *CallContext, params P) (*StreamResult, error)

Server

func NewServer() *Server
Method Description
SetServerID(id string) Set the server identifier included in response metadata
RunStdio() Run the server loop on stdin/stdout
Serve(r io.Reader, w io.Writer) Run the server on any reader/writer pair
ServeWithContext(ctx context.Context, r io.Reader, w io.Writer) Run the server with a context for cancellation

HttpServer

func NewHttpServer(server *Server) *HttpServer
func NewHttpServerWithKey(server *Server, signingKey []byte) *HttpServer
func RegisterStateType(v interface{})
Method Description
SetTokenTTL(d time.Duration) Set state token maximum age
ServeHTTP(w http.ResponseWriter, r *http.Request) Implements http.Handler

Stream Interfaces

ProducerState

type ProducerState interface {
    Produce(ctx context.Context, out *OutputCollector, callCtx *CallContext) error
}

ExchangeState

type ExchangeState interface {
    Exchange(ctx context.Context, input arrow.RecordBatch, out *OutputCollector, callCtx *CallContext) error
}

StreamResult

Returned by producer/exchange init handlers:

type StreamResult struct {
    OutputSchema *arrow.Schema
    State        interface{}      // ProducerState or ExchangeState
    InputSchema  *arrow.Schema    // exchange only; nil for producers
    Header       ArrowSerializable // optional header sent before data
}

OutputCollector

Method Description
Emit(batch arrow.RecordBatch) error Emit a pre-built RecordBatch
EmitArrays(arrays []arrow.Array, numRows int64) error Build and emit a batch from arrays
EmitMap(data map[string][]interface{}) error Build and emit a batch from column maps
Finish() error Signal end-of-stream (producer only)
Finished() bool Whether Finish() has been called
ClientLog(level LogLevel, message string, extras ...KV) error Emit a log batch to the client

ArrowSerializable

type ArrowSerializable interface {
    ArrowSchema() *arrow.Schema
}

CallContext

type CallContext struct {
    Ctx       context.Context
    RequestID string
    ServerID  string
    Method    string
    LogLevel  LogLevel
}
Method Description
ClientLog(level LogLevel, msg string, extras ...KV) Record a log message for the client

RpcError

type RpcError struct {
    Type      string
    Message   string
    Traceback string
    RequestID string
}
Method Description
Error() string Returns error string
Is(target error) bool Supports errors.Is

Sentinel: ErrRpc — use with errors.Is(err, vgirpc.ErrRpc)

Request

type Request struct {
    Method    string
    Version   string
    RequestID string
    LogLevel  string
    Batch     arrow.RecordBatch
    Metadata  map[string]string
}

Logging

LogLevel

type LogLevel string

const (
    LogException LogLevel = "exception"
    LogError     LogLevel = "error"
    LogWarn      LogLevel = "warn"
    LogInfo      LogLevel = "info"
    LogDebug     LogLevel = "debug"
    LogTrace     LogLevel = "trace"
)

KV

type KV struct {
    Key   string
    Value string
}

Method Types

type MethodType int

const (
    MethodUnary    MethodType = iota
    MethodProducer
    MethodExchange
)

Batch Kinds

type BatchKind int

const (
    BatchData            BatchKind = iota
    BatchLog
    BatchError
    BatchExternalPointer
    BatchShmPointer
    BatchStateToken
)

Metadata Keys

Constant Value
MetaMethod vgi_rpc.method
MetaRequestVersion vgi_rpc.version
MetaRequestID vgi_rpc.request_id
MetaLogLevel vgi_rpc.log_level
MetaLogMessage vgi_rpc.log_message
MetaLogExtra vgi_rpc.log_extra
MetaServerID vgi_rpc.server_id
MetaStreamState vgi_rpc.stream_state
MetaProtocolName vgi_rpc.protocol_name
MetaDescribeVersion vgi_rpc.describe_version
ProtocolVersion "1"
DescribeVersion "2"

Wire Functions

Function Description
ReadRequest(r io.Reader) (*Request, error) Read one IPC stream and parse the request
WriteUnaryResponse(w, schema, logs, result, serverID, requestID) Write a unary response
WriteErrorResponse(w, schema, err, serverID, requestID) Write an error response
WriteVoidResponse(w, logs, serverID, requestID) Write a void response