Streaming¶
vgi-rpc-go supports two streaming patterns: producer (server pushes data) and exchange (lockstep bidirectional). Both use the OutputCollector to emit batches and the StreamResult to carry state.
Producer Streams¶
A producer stream is a server-driven flow of output batches initiated by a single request. The handler returns a *StreamResult containing a ProducerState:
type counterState struct {
Count int
Current int
}
func (s *counterState) Produce(ctx context.Context, out *vgirpc.OutputCollector, callCtx *vgirpc.CallContext) error {
if s.Current >= s.Count {
return out.Finish() // signal end-of-stream
}
// ... build arrays ...
if err := out.EmitArrays(arrays, numRows); err != nil {
return err
}
s.Current++
return nil
}
The server calls Produce in a lockstep loop: read one tick from the client, call Produce, flush all output, repeat. Each call must either emit exactly one data batch or call out.Finish().
Register with vgirpc.Producer or vgirpc.ProducerWithHeader.
Exchange Streams¶
An exchange stream processes client-sent input batches one at a time. The handler returns a *StreamResult containing an ExchangeState:
type scaleState struct {
Factor float64
}
func (s *scaleState) Exchange(ctx context.Context, input arrow.RecordBatch, out *vgirpc.OutputCollector, callCtx *vgirpc.CallContext) error {
// ... process input, build output arrays ...
return out.EmitArrays(arrays, numRows)
}
Each Exchange call must emit exactly one data batch. It must NOT call out.Finish() — the client controls stream termination.
Register with vgirpc.Exchange or vgirpc.ExchangeWithHeader.
OutputCollector¶
The OutputCollector enforces the one-data-batch-per-call rule and supports:
| Method | Description |
|---|---|
Emit(batch) |
Emit a pre-built RecordBatch |
EmitArrays(arrays, numRows) |
Build a batch from arrays using the output schema |
EmitMap(data) |
Build a batch from column name/value pairs |
Finish() |
Signal end-of-stream (producer only) |
ClientLog(level, message, extras...) |
Emit a log batch |
StreamResult¶
The StreamResult returned by init handlers carries:
| Field | Description |
|---|---|
OutputSchema |
The Arrow schema for output batches |
State |
A ProducerState or ExchangeState |
InputSchema |
For exchange methods; nil for producers |
Header |
An optional ArrowSerializable value sent before data |
Stream Headers¶
Both producer and exchange methods can return a header — an ArrowSerializable value sent as a separate IPC stream before the main data stream. Use ProducerWithHeader or ExchangeWithHeader to register:
type MyHeader struct {
TotalExpected int64 `arrow:"total_expected"`
Description string `arrow:"description"`
}
func (h MyHeader) ArrowSchema() *arrow.Schema { /* ... */ }
Set the Header field on StreamResult in your init handler: