From a5e270dbb9c1233dd865374899aefeaa869f43b7 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 12 Jul 2022 23:03:51 -0700 Subject: [PATCH 01/10] chore: Fixes and Readme for python<>go interface Signed-off-by: Achal Shah --- go/README.md | 110 +++++++++++++++++++++++++++++++++ go/embedded/online_features.go | 3 + 2 files changed, 113 insertions(+) create mode 100644 go/README.md diff --git a/go/README.md b/go/README.md new file mode 100644 index 00000000000..3f2aed1e789 --- /dev/null +++ b/go/README.md @@ -0,0 +1,110 @@ +This dir contains the go logic that's executed by the `EmbeddedOnlineFeatureServer` from python. + +## Building and Linking +[gopy](https://github.com/go-python/gopy) generates (and compiles) a CPython extension module from a go package. That's what we're using here, as visible in [setup.py](../setup.py) + +Under the hood, gopy invoked `go build`, and then templates `cgo` stubs for the go module that exposes the public functions from the Go module as C functions. +For our project, this stuff can be found at `sdk/python/feast/embedded_go/lib/embedded.go` & `sdk/python/feast/embedded_go/lib/embedded_go.h` after running `python setup.py develop` + +## Arrow memory management +Understanding this is the trickiest part of this integration. + +At a high level, when using the Python<>Go integration, the python layer exports request data into an [Arrow Record batch](https://arrow.apache.org/docs/python/data.html) which is transferred to go using Arrow's zero copy mechanism. +Similarly, the go layer converts feature values read from the online store into a Record Batch that's exported to Python using the same mechanics. + +The first thing to note is that from the python perspective, all the export logic assumes that we're exporting to & importing from C, not Go. This is because pyarrow only interops with C, and the fact we're using Go is an implementation detail not relevant to the python layer. + +### Export Entities & Request data from Python to Go +The code exporting to C is this, in [online_feature_service.py](../sdk/python/feast/embedded_go/online_features_service.py) +``` +( + entities_c_schema, + entities_ptr_schema, + entities_c_array, + entities_ptr_array, +) = allocate_schema_and_array() +( + req_data_c_schema, + req_data_ptr_schema, + req_data_c_array, + req_data_ptr_array, +) = allocate_schema_and_array() + +batch, schema = map_to_record_batch(entities, join_keys_types) +schema._export_to_c(entities_ptr_schema) +batch._export_to_c(entities_ptr_array) + +batch, schema = map_to_record_batch(request_data) +schema._export_to_c(req_data_ptr_schema) +batch._export_to_c(req_data_ptr_array) +``` + +Under the hood, `allocate_schema_and_array` allocates a pointer (`struct ArrowSchema*` and `struct ArrowArray*`) in native memory (i.e. the C layer) using cffi. +Next, the RecordBatch us exporting to this pointer using [`_export_to_c`](https://github.com/apache/arrow/blob/master/python/pyarrow/table.pxi#L2509), which uses [`ExportRecordBatch`](https://arrow.apache.org/docs/cpp/api/c_abi.html#_CPPv417ExportRecordBatchRK11RecordBatchP10ArrowArrayP11ArrowSchema) under the hood. + +As per the documentation for ExportRecordBatch: +> Status ExportRecordBatch(const RecordBatch &batch, struct ArrowArray *out, struct ArrowSchema *out_schema = NULLPTR) +> Export C++ RecordBatch using the C data interface format. +> +> The record batch is exported as if it were a struct array. The resulting ArrowArray struct keeps the record batch data and buffers alive until its release callback is called by the consumer. + +This is why `GetOnlineFeatures()` in `online_features.go` calls `record.Release()` the following: +``` +resp, err := s.fs.GetOnlineFeatures( + context.Background(), + featureRefs, + featureService, + entitiesProto, + requestDataProto, + fullFeatureNames) + +entitiesRecord.Release() +requestDataRecords.Release() + +``` + +Additionally, we need to pass in a pair of pointers into which are set in `GetOnlineFeatures()` so that the resultant feature values can be passed back to Python (via the C layer) using zero-copy semantics. +That happens in a snipper like this: +``` +( + features_c_schema, + features_ptr_schema, + features_c_array, + features_ptr_array, +) = allocate_schema_and_array() + +... + +record_batch = pa.RecordBatch._import_from_c( + features_ptr_array, features_ptr_schema +) +``` + +The corresponding Go code that exports this data is: +``` +result := array.NewRecord(arrow.NewSchema(outputFields, nil), outputColumns, int64(numRows)) + +cdata.ExportArrowRecordBatch(result, + cdata.ArrayFromPtr(output.DataPtr), + cdata.SchemaFromPtr(output.SchemaPtr)) + +``` + +The documentation for `ExportArrowRecordBatch` is great - it has this super useful caveat: + +> // The release function on the populated CArrowArray will properly decrease the reference counts, +> // and release the memory if the record has already been released. But since this must be explicitly +> // done, make sure it is released so that you do not create a memory leak. + +This implies that the reciever is on the hook for explicitly releasing this memory. + +However, we're using `_import_from_c`, which uses [`ImportRecordBatch`](https://arrow.apache.org/docs/cpp/api/c_abi.html#_CPPv417ImportRecordBatchP10ArrowArrayP11ArrowSchema), which implies that the receiver of the RecordBatch is the new owner of the data. +This is wrapped by pyarrow - and when the corresponding python object goes out of scope, it should clean up the underlying record batch. + +Another thing to note (which I'm not sure may be the source of issues) is that Arrow has the concept of [Memory Pools](https://arrow.apache.org/docs/python/api/memory.html#memory-pools). +Memory pools can be set in python as well as in Go. I *believe* that if we use the CGoArrowAllocator, that uses whatever pool C++ uses, which should be the same as the one used by PyArrow. But this should be vetted. + + +### References +- https://arrow.apache.org/docs/format/CDataInterface.html#memory-management +- https://arrow.apache.org/docs/python/memory.html \ No newline at end of file diff --git a/go/embedded/online_features.go b/go/embedded/online_features.go index e5860507e45..5cc25c72b34 100644 --- a/go/embedded/online_features.go +++ b/go/embedded/online_features.go @@ -174,6 +174,9 @@ func (s *OnlineFeatureService) GetOnlineFeatures( requestDataProto, fullFeatureNames) + entitiesRecord.Release() + requestDataRecords.Release() + if err != nil { return err } From 492dc91175b57517486a9b6a06ad90163405460b Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 13 Jul 2022 11:19:35 -0700 Subject: [PATCH 02/10] cr Signed-off-by: Achal Shah --- go/README.md | 28 +++++++++---------- go/embedded/online_features.go | 5 ++-- .../embedded_go/online_features_service.py | 1 + 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/go/README.md b/go/README.md index 3f2aed1e789..d87b0882cb3 100644 --- a/go/README.md +++ b/go/README.md @@ -40,7 +40,7 @@ batch._export_to_c(req_data_ptr_array) ``` Under the hood, `allocate_schema_and_array` allocates a pointer (`struct ArrowSchema*` and `struct ArrowArray*`) in native memory (i.e. the C layer) using cffi. -Next, the RecordBatch us exporting to this pointer using [`_export_to_c`](https://github.com/apache/arrow/blob/master/python/pyarrow/table.pxi#L2509), which uses [`ExportRecordBatch`](https://arrow.apache.org/docs/cpp/api/c_abi.html#_CPPv417ExportRecordBatchRK11RecordBatchP10ArrowArrayP11ArrowSchema) under the hood. +Next, the RecordBatch exports to this pointer using [`_export_to_c`](https://github.com/apache/arrow/blob/master/python/pyarrow/table.pxi#L2509), which uses [`ExportRecordBatch`](https://arrow.apache.org/docs/cpp/api/c_abi.html#_CPPv417ExportRecordBatchRK11RecordBatchP10ArrowArrayP11ArrowSchema) under the hood. As per the documentation for ExportRecordBatch: > Status ExportRecordBatch(const RecordBatch &batch, struct ArrowArray *out, struct ArrowSchema *out_schema = NULLPTR) @@ -48,22 +48,22 @@ As per the documentation for ExportRecordBatch: > > The record batch is exported as if it were a struct array. The resulting ArrowArray struct keeps the record batch data and buffers alive until its release callback is called by the consumer. -This is why `GetOnlineFeatures()` in `online_features.go` calls `record.Release()` the following: +This is why `GetOnlineFeatures()` in `online_features.go` calls `record.Release()` as below: ``` -resp, err := s.fs.GetOnlineFeatures( - context.Background(), - featureRefs, - featureService, - entitiesProto, - requestDataProto, - fullFeatureNames) - -entitiesRecord.Release() -requestDataRecords.Release() - +entitiesRecord, err := readArrowRecord(entities) +if err != nil { + return err +} +defer entitiesRecord.Release() +... +requestDataRecords, err := readArrowRecord(requestData) +if err != nil { + return err +} +defer requestDataRecords.Release() ``` -Additionally, we need to pass in a pair of pointers into which are set in `GetOnlineFeatures()` so that the resultant feature values can be passed back to Python (via the C layer) using zero-copy semantics. +Additionally, we need to pass in a pair of pointers to `GetOnlineFeatures()` that are populated by the Go layer, and the resultant feature values can be passed back to Python (via the C layer) using zero-copy semantics. That happens in a snipper like this: ``` ( diff --git a/go/embedded/online_features.go b/go/embedded/online_features.go index 5cc25c72b34..00e8d23d147 100644 --- a/go/embedded/online_features.go +++ b/go/embedded/online_features.go @@ -143,6 +143,7 @@ func (s *OnlineFeatureService) GetOnlineFeatures( if err != nil { return err } + defer entitiesRecord.Release() numRows := entitiesRecord.Column(0).Len() @@ -155,6 +156,7 @@ func (s *OnlineFeatureService) GetOnlineFeatures( if err != nil { return err } + defer requestDataRecords.Release() requestDataProto, err := recordToProto(requestDataRecords) if err != nil { @@ -174,9 +176,6 @@ func (s *OnlineFeatureService) GetOnlineFeatures( requestDataProto, fullFeatureNames) - entitiesRecord.Release() - requestDataRecords.Release() - if err != nil { return err } diff --git a/sdk/python/feast/embedded_go/online_features_service.py b/sdk/python/feast/embedded_go/online_features_service.py index 3081843778c..d9b34b2414c 100644 --- a/sdk/python/feast/embedded_go/online_features_service.py +++ b/sdk/python/feast/embedded_go/online_features_service.py @@ -147,6 +147,7 @@ def get_online_features( features_ptr_array, features_ptr_schema ) resp = record_batch_to_online_response(record_batch) + del record_batch return OnlineResponse(resp) def start_grpc_server( From f205e41dde9b3db2b285a0a1fbc1077f7617a8af Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Wed, 13 Jul 2022 14:27:13 -0700 Subject: [PATCH 03/10] Switch to cgo allocator Signed-off-by: Felix Wang --- go/internal/feast/featurestore.go | 2 +- go/internal/feast/server/logging/memorybuffer.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go/internal/feast/featurestore.go b/go/internal/feast/featurestore.go index ad1f94a4ba2..ed38411460a 100644 --- a/go/internal/feast/featurestore.go +++ b/go/internal/feast/featurestore.go @@ -113,7 +113,7 @@ func (fs *FeatureStore) GetOnlineFeatures( } result := make([]*onlineserving.FeatureVector, 0) - arrowMemory := memory.NewGoAllocator() + arrowMemory := memory.NewCgoArrowAllocator() featureViews := make([]*model.FeatureView, len(requestedFeatureViews)) index := 0 for _, featuresAndView := range requestedFeatureViews { diff --git a/go/internal/feast/server/logging/memorybuffer.go b/go/internal/feast/server/logging/memorybuffer.go index 9ffb0ff73b8..c9f00218dfc 100644 --- a/go/internal/feast/server/logging/memorybuffer.go +++ b/go/internal/feast/server/logging/memorybuffer.go @@ -128,7 +128,7 @@ func getArrowSchema(schema *FeatureServiceSchema) (*arrow.Schema, error) { // and writes them to arrow table. // Returns arrow table that contains all of the logs in columnar format. func (b *MemoryBuffer) convertToArrowRecord() (arrow.Record, error) { - arrowMemory := memory.NewGoAllocator() + arrowMemory := memory.NewCgoArrowAllocator() numRows := len(b.logs) columns := make(map[string][]*types.Value) From ae71efa50aedb4e5b6b8513e9cd705fe27430afc Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Wed, 13 Jul 2022 14:38:37 -0700 Subject: [PATCH 04/10] Fix memory leak Signed-off-by: Felix Wang --- go/embedded/online_features.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/go/embedded/online_features.go b/go/embedded/online_features.go index 00e8d23d147..4f00d9b9a9f 100644 --- a/go/embedded/online_features.go +++ b/go/embedded/online_features.go @@ -33,6 +33,11 @@ type OnlineFeatureService struct { grpcStopCh chan os.Signal httpStopCh chan os.Signal + statusColumnBuildersToRelease []*array.Int32Builder + tsColumnBuildersToRelease []*array.Int64Builder + arraysToRelease []arrow.Array + resultsToRelease []arrow.Record + err error } @@ -180,6 +185,24 @@ func (s *OnlineFeatureService) GetOnlineFeatures( return err } + // Release all objects that are no longer required. + for _, statusColumnBuilderToRelease := range s.statusColumnBuildersToRelease { + statusColumnBuilderToRelease.Release() + } + for _, tsColumnBuilderToRelease := range s.tsColumnBuildersToRelease { + tsColumnBuilderToRelease.Release() + } + for _, arrayToRelease := range s.arraysToRelease { + arrayToRelease.Release() + } + for _, resultsToRelease := range s.resultsToRelease { + resultsToRelease.Release() + } + s.statusColumnBuildersToRelease = nil + s.tsColumnBuildersToRelease = nil + s.arraysToRelease = nil + s.resultsToRelease = nil + outputFields := make([]arrow.Field, 0) outputColumns := make([]arrow.Array, 0) pool := memory.NewCgoArrowAllocator() @@ -212,9 +235,16 @@ func (s *OnlineFeatureService) GetOnlineFeatures( } tsColumn := tsColumnBuilder.NewArray() outputColumns = append(outputColumns, tsColumn) + + // Mark builders and arrays for release. + s.statusColumnBuildersToRelease = append(s.statusColumnBuildersToRelease, statusColumnBuilder) + s.tsColumnBuildersToRelease = append(s.tsColumnBuildersToRelease, tsColumnBuilder) + s.arraysToRelease = append(s.arraysToRelease, statusColumn) + s.arraysToRelease = append(s.arraysToRelease, tsColumn) } result := array.NewRecord(arrow.NewSchema(outputFields, nil), outputColumns, int64(numRows)) + s.resultsToRelease = append(s.resultsToRelease, result) cdata.ExportArrowRecordBatch(result, cdata.ArrayFromPtr(output.DataPtr), From a6232d964cf24d27eca5b77247fb2a03354eba75 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Wed, 13 Jul 2022 16:14:40 -0700 Subject: [PATCH 05/10] Fix another memory leak Signed-off-by: Felix Wang --- go/embedded/online_features.go | 5 ++--- go/internal/feast/onlineserving/serving.go | 10 ++++++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/go/embedded/online_features.go b/go/embedded/online_features.go index 4f00d9b9a9f..7fd34d16e40 100644 --- a/go/embedded/online_features.go +++ b/go/embedded/online_features.go @@ -241,14 +241,13 @@ func (s *OnlineFeatureService) GetOnlineFeatures( s.tsColumnBuildersToRelease = append(s.tsColumnBuildersToRelease, tsColumnBuilder) s.arraysToRelease = append(s.arraysToRelease, statusColumn) s.arraysToRelease = append(s.arraysToRelease, tsColumn) + s.arraysToRelease = append(s.arraysToRelease, featureVector.Values) } result := array.NewRecord(arrow.NewSchema(outputFields, nil), outputColumns, int64(numRows)) s.resultsToRelease = append(s.resultsToRelease, result) - cdata.ExportArrowRecordBatch(result, - cdata.ArrayFromPtr(output.DataPtr), - cdata.SchemaFromPtr(output.SchemaPtr)) + cdata.ExportArrowRecordBatch(result, cdata.ArrayFromPtr(output.DataPtr), cdata.SchemaFromPtr(output.SchemaPtr)) return nil } diff --git a/go/internal/feast/onlineserving/serving.go b/go/internal/feast/onlineserving/serving.go index e2a2df923be..3c6f5451537 100644 --- a/go/internal/feast/onlineserving/serving.go +++ b/go/internal/feast/onlineserving/serving.go @@ -415,6 +415,8 @@ func KeepOnlyRequestedFeatures( vectorsByName := make(map[string]*FeatureVector) expectedVectors := make([]*FeatureVector, 0) + usedVectors := make(map[string]bool) + for _, vector := range vectors { vectorsByName[vector.Name] = vector } @@ -438,6 +440,14 @@ func KeepOnlyRequestedFeatures( return nil, fmt.Errorf("requested feature %s can't be retrieved", featureRef) } expectedVectors = append(expectedVectors, vectorsByName[qualifiedName]) + usedVectors[qualifiedName] = true + } + + // Free arrow arrays for vectors that were not used. + for _, vector := range vectors { + if _, ok := usedVectors[vector.Name]; !ok { + vector.Values.Release() + } } return expectedVectors, nil From b0283ed65e0c274336473e11fa0f501776d7caf6 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Thu, 14 Jul 2022 12:17:01 -0700 Subject: [PATCH 06/10] Do not use cgo allocator for memory buffers Signed-off-by: Felix Wang --- go/internal/feast/server/logging/memorybuffer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/internal/feast/server/logging/memorybuffer.go b/go/internal/feast/server/logging/memorybuffer.go index c9f00218dfc..9ffb0ff73b8 100644 --- a/go/internal/feast/server/logging/memorybuffer.go +++ b/go/internal/feast/server/logging/memorybuffer.go @@ -128,7 +128,7 @@ func getArrowSchema(schema *FeatureServiceSchema) (*arrow.Schema, error) { // and writes them to arrow table. // Returns arrow table that contains all of the logs in columnar format. func (b *MemoryBuffer) convertToArrowRecord() (arrow.Record, error) { - arrowMemory := memory.NewCgoArrowAllocator() + arrowMemory := memory.NewGoAllocator() numRows := len(b.logs) columns := make(map[string][]*types.Value) From 9951db94003db0e2e998e013ce3ac193cc1287e2 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Thu, 14 Jul 2022 14:12:15 -0700 Subject: [PATCH 07/10] Switch to cgo allocator for memory buffers Signed-off-by: Felix Wang --- go/internal/feast/server/logging/memorybuffer.go | 2 +- go/internal/feast/server/logging/memorybuffer_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go/internal/feast/server/logging/memorybuffer.go b/go/internal/feast/server/logging/memorybuffer.go index 9ffb0ff73b8..c9f00218dfc 100644 --- a/go/internal/feast/server/logging/memorybuffer.go +++ b/go/internal/feast/server/logging/memorybuffer.go @@ -128,7 +128,7 @@ func getArrowSchema(schema *FeatureServiceSchema) (*arrow.Schema, error) { // and writes them to arrow table. // Returns arrow table that contains all of the logs in columnar format. func (b *MemoryBuffer) convertToArrowRecord() (arrow.Record, error) { - arrowMemory := memory.NewGoAllocator() + arrowMemory := memory.NewCgoArrowAllocator() numRows := len(b.logs) columns := make(map[string][]*types.Value) diff --git a/go/internal/feast/server/logging/memorybuffer_test.go b/go/internal/feast/server/logging/memorybuffer_test.go index 94f0f86ef02..b56f8ed9d2a 100644 --- a/go/internal/feast/server/logging/memorybuffer_test.go +++ b/go/internal/feast/server/logging/memorybuffer_test.go @@ -118,7 +118,7 @@ func TestSerializeToArrowTable(t *testing.T) { LogTimestamp: time.Now(), }) - pool := memory.NewGoAllocator() + pool := memory.NewCgoArrowAllocator() builder := array.NewRecordBuilder(pool, b.arrowSchema) defer builder.Release() From 656cb40f02973e7385ac5627cc2f00cbdf2d20c6 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Thu, 14 Jul 2022 14:52:44 -0700 Subject: [PATCH 08/10] Switch test to pass Signed-off-by: Felix Wang --- go/internal/feast/server/logging/memorybuffer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/internal/feast/server/logging/memorybuffer_test.go b/go/internal/feast/server/logging/memorybuffer_test.go index b56f8ed9d2a..ba6ba6da0d5 100644 --- a/go/internal/feast/server/logging/memorybuffer_test.go +++ b/go/internal/feast/server/logging/memorybuffer_test.go @@ -159,7 +159,7 @@ func TestSerializeToArrowTable(t *testing.T) { expectedRecord := builder.NewRecord() assert.Nil(t, err) for colIdx := 0; colIdx < int(record.NumCols()); colIdx++ { - assert.Equal(t, expectedRecord.Column(colIdx), record.Column(colIdx), "Columns with idx %d are not equal", colIdx) + assert.Equal(t, true, array.Equal(expectedRecord.Column(colIdx), record.Column(colIdx)), "Columns with idx %d are not equal", colIdx) } } From 9c0b757c0b1056fe87b88bda3108d6d584bfaf29 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Thu, 14 Jul 2022 16:53:08 -0700 Subject: [PATCH 09/10] Use more idiomatic way to test truth Signed-off-by: Felix Wang --- go/internal/feast/server/logging/memorybuffer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/internal/feast/server/logging/memorybuffer_test.go b/go/internal/feast/server/logging/memorybuffer_test.go index ba6ba6da0d5..ec83680f4ff 100644 --- a/go/internal/feast/server/logging/memorybuffer_test.go +++ b/go/internal/feast/server/logging/memorybuffer_test.go @@ -159,7 +159,7 @@ func TestSerializeToArrowTable(t *testing.T) { expectedRecord := builder.NewRecord() assert.Nil(t, err) for colIdx := 0; colIdx < int(record.NumCols()); colIdx++ { - assert.Equal(t, true, array.Equal(expectedRecord.Column(colIdx), record.Column(colIdx)), "Columns with idx %d are not equal", colIdx) + assert.True(t, array.Equal(expectedRecord.Column(colIdx), record.Column(colIdx)), "Columns with idx %d are not equal", colIdx) } } From 78b046d4ab8ac7d3b27a45959b3b2d187345e7eb Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Thu, 14 Jul 2022 17:05:05 -0700 Subject: [PATCH 10/10] Update docs Signed-off-by: Felix Wang --- go/README.md | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/go/README.md b/go/README.md index d87b0882cb3..0bca470919f 100644 --- a/go/README.md +++ b/go/README.md @@ -1,18 +1,18 @@ -This dir contains the go logic that's executed by the `EmbeddedOnlineFeatureServer` from python. +This directory contains the Go logic that's executed by the `EmbeddedOnlineFeatureServer` from Python. ## Building and Linking -[gopy](https://github.com/go-python/gopy) generates (and compiles) a CPython extension module from a go package. That's what we're using here, as visible in [setup.py](../setup.py) +[gopy](https://github.com/go-python/gopy) generates (and compiles) a CPython extension module from a Go package. That's what we're using here, as visible in [setup.py](../setup.py). -Under the hood, gopy invoked `go build`, and then templates `cgo` stubs for the go module that exposes the public functions from the Go module as C functions. -For our project, this stuff can be found at `sdk/python/feast/embedded_go/lib/embedded.go` & `sdk/python/feast/embedded_go/lib/embedded_go.h` after running `python setup.py develop` +Under the hood, gopy invokes `go build`, and then templates `cgo` stubs for the Go module that exposes the public functions from the Go module as C functions. +For our project, this stuff can be found at `sdk/python/feast/embedded_go/lib/embedded.go` & `sdk/python/feast/embedded_go/lib/embedded_go.h` after running `make compile-go-lib`. ## Arrow memory management Understanding this is the trickiest part of this integration. -At a high level, when using the Python<>Go integration, the python layer exports request data into an [Arrow Record batch](https://arrow.apache.org/docs/python/data.html) which is transferred to go using Arrow's zero copy mechanism. -Similarly, the go layer converts feature values read from the online store into a Record Batch that's exported to Python using the same mechanics. +At a high level, when using the Python<>Go integration, the Python layer exports request data into an [Arrow Record batch](https://arrow.apache.org/docs/python/data.html) which is transferred to Go using Arrow's zero copy mechanism. +Similarly, the Go layer converts feature values read from the online store into a Record Batch that's exported to Python using the same mechanics. -The first thing to note is that from the python perspective, all the export logic assumes that we're exporting to & importing from C, not Go. This is because pyarrow only interops with C, and the fact we're using Go is an implementation detail not relevant to the python layer. +The first thing to note is that from the Python perspective, all the export logic assumes that we're exporting to & importing from C, not Go. This is because pyarrow only interops with C, and the fact we're using Go is an implementation detail not relevant to the Python layer. ### Export Entities & Request data from Python to Go The code exporting to C is this, in [online_feature_service.py](../sdk/python/feast/embedded_go/online_features_service.py) @@ -39,7 +39,7 @@ schema._export_to_c(req_data_ptr_schema) batch._export_to_c(req_data_ptr_array) ``` -Under the hood, `allocate_schema_and_array` allocates a pointer (`struct ArrowSchema*` and `struct ArrowArray*`) in native memory (i.e. the C layer) using cffi. +Under the hood, `allocate_schema_and_array` allocates a pointer (`struct ArrowSchema*` and `struct ArrowArray*`) in native memory (i.e. the C layer) using `cffi`. Next, the RecordBatch exports to this pointer using [`_export_to_c`](https://github.com/apache/arrow/blob/master/python/pyarrow/table.pxi#L2509), which uses [`ExportRecordBatch`](https://arrow.apache.org/docs/cpp/api/c_abi.html#_CPPv417ExportRecordBatchRK11RecordBatchP10ArrowArrayP11ArrowSchema) under the hood. As per the documentation for ExportRecordBatch: @@ -64,7 +64,7 @@ defer requestDataRecords.Release() ``` Additionally, we need to pass in a pair of pointers to `GetOnlineFeatures()` that are populated by the Go layer, and the resultant feature values can be passed back to Python (via the C layer) using zero-copy semantics. -That happens in a snipper like this: +That happens as follows: ``` ( features_c_schema, @@ -87,10 +87,9 @@ result := array.NewRecord(arrow.NewSchema(outputFields, nil), outputColumns, int cdata.ExportArrowRecordBatch(result, cdata.ArrayFromPtr(output.DataPtr), cdata.SchemaFromPtr(output.SchemaPtr)) - ``` -The documentation for `ExportArrowRecordBatch` is great - it has this super useful caveat: +The documentation for `ExportArrowRecordBatch` is great. It has this super useful caveat: > // The release function on the populated CArrowArray will properly decrease the reference counts, > // and release the memory if the record has already been released. But since this must be explicitly