Skip to content

Commit e6e5587

Browse files
kylezeroshade
authored and committed
add option to read custom metadata from record batch message in ipc reader
1 parent 4acb3d1 commit e6e5587

File tree

2 files changed

+31
-2
lines changed

2 files changed

+31
-2
lines changed

arrow/ipc/ipc.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ type config struct {
7272
noAutoSchema bool
7373
emitDictDeltas bool
7474
minSpaceSavings float64
75+
readCustomMetadata bool
7576
}
7677

7778
func newConfig(opts ...Option) *config {
@@ -93,6 +94,13 @@ func newConfig(opts ...Option) *config {
9394
// and streams.
9495
type Option func(*config)
9596

97+
// WithCustomRecordBatchMetadata returns an Option that sets whether a reader
// should parse the custom metadata carried by each record batch message.
// When cm is true, the Reader stores the parsed metadata for the current
// batch and exposes it through Reader.RecordBatchCustomMetadata.
98+
func WithCustomRecordBatchMetadata(cm bool) Option {
99+
return func(cfg *config) {
100+
cfg.readCustomMetadata = cm
101+
}
102+
}
103+
96104
// WithFooterOffset specifies the Arrow footer position in bytes.
97105
func WithFooterOffset(offset int64) Option {
98106
return func(cfg *config) {

arrow/ipc/reader.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type Reader struct {
4141

4242
refCount atomic.Int64
4343
rec arrow.RecordBatch
44+
meta *arrow.Metadata
4445
err error
4546

4647
// types dictTypeMap
@@ -50,6 +51,7 @@ type Reader struct {
5051
swapEndianness bool
5152
ensureNativeEndian bool
5253
expectedSchema *arrow.Schema
54+
readCustomMetadata bool
5355

5456
mem memory.Allocator
5557
}
@@ -76,6 +78,7 @@ func NewReaderFromMessageReader(r MessageReader, opts ...Option) (reader *Reader
7678
mem: cfg.alloc,
7779
ensureNativeEndian: cfg.ensureNativeEndian,
7880
expectedSchema: cfg.schema,
81+
readCustomMetadata: cfg.readCustomMetadata,
7982
}
8083
rr.refCount.Add(1)
8184

@@ -194,6 +197,9 @@ func (r *Reader) Next() bool {
194197
r.rec.Release()
195198
r.rec = nil
196199
}
200+
if r.meta != nil {
201+
r.meta = nil
202+
}
197203

198204
if r.err != nil || r.done {
199205
return false
@@ -275,7 +281,14 @@ func (r *Reader) next() bool {
275281
r.err = fmt.Errorf("arrow/ipc: invalid message type (got=%v, want=%v", got, want)
276282
return false
277283
}
278-
284+
if r.readCustomMetadata {
285+
rootMsg := flatbuf.GetRootAsMessage(msg.meta.Bytes(), 0)
286+
meta, err := metadataFromFB(rootMsg)
287+
if err != nil {
288+
panic(err)
289+
}
290+
r.meta = &meta
291+
}
279292
r.rec = newRecordBatch(r.schema, &r.memo, msg.meta, msg.body, r.swapEndianness, r.mem)
280293
return true
281294
}
@@ -287,6 +300,12 @@ func (r *Reader) RecordBatch() arrow.RecordBatch {
287300
return r.rec
288301
}
289302

303+
// RecordBatchCustomMetadata returns the current record batch custom metadata from the
304+
// underlying stream.
// The result is nil when the Reader was not configured to read custom metadata
// (see WithCustomRecordBatchMetadata), and it is reset to nil at the start of
// every call to Next or Read before the following message is decoded.
// The returned error is always nil in this implementation.
305+
func (r *Reader) RecordBatchCustomMetadata() (*arrow.Metadata, error) {
306+
return r.meta, nil
307+
}
308+
290309
// Record returns the current record that has been extracted from the
291310
// underlying stream.
292311
// It is valid until the next call to Next.
@@ -303,7 +322,9 @@ func (r *Reader) Read() (arrow.RecordBatch, error) {
303322
r.rec.Release()
304323
r.rec = nil
305324
}
306-
325+
if r.meta != nil {
326+
r.meta = nil
327+
}
307328
if !r.next() {
308329
if r.done && r.err == nil {
309330
return nil, io.EOF

0 commit comments

Comments
 (0)