From 92fe8fa54e70490dae90a7050be0638e9583df6f Mon Sep 17 00:00:00 2001 From: pyalex Date: Thu, 31 Mar 2022 14:01:51 -0700 Subject: [PATCH 1/2] support on demand transformations in go feature server Signed-off-by: pyalex --- go.mod | 7 +- go.sum | 22 +- go/cmd/server/main.go | 6 +- go/cmd/server/server.go | 19 +- go/cmd/server/server_test.go | 2 +- go/embedded/online_features.go | 121 +++- go/internal/feast/basefeatureview.go | 40 +- go/internal/feast/entity.go | 12 +- go/internal/feast/featureservice.go | 4 +- go/internal/feast/featurestore.go | 579 ++++++++++-------- go/internal/feast/featurestore_test.go | 63 +- go/internal/feast/featureview.go | 24 +- go/internal/feast/featureviewprojection.go | 32 +- go/internal/feast/ondemandfeatureview.go | 19 +- go/internal/feast/transformation.go | 200 ++++++ go/types/typeconversion.go | 10 +- go/types/typeconversion_test.go | 2 +- .../embedded_go/online_features_service.py | 267 ++++++-- sdk/python/feast/feature_store.py | 14 +- sdk/python/feast/go_server.py | 297 --------- .../feature_repos/repo_configuration.py | 4 + .../offline_store/test_s3_custom_endpoint.py | 2 +- .../online_store/test_universal_online.py | 6 +- .../registration/test_universal_types.py | 2 +- 24 files changed, 1024 insertions(+), 730 deletions(-) create mode 100644 go/internal/feast/transformation.go delete mode 100644 sdk/python/feast/go_server.py diff --git a/go.mod b/go.mod index 7b74ac264ce..028fa0466a5 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/feast-dev/feast go 1.17 require ( - github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 + github.com/apache/arrow/go/v7 v7.0.0 github.com/ghodss/yaml v1.0.0 github.com/go-python/gopy v0.4.0 github.com/go-redis/redis/v8 v8.11.4 @@ -19,20 +19,23 @@ require ( ) require ( + github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect github.com/apache/thrift v0.16.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew
v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/goccy/go-json v0.7.10 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v2.0.0+incompatible // indirect github.com/google/go-cmp v0.5.7 // indirect github.com/klauspost/compress v1.15.1 // indirect github.com/pierrec/lz4/v4 v4.1.9 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - golang.org/x/exp v0.0.0-20211028214138-64b4c8e87d1a // indirect + golang.org/x/mod v0.5.1-0.20210830214625-1b1db11ec8f4 // indirect golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f // indirect golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 // indirect golang.org/x/text v0.3.7 // indirect + golang.org/x/tools v0.1.4 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/genproto v0.0.0-20220118154757-00ab72f36ad5 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index bccf79b3823..30963f0926d 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,7 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= @@ -43,14 +44,17 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy 
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/andybalholm/brotli v1.0.3/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 h1:byKBBF2CKWBjjA4J1ZL2JXttJULvWSl50LegTyRZ728= github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516/go.mod h1:QNYViu/X0HXDHw7m3KXzWSVXIbfUvJqBFe6Gj8/pYA0= -github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 h1:q4dksr6ICHXqG5hm0ZW5IHyeEJXoIJSOZeBLmWPNeIQ= -github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40/go.mod h1:Q7yQnSMnLvcXlZ8RV+jwz/6y1rQTqbX6C82SndT52Zs= +github.com/apache/arrow/go/v7 v7.0.0 h1:3d+Qgwo/r75bNhC6N0MMzZXQhsOyB0TSn6wljfuBNWo= +github.com/apache/arrow/go/v7 v7.0.0/go.mod h1:vG2y+fH8mEUcX29tM6hOULGE06/XqEI8sG5fANM6T5w= github.com/apache/thrift v0.0.0-20181112125854-24918abba929/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.14.2/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= +github.com/apache/thrift v0.15.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY= github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= @@ -114,6 +118,7 @@ github.com/davecgh/go-spew 
v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= @@ -160,6 +165,8 @@ github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= +github.com/goccy/go-json v0.7.10 h1:ulhbuNe1JqE68nMRXXTJRrUu0uhouf0VevLINxQq4Ec= +github.com/goccy/go-json v0.7.10/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -278,8 +285,10 @@ github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+ github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= github.com/kisielk/errcheck v1.1.0/go.mod 
h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/asmfmt v1.3.1/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.1 h1:y9FcTHGyrebwfP0ZZqFiaxTaiDnUrGkJkI+f583BL1A= github.com/klauspost/compress v1.15.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -302,6 +311,8 @@ github.com/mattn/go-sqlite3 v1.14.12 h1:TJ1bhYJPV44phC+IMu1u2K/i5RriLTPe+yc68XDJ github.com/mattn/go-sqlite3 v1.14.12/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= @@ -435,6 +446,7 @@ github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c h1:UDt github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c/go.mod h1:qLb2Itmdcp7KPa5KZKvhE9U1q5bYSOmgeOckF/H2rQA= github.com/yuin/goldmark v1.2.1/go.mod 
h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/zeebo/xxh3 v0.13.0/go.mod h1:AQY73TOrhF3jNsdiM9zZOb8MThrYbZONHj7ryDBaLpg= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= @@ -519,6 +531,7 @@ golang.org/x/mod v0.1.1-0.20191209134235-331c550502dd/go.mod h1:s0Qsj1ACt9ePp/hM golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.5.1-0.20210830214625-1b1db11ec8f4 h1:7Qds88gNaRx0Dz/1wOwXlR7asekh1B1u26wEwN6FcEI= golang.org/x/mod v0.5.1-0.20210830214625-1b1db11ec8f4/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -599,6 +612,7 @@ golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200727154430-2d971f7391a4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200828194041-157a740278f4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= 
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -608,7 +622,6 @@ golang.org/x/sys v0.0.0-20210304124612-50617c2ba197/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 h1:2B5p2L5IfGiD7+b9BOoRMC6DgObAVZV+Fsp050NqXik= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -666,6 +679,7 @@ golang.org/x/tools v0.0.0-20200212150539-ea181f53ac56/go.mod h1:TB2adYChydJhpapK golang.org/x/tools v0.0.0-20200224181240-023911ca70b2/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= +golang.org/x/tools v0.1.4 h1:cVngSRcfgyZCzys3KYOpCFa+4dqX/Oub9tAq00ttGVs= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -734,8 +748,8 @@ google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.36.0/go.mod 
h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= -google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= +google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k= google.golang.org/grpc v1.44.0 h1:weqSxi/TMs1SqFRMHCtBgXRs8k3X39QIDEZ0pRcttUg= google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/go/cmd/server/main.go b/go/cmd/server/main.go index a8114fee327..ae0f4dae2e9 100644 --- a/go/cmd/server/main.go +++ b/go/cmd/server/main.go @@ -17,6 +17,10 @@ const ( feastServerVersion = "0.18.0" ) +func dummyTransformCallback(ODFVName string, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr uintptr, fullFeatureNames bool) int { + return 0 +} + // TODO: Add a proper logging library such as https://github.com/Sirupsen/logrus func main() { repoPath := os.Getenv(flagFeastRepoPath) @@ -41,7 +45,7 @@ func main() { } log.Println("Initializing feature store...") - fs, err := feast.NewFeatureStore(repoConfig) + fs, err := feast.NewFeatureStore(repoConfig, dummyTransformCallback) if err != nil { log.Fatalln(err) } diff --git a/go/cmd/server/server.go b/go/cmd/server/server.go index 0c90f759020..0d8c4362a2b 100644 --- a/go/cmd/server/server.go +++ b/go/cmd/server/server.go @@ -5,9 +5,7 @@ import ( "github.com/feast-dev/feast/go/internal/feast" "github.com/feast-dev/feast/go/protos/feast/serving" - prototypes "github.com/feast-dev/feast/go/protos/feast/types" "github.com/feast-dev/feast/go/types" - "github.com/golang/protobuf/ptypes/timestamp" ) type servingServiceServer struct { @@ -39,6 +37,7 @@ func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *s 
featuresOrService.FeaturesRefs, featuresOrService.FeatureService, request.GetEntities(), + request.GetRequestContext(), request.GetFullFeatureNames()) if err != nil { return nil, err @@ -50,22 +49,6 @@ func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *s FeatureNames: &serving.FeatureList{Val: make([]string, 0)}, }, } - for name, values := range request.Entities { - resp.Metadata.FeatureNames.Val = append(resp.Metadata.FeatureNames.Val, name) - - vec := &serving.GetOnlineFeaturesResponse_FeatureVector{ - Values: make([]*prototypes.Value, 0), - Statuses: make([]serving.FieldStatus, 0), - EventTimestamps: make([]*timestamp.Timestamp, 0), - } - resp.Results = append(resp.Results, vec) - - for _, v := range values.Val { - vec.Values = append(vec.Values, v) - vec.Statuses = append(vec.Statuses, serving.FieldStatus_PRESENT) - vec.EventTimestamps = append(vec.EventTimestamps, &timestamp.Timestamp{}) - } - } for _, vector := range featureVectors { resp.Metadata.FeatureNames.Val = append(resp.Metadata.FeatureNames.Val, vector.Name) diff --git a/go/cmd/server/server_test.go b/go/cmd/server/server_test.go index 9ee8c87e2fe..7e19600b38c 100644 --- a/go/cmd/server/server_test.go +++ b/go/cmd/server/server_test.go @@ -41,7 +41,7 @@ func getClient(ctx context.Context, basePath string) (serving.ServingServiceClie if err != nil { panic(err) } - fs, err := feast.NewFeatureStore(config) + fs, err := feast.NewFeatureStore(config, dummyTransformCallback) if err != nil { panic(err) } diff --git a/go/embedded/online_features.go b/go/embedded/online_features.go index 4602538f42f..6af818a3a14 100644 --- a/go/embedded/online_features.go +++ b/go/embedded/online_features.go @@ -2,9 +2,11 @@ package embedded import ( "context" - "github.com/apache/arrow/go/arrow" - "github.com/apache/arrow/go/arrow/array" - "github.com/apache/arrow/go/arrow/cdata" + "fmt" + "github.com/apache/arrow/go/v7/arrow" + "github.com/apache/arrow/go/v7/arrow/array" +
"github.com/apache/arrow/go/v7/arrow/cdata" + "github.com/apache/arrow/go/v7/arrow/memory" "github.com/feast-dev/feast/go/internal/feast" prototypes "github.com/feast-dev/feast/go/protos/feast/types" "github.com/feast-dev/feast/go/types" @@ -25,13 +27,13 @@ type DataTable struct { SchemaPtr uintptr } -func NewOnlineFeatureService(conf *OnlineFeatureServiceConfig) *OnlineFeatureService { +func NewOnlineFeatureService(conf *OnlineFeatureServiceConfig, transformationCallback feast.TransformationCallback) *OnlineFeatureService { repoConfig, err := feast.NewRepoConfigFromJSON(conf.RepoPath, conf.RepoConfig) if err != nil { log.Fatalln(err) } - fs, err := feast.NewFeatureStore(repoConfig) + fs, err := feast.NewFeatureStore(repoConfig, transformationCallback) if err != nil { log.Fatalln(err) } @@ -39,12 +41,74 @@ func NewOnlineFeatureService(conf *OnlineFeatureServiceConfig) *OnlineFeatureSer return &OnlineFeatureService{fs: fs} } +func (s *OnlineFeatureService) GetEntityTypesMap(featureRefs []string) (map[string]int32, error) { + viewNames := make(map[string]interface{}) + for _, featureRef := range featureRefs { + viewName, _, err := feast.ParseFeatureReference(featureRef) + if err != nil { + return nil, err + } + viewNames[viewName] = nil + } + + entities, _ := s.fs.ListEntities(true) + entitiesByName := make(map[string]*feast.Entity) + for _, entity := range entities { + entitiesByName[entity.Name] = entity + } + + joinKeyTypes := make(map[string]int32) + + for viewName, _ := range viewNames { + view, err := s.fs.GetFeatureView(viewName, true) + if err != nil { + // skip on demand feature views + continue + } + for entityName, _ := range view.Entities { + entity := entitiesByName[entityName] + joinKeyTypes[entity.JoinKey] = int32(entity.ValueType.Number()) + } + } + + return joinKeyTypes, nil +} + +func (s *OnlineFeatureService) GetEntityTypesMapByFeatureService(featureServiceName string) (map[string]int32, error) { + featureService, err := 
s.fs.GetFeatureService(featureServiceName) + if err != nil { + return nil, err + } + + joinKeyTypes := make(map[string]int32) + + entities, _ := s.fs.ListEntities(true) + entitiesByName := make(map[string]*feast.Entity) + for _, entity := range entities { + entitiesByName[entity.Name] = entity + } + + for _, projection := range featureService.Projections { + view, err := s.fs.GetFeatureView(projection.Name, true) + if err != nil { + // skip on demand feature views + continue + } + for entityName, _ := range view.Entities { + entity := entitiesByName[entityName] + joinKeyTypes[entity.JoinKey] = int32(entity.ValueType.Number()) + } + } + + return joinKeyTypes, nil +} + func (s *OnlineFeatureService) GetOnlineFeatures( featureRefs []string, featureServiceName string, entities DataTable, + requestData DataTable, fullFeatureNames bool, - projectName string, output DataTable) error { entitiesRecord, err := readArrowRecord(entities) @@ -59,9 +123,19 @@ func (s *OnlineFeatureService) GetOnlineFeatures( return err } + requestDataRecords, err := readArrowRecord(requestData) + if err != nil { + return err + } + + requestDataProto, err := recordToProto(requestDataRecords) + if err != nil { + return err + } + var featureService *feast.FeatureService if featureServiceName != "" { - featureService, err = s.fs.GetFeatureService(featureServiceName, projectName) + featureService, err = s.fs.GetFeatureService(featureServiceName) } resp, err := s.fs.GetOnlineFeatures( @@ -69,18 +143,45 @@ func (s *OnlineFeatureService) GetOnlineFeatures( featureRefs, featureService, entitiesProto, + requestDataProto, fullFeatureNames) if err != nil { return err } - outputFields := entitiesRecord.Schema().Fields() - outputColumns := entitiesRecord.Columns() + outputFields := make([]arrow.Field, 0) + outputColumns := make([]arrow.Array, 0) + pool := memory.NewGoAllocator() for _, featureVector := range resp { outputFields = append(outputFields, - arrow.Field{Name: featureVector.Name, Type: 
featureVector.Values.DataType()}) + arrow.Field{ + Name: featureVector.Name, + Type: featureVector.Values.DataType()}) + outputFields = append(outputFields, + arrow.Field{ + Name: fmt.Sprintf("%s__status", featureVector.Name), + Type: arrow.PrimitiveTypes.Int32}) + outputFields = append(outputFields, + arrow.Field{ + Name: fmt.Sprintf("%s__timestamp", featureVector.Name), + Type: arrow.PrimitiveTypes.Int64}) + outputColumns = append(outputColumns, featureVector.Values) + + statusColumnBuilder := array.NewInt32Builder(pool) + for _, status := range featureVector.Statuses { + statusColumnBuilder.Append(int32(status)) + } + statusColumn := statusColumnBuilder.NewArray() + outputColumns = append(outputColumns, statusColumn) + + tsColumnBuilder := array.NewInt64Builder(pool) + for _, ts := range featureVector.Timestamps { + tsColumnBuilder.Append(ts.GetSeconds()) + } + tsColumn := tsColumnBuilder.NewArray() + outputColumns = append(outputColumns, tsColumn) } result := array.NewRecord(arrow.NewSchema(outputFields, nil), outputColumns, int64(numRows)) diff --git a/go/internal/feast/basefeatureview.go b/go/internal/feast/basefeatureview.go index 15c543b904c..19a40af1855 100644 --- a/go/internal/feast/basefeatureview.go +++ b/go/internal/feast/basefeatureview.go @@ -6,37 +6,53 @@ import ( ) type BaseFeatureView struct { - name string - features []*Feature - projection *FeatureViewProjection + Name string + Features []*Feature + Projection *FeatureViewProjection } func NewBaseFeatureView(name string, featureProtos []*core.FeatureSpecV2) *BaseFeatureView { - base := &BaseFeatureView{name: name} + base := &BaseFeatureView{Name: name} features := make([]*Feature, len(featureProtos)) for index, featureSpecV2 := range featureProtos { features[index] = NewFeatureFromProto(featureSpecV2) } - base.features = features - base.projection = NewFeatureViewProjectionFromDefinition(base) + base.Features = features + base.Projection = NewFeatureViewProjectionFromDefinition(base) return base 
} func (fv *BaseFeatureView) withProjection(projection *FeatureViewProjection) (*BaseFeatureView, error) { - if projection.name != fv.name { + if projection.Name != fv.Name { return nil, fmt.Errorf("the projection for the %s FeatureView cannot be applied because it differs "+ "in name; the projection is named %s and the name indicates which "+ - "FeatureView the projection is for", fv.name, projection.name) + "FeatureView the projection is for", fv.Name, projection.Name) } features := make(map[string]bool) - for _, feature := range fv.features { + for _, feature := range fv.Features { features[feature.name] = true } - for _, feature := range projection.features { + for _, feature := range projection.Features { if _, ok := features[feature.name]; !ok { return nil, fmt.Errorf("the projection for %s cannot be applied because it contains %s which the "+ - "FeatureView doesn't have", projection.name, feature.name) + "FeatureView doesn't have", projection.Name, feature.name) } } - return &BaseFeatureView{name: fv.name, features: fv.features, projection: projection}, nil + return &BaseFeatureView{Name: fv.Name, Features: fv.Features, Projection: projection}, nil +} + +func (fv *BaseFeatureView) projectWithFeatures(featureNames []string) *FeatureViewProjection { + features := make([]*Feature, 0) + for _, feature := range fv.Features { + for _, allowedFeatureName := range featureNames { + if feature.name == allowedFeatureName { + features = append(features, feature) + } + } + } + + return &FeatureViewProjection{ + Name: fv.Name, + Features: features, + } } diff --git a/go/internal/feast/entity.go b/go/internal/feast/entity.go index 39908f1ecd8..747b9676297 100644 --- a/go/internal/feast/entity.go +++ b/go/internal/feast/entity.go @@ -6,14 +6,14 @@ import ( ) type Entity struct { - name string - valueType types.ValueType_Enum - joinKey string + Name string + ValueType types.ValueType_Enum + JoinKey string } func NewEntityFromProto(proto *core.Entity) *Entity { - return 
&Entity{name: proto.Spec.Name, - valueType: proto.Spec.ValueType, - joinKey: proto.Spec.JoinKey, + return &Entity{Name: proto.Spec.Name, + ValueType: proto.Spec.ValueType, + JoinKey: proto.Spec.JoinKey, } } diff --git a/go/internal/feast/featureservice.go b/go/internal/feast/featureservice.go index 351a55dcad3..e39b97dec49 100644 --- a/go/internal/feast/featureservice.go +++ b/go/internal/feast/featureservice.go @@ -10,7 +10,7 @@ type FeatureService struct { project string createdTimestamp *timestamppb.Timestamp lastUpdatedTimestamp *timestamppb.Timestamp - projections []*FeatureViewProjection + Projections []*FeatureViewProjection } func NewFeatureServiceFromProto(proto *core.FeatureService) *FeatureService { @@ -22,6 +22,6 @@ func NewFeatureServiceFromProto(proto *core.FeatureService) *FeatureService { project: proto.Spec.Project, createdTimestamp: proto.Meta.CreatedTimestamp, lastUpdatedTimestamp: proto.Meta.LastUpdatedTimestamp, - projections: projections, + Projections: projections, } } diff --git a/go/internal/feast/featurestore.go b/go/internal/feast/featurestore.go index 6b349f7092d..6f033b6ab9d 100644 --- a/go/internal/feast/featurestore.go +++ b/go/internal/feast/featurestore.go @@ -8,22 +8,21 @@ import ( "sort" "strings" - "github.com/apache/arrow/go/arrow/array" - "github.com/apache/arrow/go/arrow/memory" + "github.com/apache/arrow/go/v7/arrow" + "github.com/apache/arrow/go/v7/arrow/memory" "github.com/feast-dev/feast/go/protos/feast/serving" prototypes "github.com/feast-dev/feast/go/protos/feast/types" "github.com/feast-dev/feast/go/types" "github.com/golang/protobuf/proto" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" ) type FeatureStore struct { - config *RepoConfig - registry *Registry - onlineStore OnlineStore + config *RepoConfig + registry *Registry + onlineStore OnlineStore + tranformationCallback TransformationCallback } 
// A Features struct specifies a list of features to be retrieved from the online store. These features @@ -43,7 +42,7 @@ type Features struct { */ type FeatureVector struct { Name string - Values array.Interface + Values arrow.Array Statuses []serving.FieldStatus Timestamps []*timestamppb.Timestamp } @@ -74,7 +73,7 @@ type GroupedFeaturesPerEntitySet struct { // NewFeatureStore constructs a feature store fat client using the // repo config (contents of feature_store.yaml converted to JSON map). -func NewFeatureStore(config *RepoConfig) (*FeatureStore, error) { +func NewFeatureStore(config *RepoConfig, callback TransformationCallback) (*FeatureStore, error) { onlineStore, err := NewOnlineStore(config) if err != nil { return nil, err @@ -87,9 +86,10 @@ func NewFeatureStore(config *RepoConfig) (*FeatureStore, error) { registry.initializeRegistry() return &FeatureStore{ - config: config, - registry: registry, - onlineStore: onlineStore, + config: config, + registry: registry, + onlineStore: onlineStore, + tranformationCallback: callback, }, nil } @@ -99,71 +99,49 @@ func (fs *FeatureStore) GetOnlineFeatures( ctx context.Context, featureRefs []string, featureService *FeatureService, - entityProtos map[string]*prototypes.RepeatedValue, + joinKeyToEntityValues map[string]*prototypes.RepeatedValue, + requestData map[string]*prototypes.RepeatedValue, fullFeatureNames bool) ([]*FeatureVector, error) { - - numRows, err := fs.validateEntityValues(entityProtos) + fvs, odFvs, err := fs.listAllViews() if err != nil { return nil, err } - var fvs map[string]*FeatureView var requestedFeatureViews []*featureViewAndRefs var requestedOnDemandFeatureViews []*OnDemandFeatureView if featureService != nil { - fvs, requestedFeatureViews, requestedOnDemandFeatureViews, err = - fs.getFeatureViewsToUseByService(featureService, false) + requestedFeatureViews, requestedOnDemandFeatureViews, err = + getFeatureViewsToUseByService(featureService, fvs, odFvs) } else { - fvs, requestedFeatureViews, 
requestedOnDemandFeatureViews, err = - fs.getFeatureViewsToUseByFeatureRefs(featureRefs, false) + requestedFeatureViews, requestedOnDemandFeatureViews, err = + getFeatureViewsToUseByFeatureRefs(featureRefs, fvs, odFvs) } - - err = validateFeatureRefs(requestedFeatureViews, fullFeatureNames) if err != nil { return nil, err } - if len(requestedOnDemandFeatureViews) > 0 { - return nil, status.Errorf(codes.InvalidArgument, "on demand feature views are currently not supported") - } - entityNameToJoinKeyMap, expectedJoinKeysSet, err := fs.getEntityMaps(requestedFeatureViews) if err != nil { return nil, err } - // TODO (Ly): This should return empty now - // Expect no ODFV or Request FV passed in GetOnlineFearuresRequest - neededRequestData, err := fs.getNeededRequestData(requestedOnDemandFeatureViews) + + err = validateFeatureRefs(requestedFeatureViews, fullFeatureNames) if err != nil { return nil, err } - // TODO: Add a map that contains provided entities + ODFV schema entities + request schema - // to use for ODFV - // Remove comments for requestDataFeatures when ODFV is supported - // requestDataFeatures := make(map[string]*prototypes.RepeatedValue) // TODO (Ly): Should be empty now until ODFV and Request FV are supported - mappedEntityProtos := make(map[string]*prototypes.RepeatedValue) - for joinKeyOrFeature, vals := range entityProtos { - if _, ok := neededRequestData[joinKeyOrFeature]; ok { - // requestDataFeatures[joinKeyOrFeature] = vals - } else { - if _, ok := expectedJoinKeysSet[joinKeyOrFeature]; !ok { - return nil, fmt.Errorf("JoinKey is not expected in this request: %s\n%v", joinKeyOrFeature, expectedJoinKeysSet) - } else { - mappedEntityProtos[joinKeyOrFeature] = vals - } - } + numRows, err := validateEntityValues(joinKeyToEntityValues, requestData, expectedJoinKeysSet) + if err != nil { + return nil, err } - // TODO (Ly): Skip this validation since we're not supporting ODFV yet - - // err = fs.ensureRequestedDataExist(neededRequestData, 
neededRequestODFVFeatures, requestDataFeatures) - // if err != nil { - // return nil, err - // } - - // Add provided entities + ODFV schema entities to response + err = ensureRequestedDataExist(requestedOnDemandFeatureViews, requestData) + if err != nil { + return nil, err + } + result := make([]*FeatureVector, 0) + arrowMemory := memory.NewGoAllocator() featureViews := make([]*FeatureView, len(requestedFeatureViews)) index := 0 for _, featuresAndView := range requestedFeatureViews { @@ -172,9 +150,8 @@ func (fs *FeatureStore) GetOnlineFeatures( } entitylessCase := false - for _, featureView := range featureViews { - if _, ok := featureView.entities[DUMMY_ENTITY_NAME]; ok { + if _, ok := featureView.Entities[DUMMY_ENTITY_NAME]; ok { entitylessCase = true break } @@ -185,15 +162,13 @@ func (fs *FeatureStore) GetOnlineFeatures( for index := 0; index < numRows; index++ { dummyEntityColumn.Val[index] = &DUMMY_ENTITY } - mappedEntityProtos[DUMMY_ENTITY_ID] = dummyEntityColumn + joinKeyToEntityValues[DUMMY_ENTITY_ID] = dummyEntityColumn } - groupedRefs, err := groupFeatureRefs(requestedFeatureViews, mappedEntityProtos, entityNameToJoinKeyMap, fullFeatureNames) + groupedRefs, err := groupFeatureRefs(requestedFeatureViews, joinKeyToEntityValues, entityNameToJoinKeyMap, fullFeatureNames) if err != nil { return nil, err } - result := make([]*FeatureVector, 0) - arrowMemory := memory.NewGoAllocator() for _, groupRef := range groupedRefs { featureData, err := fs.readFromOnlineStore(ctx, groupRef.entityKeys, groupRef.featureViewNames, groupRef.featureNames) @@ -201,9 +176,10 @@ func (fs *FeatureStore) GetOnlineFeatures( return nil, err } - vectors, err := fs.transposeFeatureRowsIntoColumns(featureData, + vectors, err := fs.transposeFeatureRowsIntoColumns( + featureData, groupRef, - fvs, + requestedFeatureViews, arrowMemory, numRows, ) @@ -212,7 +188,30 @@ func (fs *FeatureStore) GetOnlineFeatures( } result = append(result, vectors...) 
} - // TODO (Ly): ODFV, skip augmentResponseWithOnDemandTransforms + + onDemandFeatures, err := augmentResponseWithOnDemandTransforms( + requestedOnDemandFeatureViews, + requestData, + joinKeyToEntityValues, + result, + fs.tranformationCallback, + arrowMemory, + numRows, + fullFeatureNames, + ) + if err != nil { + return nil, err + } + + result = append(result, onDemandFeatures...) + + result, err = keepOnlyRequestedFeatures(result, featureRefs, featureService, fullFeatureNames) + if err != nil { + return nil, err + } + + entityColumns, err := entitiesToFeatureVectors(joinKeyToEntityValues, arrowMemory, numRows) + result = append(entityColumns, result...) return result, nil } @@ -236,171 +235,231 @@ func (fs *FeatureStore) ParseFeatures(kind interface{}) (*Features, error) { return nil, errors.New("cannot parse kind from GetOnlineFeaturesRequest") } -// getFeatureRefs extracts a list of feature references from a Features struct. -func (fs *FeatureStore) getFeatureRefs(features *Features) ([]string, error) { - if features.FeatureService != nil { - var featureViewName string - featureRefs := make([]string, 0) - for _, featureProjection := range features.FeatureService.projections { - featureViewName = featureProjection.nameToUse() - for _, feature := range featureProjection.features { - featureRefs = append(featureRefs, fmt.Sprintf("%s:%s", featureViewName, feature.name)) - } - } - return featureRefs, nil - } else { - return features.FeaturesRefs, nil - } -} - -func (fs *FeatureStore) GetFeatureService(name string, project string) (*FeatureService, error) { - return fs.registry.getFeatureService(project, name) +func (fs *FeatureStore) GetFeatureService(name string) (*FeatureService, error) { + return fs.registry.getFeatureService(fs.config.Project, name) } /* - Return a list of copies of FeatureViewProjection - copied from FeatureView, OnDemandFeatureView existed in the registry - - TODO (Ly): Since the implementation of registry has changed, a better approach here 
is just - retrieving featureViews asked in the passed in list of feature references instead of - retrieving all feature views. Similar argument to FeatureService applies. + Return + (1) requested feature views and features grouped per view + (2) requested on demand feature views + existed in the registry */ -func (fs *FeatureStore) getFeatureViewsToUseByService(featureService *FeatureService, hideDummyEntity bool) (map[string]*FeatureView, []*featureViewAndRefs, []*OnDemandFeatureView, error) { - fvs := make(map[string]*FeatureView) - odFvs := make(map[string]*OnDemandFeatureView) - - featureViews, err := fs.listFeatureViews(hideDummyEntity) - if err != nil { - return nil, nil, nil, err - } - for _, featureView := range featureViews { - fvs[featureView.base.name] = featureView - } - - onDemandFeatureViews, err := fs.registry.listOnDemandFeatureViews(fs.config.Project) - if err != nil { - return nil, nil, nil, err - } - for _, onDemandFeatureView := range onDemandFeatureViews { - odFvs[onDemandFeatureView.base.name] = onDemandFeatureView - } +func getFeatureViewsToUseByService( + featureService *FeatureService, + featureViews map[string]*FeatureView, + onDemandFeatureViews map[string]*OnDemandFeatureView) ([]*featureViewAndRefs, []*OnDemandFeatureView, error) { - fvsToUse := make([]*featureViewAndRefs, 0) + viewNameToViewAndRefs := make(map[string]*featureViewAndRefs) odFvsToUse := make([]*OnDemandFeatureView, 0) - for _, featureProjection := range featureService.projections { + for _, featureProjection := range featureService.Projections { // Create copies of FeatureView that may contains the same *FeatureView but // each differentiated by a *FeatureViewProjection - featureViewName := featureProjection.name - if fv, ok := fvs[featureViewName]; ok { - base, err := fv.base.withProjection(featureProjection) + featureViewName := featureProjection.Name + if fv, ok := featureViews[featureViewName]; ok { + base, err := fv.Base.withProjection(featureProjection) if err != 
nil { - return nil, nil, nil, err + return nil, nil, err + } + if _, ok := viewNameToViewAndRefs[featureProjection.NameToUse()]; !ok { + viewNameToViewAndRefs[featureProjection.NameToUse()] = &featureViewAndRefs{ + view: fv.NewFeatureViewFromBase(base), + featureRefs: []string{}, + } } - newFv := fv.NewFeatureViewFromBase(base) - features := make([]string, len(newFv.base.features)) - for index, feature := range newFv.base.features { - features[index] = feature.name + + for _, feature := range featureProjection.Features { + viewNameToViewAndRefs[featureProjection.NameToUse()].featureRefs = + addStringIfNotContains(viewNameToViewAndRefs[featureProjection.NameToUse()].featureRefs, + feature.name) } - fvsToUse = append(fvsToUse, &featureViewAndRefs{ - view: newFv, - featureRefs: features, - }) - } else if odFv, ok := odFvs[featureViewName]; ok { - base, err := odFv.base.withProjection(featureProjection) + + } else if odFv, ok := onDemandFeatureViews[featureViewName]; ok { + projectedOdFv, err := odFv.NewWithProjection(featureProjection) + if err != nil { + return nil, nil, err + } + odFvsToUse = append(odFvsToUse, projectedOdFv) + err = extractOdFvDependencies( + projectedOdFv, + featureViews, + viewNameToViewAndRefs) if err != nil { - return nil, nil, nil, err + return nil, nil, err } - odFvsToUse = append(odFvsToUse, odFv.NewOnDemandFeatureViewFromBase(base)) } else { - return nil, nil, nil, fmt.Errorf("the provided feature service %s contains a reference to a feature view"+ + return nil, nil, fmt.Errorf("the provided feature service %s contains a reference to a feature view"+ "%s which doesn't exist, please make sure that you have created the feature view"+ "%s and that you have registered it by running \"apply\"", featureService.name, featureViewName, featureViewName) } } - return fvs, fvsToUse, odFvsToUse, nil + + fvsToUse := make([]*featureViewAndRefs, 0) + for _, viewAndRef := range viewNameToViewAndRefs { + fvsToUse = append(fvsToUse, viewAndRef) + } + + return 
fvsToUse, odFvsToUse, nil } /* - Return all FeatureView, OnDemandFeatureView from the registry + Return + (1) requested feature views and features grouped per view + (2) requested on demand feature views + existed in the registry */ -func (fs *FeatureStore) getFeatureViewsToUseByFeatureRefs(features []string, hideDummyEntity bool) (map[string]*FeatureView, []*featureViewAndRefs, []*OnDemandFeatureView, error) { +func getFeatureViewsToUseByFeatureRefs( + features []string, + featureViews map[string]*FeatureView, + onDemandFeatureViews map[string]*OnDemandFeatureView) ([]*featureViewAndRefs, []*OnDemandFeatureView, error) { + viewNameToViewAndRefs := make(map[string]*featureViewAndRefs) + odFvToFeatures := make(map[string][]string) + + for _, featureRef := range features { + featureViewName, featureName, err := ParseFeatureReference(featureRef) + if err != nil { + return nil, nil, err + } + if fv, ok := featureViews[featureViewName]; ok { + if viewAndRef, ok := viewNameToViewAndRefs[fv.Base.Name]; ok { + viewAndRef.featureRefs = addStringIfNotContains(viewAndRef.featureRefs, featureName) + } else { + viewNameToViewAndRefs[fv.Base.Name] = &featureViewAndRefs{ + view: fv, + featureRefs: []string{featureName}, + } + } + } else if odfv, ok := onDemandFeatureViews[featureViewName]; ok { + if _, ok := odFvToFeatures[odfv.base.Name]; !ok { + odFvToFeatures[odfv.base.Name] = []string{featureName} + } else { + odFvToFeatures[odfv.base.Name] = append( + odFvToFeatures[odfv.base.Name], featureName) + } + } else { + return nil, nil, fmt.Errorf("feature view %s doesn't exist, please make sure that you have created the"+ + " feature view %s and that you have registered it by running \"apply\"", featureViewName, featureViewName) + } + } + + odFvsToUse := make([]*OnDemandFeatureView, 0) + + for odFvName, featureNames := range odFvToFeatures { + projectedOdFv, err := onDemandFeatureViews[odFvName].projectWithFeatures(featureNames) + if err != nil { + return nil, nil, err + } + + err 
= extractOdFvDependencies( + projectedOdFv, + featureViews, + viewNameToViewAndRefs) + if err != nil { + return nil, nil, err + } + odFvsToUse = append(odFvsToUse, projectedOdFv) + } + + fvsToUse := make([]*featureViewAndRefs, 0) + for _, viewAndRefs := range viewNameToViewAndRefs { + fvsToUse = append(fvsToUse, viewAndRefs) + } + + return fvsToUse, odFvsToUse, nil +} + +func extractOdFvDependencies( + odFv *OnDemandFeatureView, + sourceFvs map[string]*FeatureView, + requestedFeatures map[string]*featureViewAndRefs, +) error { + + for _, sourceFvProjection := range odFv.sourceFeatureViewProjections { + fv := sourceFvs[sourceFvProjection.Name] + base, err := fv.Base.withProjection(sourceFvProjection) + if err != nil { + return err + } + newFv := fv.NewFeatureViewFromBase(base) + + if _, ok := requestedFeatures[sourceFvProjection.NameToUse()]; !ok { + requestedFeatures[sourceFvProjection.NameToUse()] = &featureViewAndRefs{ + view: newFv, + featureRefs: []string{}, + } + } + + for _, feature := range sourceFvProjection.Features { + requestedFeatures[sourceFvProjection.NameToUse()].featureRefs = addStringIfNotContains( + requestedFeatures[sourceFvProjection.NameToUse()].featureRefs, feature.name) + } + } + + return nil +} + +func (fs *FeatureStore) listAllViews() (map[string]*FeatureView, map[string]*OnDemandFeatureView, error) { fvs := make(map[string]*FeatureView) odFvs := make(map[string]*OnDemandFeatureView) - featureViews, err := fs.listFeatureViews(hideDummyEntity) + + featureViews, err := fs.ListFeatureViews() if err != nil { - return nil, nil, nil, err + return nil, nil, err } for _, featureView := range featureViews { - fvs[featureView.base.name] = featureView + fvs[featureView.Base.Name] = featureView } onDemandFeatureViews, err := fs.registry.listOnDemandFeatureViews(fs.config.Project) if err != nil { - return nil, nil, nil, err + return nil, nil, err } for _, onDemandFeatureView := range onDemandFeatureViews { - odFvs[onDemandFeatureView.base.name] = 
onDemandFeatureView + odFvs[onDemandFeatureView.base.Name] = onDemandFeatureView } + return fvs, odFvs, nil +} - fvsToUse := make([]*featureViewAndRefs, 0) - odFvsToUse := make([]*OnDemandFeatureView, 0) - - for _, featureRef := range features { - featureViewName, featureName, err := parseFeatureReference(featureRef) - if err != nil { - return nil, nil, nil, err - } - if fv, ok := fvs[featureViewName]; ok { - found := false - for _, group := range fvsToUse { - if group.view == fv { - group.featureRefs = append(group.featureRefs, featureName) - found = true - } - } - if !found { - fvsToUse = append(fvsToUse, &featureViewAndRefs{ - view: fv, - featureRefs: []string{featureName}, - }) - } - } else if odFv, ok := odFvs[featureViewName]; ok { - odFvsToUse = append(odFvsToUse, odFv) - } else { - return nil, nil, nil, fmt.Errorf("feature view %s doesn't exist, please make sure that you have created the"+ - " feature view %s and that you have registered it by running \"apply\"", featureViewName, featureViewName) +func addStringIfNotContains(slice []string, element string) []string { + found := false + for _, item := range slice { + if element == item { + found = true } } - return fvs, fvsToUse, odFvsToUse, nil + if !found { + slice = append(slice, element) + } + return slice } func (fs *FeatureStore) getEntityMaps(requestedFeatureViews []*featureViewAndRefs) (map[string]string, map[string]interface{}, error) { entityNameToJoinKeyMap := make(map[string]string) expectedJoinKeysSet := make(map[string]interface{}) - entities, err := fs.listEntities(false) + entities, err := fs.ListEntities(false) if err != nil { return nil, nil, err } entitiesByName := make(map[string]*Entity) for _, entity := range entities { - entitiesByName[entity.name] = entity + entitiesByName[entity.Name] = entity } for _, featuresAndView := range requestedFeatureViews { featureView := featuresAndView.view var joinKeyToAliasMap map[string]string - if featureView.base.projection != nil && 
featureView.base.projection.joinKeyMap != nil { - joinKeyToAliasMap = featureView.base.projection.joinKeyMap + if featureView.Base.Projection != nil && featureView.Base.Projection.JoinKeyMap != nil { + joinKeyToAliasMap = featureView.Base.Projection.JoinKeyMap } else { joinKeyToAliasMap = map[string]string{} } - for entityName := range featureView.entities { - joinKey := entitiesByName[entityName].joinKey + for entityName := range featureView.Entities { + joinKey := entitiesByName[entityName].JoinKey entityNameToJoinKeyMap[entityName] = joinKey if alias, ok := joinKeyToAliasMap[joinKey]; ok { @@ -413,16 +472,28 @@ func (fs *FeatureStore) getEntityMaps(requestedFeatureViews []*featureViewAndRef return entityNameToJoinKeyMap, expectedJoinKeysSet, nil } -func (fs *FeatureStore) validateEntityValues(joinKeyValues map[string]*prototypes.RepeatedValue) (int, error) { - setOfRowLengths := make(map[int]bool) - var numRows int - for _, col := range joinKeyValues { - setOfRowLengths[len(col.Val)] = true - numRows = len(col.Val) - } - if len(setOfRowLengths) > 1 { - return 0, errors.New("valueError: All entity rows must have the same columns") +func validateEntityValues(joinKeyValues map[string]*prototypes.RepeatedValue, + requestData map[string]*prototypes.RepeatedValue, + expectedJoinKeysSet map[string]interface{}) (int, error) { + numRows := -1 + + for joinKey, values := range joinKeyValues { + if _, ok := expectedJoinKeysSet[joinKey]; !ok { + requestData[joinKey] = values + delete(joinKeyValues, joinKey) + // ToDo: when request data will be passed correctly (not as part of entity rows) + // ToDo: throw this error instead + // return 0, fmt.Errorf("JoinKey is not expected in this request: %s\n%v", JoinKey, expectedJoinKeysSet) + } else { + if numRows < 0 { + numRows = len(values.Val) + } else if len(values.Val) != numRows { + return -1, errors.New("valueError: All entity rows must have the same columns") + } + + } } + return numRows, nil } @@ -431,9 +502,9 @@ func 
validateFeatureRefs(requestedFeatures []*featureViewAndRefs, fullFeatureNam featureRefs := make([]string, 0) for _, viewAndFeatures := range requestedFeatures { for _, feature := range viewAndFeatures.featureRefs { - projectedViewName := viewAndFeatures.view.base.name - if viewAndFeatures.view.base.projection != nil { - projectedViewName = viewAndFeatures.view.base.projection.nameToUse() + projectedViewName := viewAndFeatures.view.Base.Name + if viewAndFeatures.view.Base.Projection != nil { + projectedViewName = viewAndFeatures.view.Base.Projection.NameToUse() } featureRefs = append(featureRefs, @@ -445,7 +516,7 @@ func validateFeatureRefs(requestedFeatures []*featureViewAndRefs, fullFeatureNam if fullFeatureNames { featureRefCounter[featureRef]++ } else { - _, featureName, _ := parseFeatureReference(featureRef) + _, featureName, _ := ParseFeatureReference(featureRef) featureRefCounter[featureName]++ } @@ -462,7 +533,7 @@ func validateFeatureRefs(requestedFeatures []*featureViewAndRefs, fullFeatureNam collidedFeatureRefs = append(collidedFeatureRefs, collidedFeatureRef) } else { for _, featureRef := range featureRefs { - _, featureName, _ := parseFeatureReference(featureRef) + _, featureName, _ := ParseFeatureReference(featureRef) if featureName == collidedFeatureRef { collidedFeatureRefs = append(collidedFeatureRefs, featureRef) } @@ -475,42 +546,6 @@ func validateFeatureRefs(requestedFeatures []*featureViewAndRefs, fullFeatureNam return nil } -func (fs *FeatureStore) getNeededRequestData(requestedOnDemandFeatureViews []*OnDemandFeatureView) (map[string]struct{}, error) { - neededRequestData := make(map[string]struct{}) - - for _, onDemandFeatureView := range requestedOnDemandFeatureViews { - requestSchema := onDemandFeatureView.getRequestDataSchema() - for fieldName := range requestSchema { - neededRequestData[fieldName] = struct{}{} - } - } - - return neededRequestData, nil -} - -func (fs *FeatureStore) ensureRequestedDataExist(neededRequestData 
map[string]struct{}, - neededRequestFvFeatures map[string]struct{}, - requestDataFeatures map[string]*prototypes.RepeatedValue) error { - // TODO (Ly): Review: Skip checking even if composite set of - // neededRequestData neededRequestFvFeatures is different from - // request_data_features but same length? - if len(neededRequestData)+len(neededRequestFvFeatures) != len(requestDataFeatures) { - missingFeatures := make([]string, 0) - for feature := range neededRequestData { - if _, ok := requestDataFeatures[feature]; !ok { - missingFeatures = append(missingFeatures, feature) - } - } - for feature := range neededRequestFvFeatures { - if _, ok := requestDataFeatures[feature]; !ok { - missingFeatures = append(missingFeatures, feature) - } - } - return fmt.Errorf("requestDataNotFoundInEntityRowsException: %s", strings.Join(missingFeatures, ", ")) - } - return nil -} - func (fs *FeatureStore) checkOutsideTtl(featureTimestamp *timestamppb.Timestamp, currentTimestamp *timestamppb.Timestamp, ttl *durationpb.Duration) bool { return currentTimestamp.GetSeconds()-featureTimestamp.GetSeconds() > ttl.Seconds } @@ -529,11 +564,15 @@ func (fs *FeatureStore) readFromOnlineStore(ctx context.Context, entityRows []*p func (fs *FeatureStore) transposeFeatureRowsIntoColumns(featureData2D [][]FeatureData, groupRef *GroupedFeaturesPerEntitySet, - fvs map[string]*FeatureView, + requestedFeatureViews []*featureViewAndRefs, arrowAllocator memory.Allocator, numRows int) ([]*FeatureVector, error) { numFeatures := len(groupRef.aliasedFeatureNames) + fvs := make(map[string]*FeatureView) + for _, viewAndRefs := range requestedFeatureViews { + fvs[viewAndRefs.view.Base.Name] = viewAndRefs.view + } var value *prototypes.Value var status serving.FieldStatus @@ -566,7 +605,7 @@ func (fs *FeatureStore) transposeFeatureRowsIntoColumns(featureData2D [][]Featur if _, ok := featureData.value.Val.(*prototypes.Value_NullVal); ok { value = nil status = serving.FieldStatus_NOT_FOUND - } else if 
fs.checkOutsideTtl(eventTimeStamp, timestamppb.Now(), fv.ttl) { + } else if fs.checkOutsideTtl(eventTimeStamp, timestamppb.Now(), fv.Ttl) { value = &prototypes.Value{Val: featureData.value.Val} status = serving.FieldStatus_OUTSIDE_MAX_AGE } else { @@ -591,38 +630,66 @@ func (fs *FeatureStore) transposeFeatureRowsIntoColumns(featureData2D [][]Featur } -// TODO (Ly): Complete this function + ODFV -func (fs *FeatureStore) augmentResponseWithOnDemandTransforms(onlineFeaturesResponse *serving.GetOnlineFeaturesResponse, - featureRefs []string, - requestedOnDemandFeatureViews []*OnDemandFeatureView, - fullFeatureNames bool, -) { - requestedOdfvMap := make(map[string]*OnDemandFeatureView) - requestedOdfvNames := make([]string, len(requestedOnDemandFeatureViews)) - for index, requestedOdfv := range requestedOnDemandFeatureViews { - requestedOdfvMap[requestedOdfv.base.name] = requestedOdfv - requestedOdfvNames[index] = requestedOdfv.base.name +func keepOnlyRequestedFeatures( + vectors []*FeatureVector, + requestedFeatureRefs []string, + featureService *FeatureService, + fullFeatureNames bool) ([]*FeatureVector, error) { + vectorsByName := make(map[string]*FeatureVector) + expectedVectors := make([]*FeatureVector, 0) + + for _, vector := range vectors { + vectorsByName[vector.Name] = vector } - odfvFeatureRefs := make(map[string][]string) - for _, featureRef := range featureRefs { - viewName, featureName, err := parseFeatureReference(featureRef) - if err != nil { + if featureService != nil { + for _, projection := range featureService.Projections { + for _, f := range projection.Features { + requestedFeatureRefs = append(requestedFeatureRefs, + fmt.Sprintf("%s:%s", projection.NameToUse(), f.name)) + } + } + } + for _, featureRef := range requestedFeatureRefs { + viewName, featureName, err := ParseFeatureReference(featureRef) + if err != nil { + return nil, err + } + qualifiedName := getQualifiedFeatureName(viewName, featureName, fullFeatureNames) + if _, ok := 
vectorsByName[qualifiedName]; !ok { + return nil, fmt.Errorf("requested feature %s can't be retrieved", featureRef) } + expectedVectors = append(expectedVectors, vectorsByName[qualifiedName]) + } - if _, ok := requestedOdfvMap[viewName]; ok { + return expectedVectors, nil +} - viewNameToUse := requestedOdfvMap[viewName].base.projection.nameToUse() - if fullFeatureNames { - featureName = fmt.Sprintf("%s__%s", viewNameToUse, featureName) - } - odfvFeatureRefs[viewName] = append(odfvFeatureRefs[viewName], featureName) +func entitiesToFeatureVectors(entityColumns map[string]*prototypes.RepeatedValue, arrowAllocator memory.Allocator, numRows int) ([]*FeatureVector, error) { + vectors := make([]*FeatureVector, 0) + presentVector := make([]serving.FieldStatus, numRows) + timestampVector := make([]*timestamppb.Timestamp, numRows) + for idx := 0; idx < numRows; idx++ { + presentVector[idx] = serving.FieldStatus_PRESENT + timestampVector[idx] = timestamppb.Now() + } + for entityName, values := range entityColumns { + arrowColumn, err := types.ProtoValuesToArrowArray(values.Val, arrowAllocator, numRows) + if err != nil { + return nil, err } + vectors = append(vectors, &FeatureVector{ + Name: entityName, + Values: arrowColumn, + Statuses: presentVector, + Timestamps: timestampVector, + }) } + return vectors, nil } -func (fs *FeatureStore) listFeatureViews(hideDummyEntity bool) ([]*FeatureView, error) { +func (fs *FeatureStore) ListFeatureViews() ([]*FeatureView, error) { featureViews, err := fs.registry.listFeatureViews(fs.config.Project) if err != nil { return featureViews, err @@ -630,7 +697,7 @@ func (fs *FeatureStore) listFeatureViews(hideDummyEntity bool) ([]*FeatureView, return featureViews, nil } -func (fs *FeatureStore) listEntities(hideDummyEntity bool) ([]*Entity, error) { +func (fs *FeatureStore) ListEntities(hideDummyEntity bool) ([]*Entity, error) { allEntities, err := fs.registry.listEntities(fs.config.Project) if err != nil { @@ -638,7 +705,7 @@ func (fs 
*FeatureStore) listEntities(hideDummyEntity bool) ([]*Entity, error) { } entities := make([]*Entity, 0) for _, entity := range allEntities { - if entity.name != DUMMY_ENTITY_NAME || !hideDummyEntity { + if entity.Name != DUMMY_ENTITY_NAME || !hideDummyEntity { entities = append(entities, entity) } } @@ -688,7 +755,7 @@ func groupFeatureRefs(requestedFeatureViews []*featureViewAndRefs, joinKeys := make([]string, 0) fv := featuresAndView.view featureNames := featuresAndView.featureRefs - for entity := range fv.entities { + for entity := range fv.Entities { joinKeys = append(joinKeys, entityNameToJoinKeyMap[entity]) } @@ -696,8 +763,8 @@ func groupFeatureRefs(requestedFeatureViews []*featureViewAndRefs, joinKeysValuesProjection := make(map[string]*prototypes.RepeatedValue) joinKeyToAliasMap := make(map[string]string) - if fv.base.projection != nil && fv.base.projection.joinKeyMap != nil { - joinKeyToAliasMap = fv.base.projection.joinKeyMap + if fv.Base.Projection != nil && fv.Base.Projection.JoinKeyMap != nil { + joinKeyToAliasMap = fv.Base.Projection.JoinKeyMap } for _, joinKey := range joinKeys { @@ -723,16 +790,16 @@ func groupFeatureRefs(requestedFeatureViews []*featureViewAndRefs, aliasedFeatureNames := make([]string, 0) featureViewNames := make([]string, 0) var viewNameToUse string - if fv.base.projection != nil { - viewNameToUse = fv.base.projection.nameToUse() + if fv.Base.Projection != nil { + viewNameToUse = fv.Base.Projection.NameToUse() } else { - viewNameToUse = fv.base.name + viewNameToUse = fv.Base.Name } for _, featureName := range featureNames { aliasedFeatureNames = append(aliasedFeatureNames, - getFeatureResponseMeta(viewNameToUse, featureName, fullFeatureNames)) - featureViewNames = append(featureViewNames, fv.base.name) + getQualifiedFeatureName(viewNameToUse, featureName, fullFeatureNames)) + featureViewNames = append(featureViewNames, fv.Base.Name) } if _, ok := groups[groupKey]; !ok { @@ -789,18 +856,18 @@ func getUniqueEntityRows(joinKeysProto 
[]*prototypes.EntityKey) ([]*prototypes.E return uniqueEntityRows, mappingIndices, nil } -func (fs *FeatureStore) getFeatureView(project, featureViewName string, hideDummyEntity bool) (*FeatureView, error) { +func (fs *FeatureStore) GetFeatureView(featureViewName string, hideDummyEntity bool) (*FeatureView, error) { fv, err := fs.registry.getFeatureView(fs.config.Project, featureViewName) if err != nil { return nil, err } - if _, ok := fv.entities[DUMMY_ENTITY_NAME]; ok && hideDummyEntity { - fv.entities = make(map[string]struct{}) + if _, ok := fv.Entities[DUMMY_ENTITY_NAME]; ok && hideDummyEntity { + fv.Entities = make(map[string]struct{}) } return fv, nil } -func parseFeatureReference(featureRef string) (featureViewName, featureName string, e error) { +func ParseFeatureReference(featureRef string) (featureViewName, featureName string, e error) { parsedFeatureName := strings.Split(featureRef, ":") if len(parsedFeatureName) == 0 { @@ -814,9 +881,9 @@ func parseFeatureReference(featureRef string) (featureViewName, featureName stri return } -func getFeatureResponseMeta(featureNameAlias string, featureName string, fullFeatureNames bool) string { +func getQualifiedFeatureName(viewName string, featureName string, fullFeatureNames bool) string { if fullFeatureNames { - return fmt.Sprintf("%s__%s", featureNameAlias, featureName) + return fmt.Sprintf("%s__%s", viewName, featureName) } else { return featureName } diff --git a/go/internal/feast/featurestore_test.go b/go/internal/feast/featurestore_test.go index 3a25a56bcd1..7c393b3d40e 100644 --- a/go/internal/feast/featurestore_test.go +++ b/go/internal/feast/featurestore_test.go @@ -22,6 +22,10 @@ func getRegistryPath() map[string]interface{} { return registry } +func dummyTransformCallback(ODFVName string, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr uintptr, fullFeatureNames bool) int { + return 0 +} + func TestNewFeatureStore(t *testing.T) { t.Skip("@todo(achals): feature_repo isn't checked in yet") config := 
RepoConfig{ @@ -32,7 +36,7 @@ func TestNewFeatureStore(t *testing.T) { "type": "redis", }, } - fs, err := NewFeatureStore(&config) + fs, err := NewFeatureStore(&config, dummyTransformCallback) assert.Nil(t, err) assert.IsType(t, &RedisOnlineStore{}, fs.onlineStore) } @@ -58,35 +62,36 @@ func TestGetOnlineFeaturesRedis(t *testing.T) { {Val: &types.Value_Int64Val{Int64Val: 1003}}}}, } - fs, err := NewFeatureStore(&config) + fs, err := NewFeatureStore(&config, dummyTransformCallback) assert.Nil(t, err) ctx := context.Background() - response, err := fs.GetOnlineFeatures(ctx, featureNames, nil, entities, true) + response, err := fs.GetOnlineFeatures( + ctx, featureNames, nil, entities, map[string]*types.RepeatedValue{}, true) assert.Nil(t, err) - assert.Len(t, response, 4) // 3 features + 1 entity = 4 columns (feature vectors) in response + assert.Len(t, response, 4) // 3 Features + 1 entity = 4 columns (feature vectors) in response } func TestGroupingFeatureRefs(t *testing.T) { viewA := &FeatureView{ - base: &BaseFeatureView{ - name: "viewA", - projection: &FeatureViewProjection{ - nameAlias: "aliasViewA", + Base: &BaseFeatureView{ + Name: "viewA", + Projection: &FeatureViewProjection{ + NameAlias: "aliasViewA", }, }, - entities: map[string]struct{}{"driver": {}, "customer": {}}, + Entities: map[string]struct{}{"driver": {}, "customer": {}}, } viewB := &FeatureView{ - base: &BaseFeatureView{name: "viewB"}, - entities: map[string]struct{}{"driver": {}, "customer": {}}, + Base: &BaseFeatureView{Name: "viewB"}, + Entities: map[string]struct{}{"driver": {}, "customer": {}}, } viewC := &FeatureView{ - base: &BaseFeatureView{name: "viewC"}, - entities: map[string]struct{}{"driver": {}}, + Base: &BaseFeatureView{Name: "viewC"}, + Entities: map[string]struct{}{"driver": {}}, } viewD := &FeatureView{ - base: &BaseFeatureView{name: "viewD"}, - entities: map[string]struct{}{"customer": {}}, + Base: &BaseFeatureView{Name: "viewD"}, + Entities: map[string]struct{}{"customer": {}}, 
} refGroups, _ := groupFeatureRefs( []*featureViewAndRefs{ @@ -152,18 +157,18 @@ func TestGroupingFeatureRefs(t *testing.T) { func TestGroupingFeatureRefsWithJoinKeyAliases(t *testing.T) { viewA := &FeatureView{ - base: &BaseFeatureView{ - name: "viewA", - projection: &FeatureViewProjection{ - name: "viewA", - joinKeyMap: map[string]string{"location_id": "destination_id"}, + Base: &BaseFeatureView{ + Name: "viewA", + Projection: &FeatureViewProjection{ + Name: "viewA", + JoinKeyMap: map[string]string{"location_id": "destination_id"}, }, }, - entities: map[string]struct{}{"location": {}}, + Entities: map[string]struct{}{"location": {}}, } viewB := &FeatureView{ - base: &BaseFeatureView{name: "viewB"}, - entities: map[string]struct{}{"location": {}}, + Base: &BaseFeatureView{Name: "viewB"}, + Entities: map[string]struct{}{"location": {}}, } refGroups, _ := groupFeatureRefs( @@ -211,14 +216,14 @@ func TestGroupingFeatureRefsWithJoinKeyAliases(t *testing.T) { func TestGroupingFeatureRefsWithMissingKey(t *testing.T) { viewA := &FeatureView{ - base: &BaseFeatureView{ - name: "viewA", - projection: &FeatureViewProjection{ - name: "viewA", - joinKeyMap: map[string]string{"location_id": "destination_id"}, + Base: &BaseFeatureView{ + Name: "viewA", + Projection: &FeatureViewProjection{ + Name: "viewA", + JoinKeyMap: map[string]string{"location_id": "destination_id"}, }, }, - entities: map[string]struct{}{"location": {}}, + Entities: map[string]struct{}{"location": {}}, } _, err := groupFeatureRefs( diff --git a/go/internal/feast/featureview.go b/go/internal/feast/featureview.go index 94930f4eea4..6e5c5b3cfc6 100644 --- a/go/internal/feast/featureview.go +++ b/go/internal/feast/featureview.go @@ -15,32 +15,32 @@ const ( var DUMMY_ENTITY types.Value = types.Value{Val: &types.Value_StringVal{StringVal: DUMMY_ENTITY_VAL}} type FeatureView struct { - base *BaseFeatureView - ttl *durationpb.Duration + Base *BaseFeatureView + Ttl *durationpb.Duration // Make entities set so that 
search for dummy entity is faster - entities map[string]struct{} + Entities map[string]struct{} } func NewFeatureViewFromProto(proto *core.FeatureView) *FeatureView { - featureView := &FeatureView{base: NewBaseFeatureView(proto.Spec.Name, proto.Spec.Features), - ttl: &(*proto.Spec.Ttl), + featureView := &FeatureView{Base: NewBaseFeatureView(proto.Spec.Name, proto.Spec.Features), + Ttl: &(*proto.Spec.Ttl), } if len(proto.Spec.Entities) == 0 { - featureView.entities = map[string]struct{}{DUMMY_ENTITY_NAME: {}} + featureView.Entities = map[string]struct{}{DUMMY_ENTITY_NAME: {}} } else { - featureView.entities = make(map[string]struct{}) + featureView.Entities = make(map[string]struct{}) for _, entityName := range proto.Spec.Entities { - featureView.entities[entityName] = struct{}{} + featureView.Entities[entityName] = struct{}{} } } return featureView } func (fs *FeatureView) NewFeatureViewFromBase(base *BaseFeatureView) *FeatureView { - ttl := durationpb.Duration{Seconds: fs.ttl.Seconds, Nanos: fs.ttl.Nanos} - featureView := &FeatureView{base: base, - ttl: &ttl, - entities: fs.entities, + ttl := durationpb.Duration{Seconds: fs.Ttl.Seconds, Nanos: fs.Ttl.Nanos} + featureView := &FeatureView{Base: base, + Ttl: &ttl, + Entities: fs.Entities, } return featureView } diff --git a/go/internal/feast/featureviewprojection.go b/go/internal/feast/featureviewprojection.go index ba80838490f..475483a0813 100644 --- a/go/internal/feast/featureviewprojection.go +++ b/go/internal/feast/featureviewprojection.go @@ -5,37 +5,37 @@ import ( ) type FeatureViewProjection struct { - name string - nameAlias string - features []*Feature - joinKeyMap map[string]string + Name string + NameAlias string + Features []*Feature + JoinKeyMap map[string]string } -func (fv *FeatureViewProjection) nameToUse() string { - if len(fv.nameAlias) == 0 { - return fv.name +func (fv *FeatureViewProjection) NameToUse() string { + if len(fv.NameAlias) == 0 { + return fv.Name } - return fv.nameAlias + return 
fv.NameAlias } func NewFeatureViewProjectionFromProto(proto *core.FeatureViewProjection) *FeatureViewProjection { - featureProjection := &FeatureViewProjection{name: proto.FeatureViewName, - nameAlias: proto.FeatureViewNameAlias, - joinKeyMap: proto.JoinKeyMap, + featureProjection := &FeatureViewProjection{Name: proto.FeatureViewName, + NameAlias: proto.FeatureViewNameAlias, + JoinKeyMap: proto.JoinKeyMap, } features := make([]*Feature, len(proto.FeatureColumns)) for index, featureSpecV2 := range proto.FeatureColumns { features[index] = NewFeatureFromProto(featureSpecV2) } - featureProjection.features = features + featureProjection.Features = features return featureProjection } func NewFeatureViewProjectionFromDefinition(base *BaseFeatureView) *FeatureViewProjection { - return &FeatureViewProjection{name: base.name, - nameAlias: "", - features: base.features, - joinKeyMap: make(map[string]string), + return &FeatureViewProjection{Name: base.Name, + NameAlias: "", + Features: base.Features, + JoinKeyMap: make(map[string]string), } } diff --git a/go/internal/feast/ondemandfeatureview.go b/go/internal/feast/ondemandfeatureview.go index 436cb0a3d53..1176499e656 100644 --- a/go/internal/feast/ondemandfeatureview.go +++ b/go/internal/feast/ondemandfeatureview.go @@ -20,7 +20,7 @@ func NewOnDemandFeatureViewFromProto(proto *core.OnDemandFeatureView) *OnDemandF if onDemandSourceFeatureView, ok := onDemandSource.Source.(*core.OnDemandSource_FeatureView); ok { featureViewProto := onDemandSourceFeatureView.FeatureView featureView := NewFeatureViewFromProto(featureViewProto) - onDemandFeatureView.sourceFeatureViewProjections[sourceName] = featureView.base.projection + onDemandFeatureView.sourceFeatureViewProjections[sourceName] = featureView.Base.Projection } else if onDemandSourceFeatureViewProjection, ok := onDemandSource.Source.(*core.OnDemandSource_FeatureViewProjection); ok { featureProjectionProto := onDemandSourceFeatureViewProjection.FeatureViewProjection 
onDemandFeatureView.sourceFeatureViewProjections[sourceName] = NewFeatureViewProjectionFromProto(featureProjectionProto) @@ -35,10 +35,21 @@ func NewOnDemandFeatureViewFromProto(proto *core.OnDemandFeatureView) *OnDemandF return onDemandFeatureView } -func (fs *OnDemandFeatureView) NewOnDemandFeatureViewFromBase(base *BaseFeatureView) *OnDemandFeatureView { +func (fs *OnDemandFeatureView) NewWithProjection(projection *FeatureViewProjection) (*OnDemandFeatureView, error) { + projectedBase, err := fs.base.withProjection(projection) + if err != nil { + return nil, err + } + featureView := &OnDemandFeatureView{ + base: projectedBase, + sourceFeatureViewProjections: fs.sourceFeatureViewProjections, + sourceRequestDataSources: fs.sourceRequestDataSources, + } + return featureView, nil +} - featureView := &OnDemandFeatureView{base: base} - return featureView +func (fs *OnDemandFeatureView) projectWithFeatures(featureNames []string) (*OnDemandFeatureView, error) { + return fs.NewWithProjection(fs.base.projectWithFeatures(featureNames)) } func (fs *OnDemandFeatureView) getRequestDataSchema() map[string]types.ValueType_Enum { diff --git a/go/internal/feast/transformation.go b/go/internal/feast/transformation.go new file mode 100644 index 00000000000..d89e241c047 --- /dev/null +++ b/go/internal/feast/transformation.go @@ -0,0 +1,200 @@ +package feast + +import ( + "errors" + "fmt" + "github.com/apache/arrow/go/v7/arrow" + "github.com/apache/arrow/go/v7/arrow/array" + "github.com/apache/arrow/go/v7/arrow/cdata" + "github.com/apache/arrow/go/v7/arrow/memory" + "github.com/feast-dev/feast/go/protos/feast/serving" + prototypes "github.com/feast-dev/feast/go/protos/feast/types" + "github.com/feast-dev/feast/go/types" + "google.golang.org/protobuf/types/known/timestamppb" + "strings" + "unsafe" +) + +type TransformationCallback func(ODFVName string, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr uintptr, fullFeatureNames bool) int + +func 
augmentResponseWithOnDemandTransforms( + onDemandFeatureViews []*OnDemandFeatureView, + requestData map[string]*prototypes.RepeatedValue, + entityRows map[string]*prototypes.RepeatedValue, + features []*FeatureVector, + transformationCallback TransformationCallback, + arrowMemory memory.Allocator, + numRows int, + fullFeatureNames bool, + +) ([]*FeatureVector, error) { + result := make([]*FeatureVector, 0) + var err error + + for _, odfv := range onDemandFeatureViews { + requestContextArrow := make(map[string]array.Interface) + for name, values := range requestData { + requestContextArrow[name], err = types.ProtoValuesToArrowArray(values.Val, arrowMemory, numRows) + if err != nil { + return nil, err + } + } + + for name, values := range entityRows { + requestContextArrow[name], err = types.ProtoValuesToArrowArray(values.Val, arrowMemory, numRows) + if err != nil { + return nil, err + } + } + + retrievedFeatures := make(map[string]array.Interface) + for _, vector := range features { + retrievedFeatures[vector.Name] = vector.Values + } + + onDemandFeatures, err := CallTransformations( + odfv, + retrievedFeatures, + requestContextArrow, + transformationCallback, + numRows, + fullFeatureNames, + ) + if err != nil { + return nil, err + } + result = append(result, onDemandFeatures...) 
+ } + + return result, nil +} + +func CallTransformations( + featureView *OnDemandFeatureView, + retrievedFeatures map[string]array.Interface, + requestContext map[string]array.Interface, + callback TransformationCallback, + numRows int, + fullFeatureNames bool, +) ([]*FeatureVector, error) { + + inputArr := cdata.CArrowArray{} + inputSchema := cdata.CArrowSchema{} + + outArr := cdata.CArrowArray{} + outSchema := cdata.CArrowSchema{} + + defer cdata.ReleaseCArrowArray(&inputArr) + defer cdata.ReleaseCArrowArray(&outArr) + defer cdata.ReleaseCArrowSchema(&inputSchema) + defer cdata.ReleaseCArrowSchema(&outSchema) + + inputArrPtr := uintptr(unsafe.Pointer(&inputArr)) + inputSchemaPtr := uintptr(unsafe.Pointer(&inputSchema)) + + outArrPtr := uintptr(unsafe.Pointer(&outArr)) + outSchemaPtr := uintptr(unsafe.Pointer(&outSchema)) + + inputFields := make([]arrow.Field, 0) + inputColumns := make([]array.Interface, 0) + for name, arr := range retrievedFeatures { + inputFields = append(inputFields, arrow.Field{Name: name, Type: arr.DataType()}) + inputColumns = append(inputColumns, arr) + } + for name, arr := range requestContext { + inputFields = append(inputFields, arrow.Field{Name: name, Type: arr.DataType()}) + inputColumns = append(inputColumns, arr) + } + + inputRecord := array.NewRecord(arrow.NewSchema(inputFields, nil), inputColumns, int64(numRows)) + defer inputRecord.Release() + + cdata.ExportArrowRecordBatch(inputRecord, &inputArr, &inputSchema) + + ret := callback(featureView.base.Name, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr, fullFeatureNames) + + if ret != numRows { + return nil, errors.New("python transformation callback failed") + } + + outRecord, err := cdata.ImportCRecordBatch(&outArr, &outSchema) + if err != nil { + return nil, err + } + + result := make([]*FeatureVector, 0) + for idx, field := range outRecord.Schema().Fields() { + dropFeature := true + + if featureView.base.Projection != nil { + var featureName string + if fullFeatureNames { 
+ featureName = strings.Split(field.Name, "__")[1] + } else { + featureName = field.Name + } + + for _, feature := range featureView.base.Projection.Features { + if featureName == feature.name { + dropFeature = false + } + } + } else { + dropFeature = false + } + + if dropFeature { + continue + } + + statuses := make([]serving.FieldStatus, numRows) + timestamps := make([]*timestamppb.Timestamp, numRows) + + for idx := 0; idx < numRows; idx++ { + statuses[idx] = serving.FieldStatus_PRESENT + timestamps[idx] = timestamppb.Now() + } + + result = append(result, &FeatureVector{ + Name: field.Name, + Values: outRecord.Column(idx), + Statuses: statuses, + Timestamps: timestamps, + }) + } + + return result, nil +} + +func ensureRequestedDataExist(requestedOnDemandFeatureViews []*OnDemandFeatureView, + requestDataFeatures map[string]*prototypes.RepeatedValue) error { + + neededRequestData, err := getNeededRequestData(requestedOnDemandFeatureViews) + if err != nil { + return err + } + missingFeatures := make([]string, 0) + for feature := range neededRequestData { + if _, ok := requestDataFeatures[feature]; !ok { + missingFeatures = append(missingFeatures, feature) + } + } + + if len(missingFeatures) > 0 { + return fmt.Errorf("requestDataNotFoundInEntityRowsException: %s", strings.Join(missingFeatures, ", ")) + } + return nil +} + +func getNeededRequestData(requestedOnDemandFeatureViews []*OnDemandFeatureView) (map[string]struct{}, error) { + neededRequestData := make(map[string]struct{}) + + for _, onDemandFeatureView := range requestedOnDemandFeatureViews { + requestSchema := onDemandFeatureView.getRequestDataSchema() + for fieldName := range requestSchema { + neededRequestData[fieldName] = struct{}{} + } + } + + return neededRequestData, nil +} diff --git a/go/types/typeconversion.go b/go/types/typeconversion.go index 1b577c7732e..cf960db25a1 100644 --- a/go/types/typeconversion.go +++ b/go/types/typeconversion.go @@ -2,9 +2,9 @@ package types import ( "fmt" - 
"github.com/apache/arrow/go/arrow" - "github.com/apache/arrow/go/arrow/array" - "github.com/apache/arrow/go/arrow/memory" + "github.com/apache/arrow/go/v7/arrow" + "github.com/apache/arrow/go/v7/arrow/array" + "github.com/apache/arrow/go/v7/arrow/memory" "github.com/feast-dev/feast/go/protos/feast/types" ) @@ -128,7 +128,7 @@ func copyProtoValuesToArrowArray(builder array.Builder, values []*types.Value) e return nil } -func ArrowValuesToProtoValues(arr array.Interface) ([]*types.Value, error) { +func ArrowValuesToProtoValues(arr arrow.Array) ([]*types.Value, error) { values := make([]*types.Value, 0) if listArr, ok := arr.(*array.List); ok { @@ -250,7 +250,7 @@ func ArrowValuesToProtoValues(arr array.Interface) ([]*types.Value, error) { return values, nil } -func ProtoValuesToArrowArray(protoValues []*types.Value, arrowAllocator memory.Allocator, numRows int) (array.Interface, error) { +func ProtoValuesToArrowArray(protoValues []*types.Value, arrowAllocator memory.Allocator, numRows int) (arrow.Array, error) { var fieldType arrow.DataType var err error diff --git a/go/types/typeconversion_test.go b/go/types/typeconversion_test.go index b0f879fa79f..7407c6ff64a 100644 --- a/go/types/typeconversion_test.go +++ b/go/types/typeconversion_test.go @@ -1,7 +1,7 @@ package types import ( - "github.com/apache/arrow/go/arrow/memory" + "github.com/apache/arrow/go/v7/arrow/memory" "github.com/feast-dev/feast/go/protos/feast/types" "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" diff --git a/sdk/python/feast/embedded_go/online_features_service.py b/sdk/python/feast/embedded_go/online_features_service.py index a18f1582c75..e2da2dfbd8f 100644 --- a/sdk/python/feast/embedded_go/online_features_service.py +++ b/sdk/python/feast/embedded_go/online_features_service.py @@ -1,18 +1,28 @@ -from typing import Any, Dict, List, Optional, Union +from functools import partial +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union import pyarrow as 
pa +from google.protobuf.timestamp_pb2 import Timestamp from pyarrow.cffi import ffi -from feast.errors import FeatureNameCollisionError +from feast.errors import ( + FeatureNameCollisionError, + RequestDataNotFoundInEntityRowsException, +) from feast.feature_service import FeatureService from feast.online_response import OnlineResponse from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesResponse -from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value +from feast.protos.feast.types import Value_pb2 from feast.repo_config import RepoConfig +from feast.value_type import ValueType from .lib.embedded import DataTable, NewOnlineFeatureService, OnlineFeatureServiceConfig from .lib.go import Slice_string +if TYPE_CHECKING: + from feast.feature_store import FeatureStore + + ARROW_TYPE_TO_PROTO_FIELD = { pa.int32(): "int32_val", pa.int64(): "int64_val", @@ -21,62 +31,125 @@ pa.bool_(): "bool_val", pa.string(): "string_val", pa.binary(): "bytes_val", + pa.time64("ns"): "unix_timestamp_val", +} + +ARROW_LIST_TYPE_TO_PROTO_FIELD = { + pa.int32(): "int32_list_val", + pa.int64(): "int64_list_val", + pa.float32(): "float_list_val", + pa.float64(): "double_list_val", + pa.bool_(): "bool_list_val", + pa.string(): "string_list_val", + pa.binary(): "bytes_list_val", + pa.time64("ns"): "unix_timestamp_list_val", +} + +ARROW_LIST_TYPE_TO_PROTO_LIST_CLASS = { + pa.int32(): Value_pb2.Int32List, + pa.int64(): Value_pb2.Int64List, + pa.float32(): Value_pb2.FloatList, + pa.float64(): Value_pb2.DoubleList, + pa.bool_(): Value_pb2.BoolList, + pa.string(): Value_pb2.StringList, + pa.binary(): Value_pb2.BytesList, + pa.time64("ns"): Value_pb2.Int64List, +} + +# used for entity types only +PROTO_TYPE_TO_ARROW_TYPE = { + ValueType.INT32: pa.int32(), + ValueType.INT64: pa.int64(), + ValueType.FLOAT: pa.float32(), + ValueType.DOUBLE: pa.float64(), + ValueType.STRING: pa.string(), + ValueType.BYTES: pa.binary(), } class EmbeddedOnlineFeatureServer: - def 
__init__(self, repo_path: str, repo_config: RepoConfig): + def __init__( + self, repo_path: str, repo_config: RepoConfig, feature_store: "FeatureStore" + ): + # keep callback in self to prevent it from GC + self._transformation_callback = partial(transformation_callback, feature_store) + self._service = NewOnlineFeatureService( OnlineFeatureServiceConfig( RepoPath=repo_path, RepoConfig=repo_config.json() - ) + ), + self._transformation_callback, ) def get_online_features( self, features_refs: List[str], feature_service: Optional[FeatureService], - entities: Dict[str, Union[List[Any], RepeatedValue]], - project: str, + entities: Dict[str, Union[List[Any], Value_pb2.RepeatedValue]], + request_data: Dict[str, Union[List[Any], Value_pb2.RepeatedValue]], full_feature_names: bool = False, ): - entity_fields = [] - entity_arrays = [] - for entity_name, entity_values in entities.items(): - arr = _to_arrow(entity_values) - entity_fields.append((entity_name, arr.type)) - entity_arrays.append(arr) - schema = pa.schema(entity_fields) - batch = pa.RecordBatch.from_arrays(entity_arrays, schema=schema) + if feature_service: + join_keys_types = self._service.GetEntityTypesMapByFeatureService( + feature_service.name + ) + else: + join_keys_types = self._service.GetEntityTypesMap( + Slice_string(features_refs) + ) + + join_keys_types = { + join_key: ValueType(enum_value) for join_key, enum_value in join_keys_types + } # Here we create C structures that will be shared between Python and Go. # We will pass entities as arrow Record Batch to Go part (in_c_array & in_c_schema) # and receive features as Record Batch from Go (out_c_array & out_c_schema) # This objects needs to be initialized here in order to correctly # free them later using Python GC. 
- out_c_schema = ffi.new("struct ArrowSchema*") - out_ptr_schema = int(ffi.cast("uintptr_t", out_c_schema)) + ( + entities_c_schema, + entities_ptr_schema, + entities_c_array, + entities_ptr_array, + ) = allocate_schema_and_array() + ( + req_data_c_schema, + req_data_ptr_schema, + req_data_c_array, + req_data_ptr_array, + ) = allocate_schema_and_array() - out_c_array = ffi.new("struct ArrowArray*") - out_ptr_array = int(ffi.cast("uintptr_t", out_c_array)) + ( + features_c_schema, + features_ptr_schema, + features_c_array, + features_ptr_array, + ) = allocate_schema_and_array() - in_c_schema = ffi.new("struct ArrowSchema*") - in_ptr_schema = int(ffi.cast("uintptr_t", in_c_schema)) + batch, schema = map_to_record_batch(entities, join_keys_types) + schema._export_to_c(entities_ptr_schema) + batch._export_to_c(entities_ptr_array) - in_c_array = ffi.new("struct ArrowArray*") - in_ptr_array = int(ffi.cast("uintptr_t", in_c_array)) + batch, schema = map_to_record_batch(request_data) + schema._export_to_c(req_data_ptr_schema) + batch._export_to_c(req_data_ptr_array) - schema._export_to_c(in_ptr_schema) - batch._export_to_c(in_ptr_array) try: self._service.GetOnlineFeatures( featureRefs=Slice_string(features_refs), featureServiceName=feature_service and feature_service.name or "", - entities=DataTable(SchemaPtr=in_ptr_schema, DataPtr=in_ptr_array), - projectName=project, + entities=DataTable( + SchemaPtr=entities_ptr_schema, DataPtr=entities_ptr_array + ), + requestData=DataTable( + SchemaPtr=req_data_ptr_schema, DataPtr=req_data_ptr_array + ), fullFeatureNames=full_feature_names, - output=DataTable(SchemaPtr=out_ptr_schema, DataPtr=out_ptr_array), + output=DataTable( + SchemaPtr=features_ptr_schema, DataPtr=features_ptr_array + ), ) except RuntimeError as exc: (msg,) = exc.args @@ -88,37 +161,135 @@ def get_online_features( full_feature_names=full_feature_names, ) - raise - - result = pa.RecordBatch._import_from_c(out_ptr_array, out_ptr_schema) - - resp = 
GetOnlineFeaturesResponse() - - for idx, field in enumerate(result.schema): - feature_vector = GetOnlineFeaturesResponse.FeatureVector() - - if field.type == pa.null(): - feature_vector.values.extend([Value()] * len(result.columns[idx])) - else: - proto_field_name = ARROW_TYPE_TO_PROTO_FIELD[field.type] - for v in result.columns[idx].tolist(): - feature_vector.values.append(Value(**{proto_field_name: v})) + if msg.startswith("requestDataNotFoundInEntityRowsException"): + feature_refs = msg[len("requestDataNotFoundInEntityRowsException: ") :] + feature_refs = feature_refs.split(",") + raise RequestDataNotFoundInEntityRowsException(feature_refs) - resp.results.append(feature_vector) - resp.metadata.feature_names.val.append(field.name) + raise + record_batch = pa.RecordBatch._import_from_c( + features_ptr_array, features_ptr_schema + ) + resp = record_batch_to_online_response(record_batch) return OnlineResponse(resp) -def _to_arrow(value) -> pa.Array: - if isinstance(value, RepeatedValue): +def _to_arrow(value, type_hint: Optional[ValueType]) -> pa.Array: + if isinstance(value, Value_pb2.RepeatedValue): _proto_to_arrow(value) + if type_hint in PROTO_TYPE_TO_ARROW_TYPE: + return pa.array(value, PROTO_TYPE_TO_ARROW_TYPE[type_hint]) + return pa.array(value) -def _proto_to_arrow(value: RepeatedValue) -> pa.Array: +def _proto_to_arrow(value: Value_pb2.RepeatedValue) -> pa.Array: """ ToDo: support entity rows already packed in protos """ raise NotImplementedError + + +def transformation_callback( + fs: "FeatureStore", + on_demand_feature_view_name: str, + input_arr_ptr: int, + input_schema_ptr: int, + output_arr_ptr: int, + output_schema_ptr: int, + full_feature_names: bool, +) -> int: + odfv = fs.get_on_demand_feature_view(on_demand_feature_view_name) + + input_record = pa.RecordBatch._import_from_c(input_arr_ptr, input_schema_ptr) + + output = odfv.get_transformed_features_df( + input_record.to_pandas(), full_feature_names=full_feature_names + ) + output_record = 
pa.RecordBatch.from_pandas(output) + + output_record.schema._export_to_c(output_schema_ptr) + output_record._export_to_c(output_arr_ptr) + + return output_record.num_rows + + +def allocate_schema_and_array(): + c_schema = ffi.new("struct ArrowSchema*") + ptr_schema = int(ffi.cast("uintptr_t", c_schema)) + + c_array = ffi.new("struct ArrowArray*") + ptr_array = int(ffi.cast("uintptr_t", c_array)) + return c_schema, ptr_schema, c_array, ptr_array + + +def map_to_record_batch( + map: Dict[str, Union[List[Any], Value_pb2.RepeatedValue]], + type_hint: Optional[Dict[str, ValueType]] = None, +) -> Tuple[pa.RecordBatch, pa.Schema]: + fields = [] + columns = [] + type_hint = type_hint or {} + + for name, values in map.items(): + arr = _to_arrow(values, type_hint.get(name)) + fields.append((name, arr.type)) + columns.append(arr) + + schema = pa.schema(fields) + batch = pa.RecordBatch.from_arrays(columns, schema=schema) + return batch, schema + + +def record_batch_to_online_response(record_batch): + resp = GetOnlineFeaturesResponse() + + for idx, field in enumerate(record_batch.schema): + if field.name.endswith("__timestamp") or field.name.endswith("__status"): + continue + + feature_vector = GetOnlineFeaturesResponse.FeatureVector( + statuses=record_batch.columns[idx + 1].to_pylist(), + event_timestamps=[ + Timestamp(seconds=seconds) + for seconds in record_batch.columns[idx + 2].to_pylist() + ], + ) + + if field.type == pa.null(): + feature_vector.values.extend( + [Value_pb2.Value()] * len(record_batch.columns[idx]) + ) + else: + if isinstance(field.type, pa.ListType): + proto_list_class = ARROW_LIST_TYPE_TO_PROTO_LIST_CLASS[ + field.type.value_type + ] + proto_field_name = ARROW_LIST_TYPE_TO_PROTO_FIELD[field.type.value_type] + + column = record_batch.columns[idx] + if field.type.value_type == pa.time64("ns"): + column = column.cast(pa.list_(pa.int64())) + + for v in column.tolist(): + feature_vector.values.append( + Value_pb2.Value(**{proto_field_name: 
proto_list_class(val=v)}) + ) + else: + proto_field_name = ARROW_TYPE_TO_PROTO_FIELD[field.type] + + column = record_batch.columns[idx] + if field.type == pa.time64("ns"): + column = column.cast(pa.int64()) + + for v in column.tolist(): + feature_vector.values.append( + Value_pb2.Value(**{proto_field_name: v}) + ) + + resp.results.append(feature_vector) + resp.metadata.feature_names.val.append(field.name) + + return resp diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index c5388e1d082..63db1514129 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -135,7 +135,6 @@ def __init__( self._registry = Registry(registry_config, repo_path=self.repo_path) self._registry._initialize_registry() self._provider = get_provider(self.config, self.repo_path) - self._go_server = None @log_exceptions def version(self) -> str: @@ -766,6 +765,11 @@ def apply( self._registry.commit() + # go server needs to be reloaded to apply new configuration. 
+ # we're stopping it here + # new server will be instantiated on the next online request + self._teardown_go_server() + @log_exceptions_and_usage def teardown(self): """Tears down all local and cloud resources for the feature store.""" @@ -778,6 +782,7 @@ def teardown(self): self._get_provider().teardown_infra(self.project, tables, entities) self._registry.teardown() + self._teardown_go_server() @log_exceptions_and_usage def get_historical_features( @@ -1288,7 +1293,7 @@ def get_online_features( # Lazily start the go server on the first request if self._go_server is None: self._go_server = EmbeddedOnlineFeatureServer( - str(self.repo_path.absolute()), self.config + str(self.repo_path.absolute()), self.config, self ) return self._go_server.get_online_features( @@ -1297,8 +1302,8 @@ def get_online_features( if isinstance(features, FeatureService) else None, entities=columnar, + request_data={}, # TODO: add request data parameter to public API full_feature_names=full_feature_names, - project=self.config.project, ) return self._get_online_features( @@ -1939,6 +1944,9 @@ def serve_transformations(self, port: int) -> None: transformation_server.start_server(self, port) + def _teardown_go_server(self): + self._go_server = None + def _validate_entity_values(join_key_values: Dict[str, List[Value]]): set_of_row_lengths = {len(v) for v in join_key_values.values()} diff --git a/sdk/python/feast/go_server.py b/sdk/python/feast/go_server.py deleted file mode 100644 index 1fcbab61f08..00000000000 --- a/sdk/python/feast/go_server.py +++ /dev/null @@ -1,297 +0,0 @@ -# Copyright 2022 The Feast Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import atexit -import logging -import os -import platform -import random -import string -import subprocess -import tempfile -import threading -import time -from pathlib import Path -from subprocess import Popen -from typing import Any, Dict, List, Optional, Union - -import grpc -from tenacity import retry, stop_after_attempt, stop_after_delay, wait_exponential - -import feast -from feast.errors import FeatureNameCollisionError, InvalidFeaturesParameterType -from feast.feature_service import FeatureService -from feast.flags_helper import is_test -from feast.online_response import OnlineResponse -from feast.protos.feast.serving.ServingService_pb2 import ( - GetFeastServingInfoRequest, - GetOnlineFeaturesRequest, -) -from feast.protos.feast.serving.ServingService_pb2_grpc import ServingServiceStub -from feast.repo_config import RepoConfig -from feast.type_map import python_values_to_proto_values - -_logger = logging.getLogger(__name__) - - -class GoServerConnection: - def __init__(self, config: RepoConfig, repo_path: str): - self._process: Optional[Popen[bytes]] = None - self._config = config - self._repo_path = repo_path - self.temp_dir = tempfile.TemporaryDirectory() - self._client: Optional[ServingServiceStub] = None - - @property - def client(self): - if self._client: - return self._client - raise RuntimeError("Client not established with go subprocess") - - def _get_unix_domain_file_path(self) -> Path: - # This method should return a file that go server should listen on and that the python channel - # should communicate to. 
- now = time.time_ns() - letters = string.ascii_lowercase - random_suffix = "".join(random.choice(letters) for _ in range(10)) - - return Path(self.temp_dir.name, f"{now}_{random_suffix}.sock") - - def connect(self) -> bool: - self.sock_file = self._get_unix_domain_file_path() - env = { - "FEAST_REPO_CONFIG": self._config.json(), - "FEAST_REPO_PATH": self._repo_path, - "FEAST_GRPC_SOCK_FILE": str(self.sock_file), - **os.environ, - } - cwd = feast.__path__[0] - goos = platform.system().lower() - goarch = "amd64" if platform.machine() == "x86_64" else "arm64" - executable = ( - feast.__path__[0] + f"/binaries/server_{goos}_{goarch}" - if not is_test() - else feast.__path__[0] + "/binaries/server" - ) - # Automatically reconnect with go subprocess exits - self._process = Popen([executable], cwd=cwd, env=env,) - - channel = grpc.insecure_channel(f"unix:{self.sock_file}") - self._client = ServingServiceStub(channel) - - try: - self._check_grpc_connection() - return True - except grpc.RpcError: - return False - - def kill_process(self): - if self._process: - self._process.terminate() - - def is_process_alive(self): - return self._process and self._process.poll() is None - - def wait_for_process(self, timeout): - self._process.wait(timeout) - - # Make sure the connection can be used for feature retrieval before returning from - # constructor. We try connecting to the Go subprocess for 5 seconds or at most 50 times - @retry( - stop=(stop_after_delay(10) | stop_after_attempt(50)), - wait=wait_exponential(multiplier=0.1, min=0.1, max=5), - ) - def _check_grpc_connection(self): - return self.client.GetFeastServingInfo(request=GetFeastServingInfoRequest()) - - -class GoServer: - """ - A GoServer defines a thin Python wrapper around a Go gRPC server that retrieves and - serves online features. - - Attributes: - _repo_path: The path to the Feast repo for which this go server is defined. - _config: The RepoConfig for the Feast repo for which this go server is defined. 
- """ - - _repo_path: str - _config: RepoConfig - - def __init__(self, repo_path: str, config: RepoConfig): - """Creates a GoServer object.""" - self._repo_path = repo_path - self._config = config - self._go_server_started = threading.Event() - self._shared_connection = GoServerConnection(config, repo_path) - self._dev_mode = "dev" in feast.__version__ - - self._start_go_server_use_thread() - - def get_online_features( - self, - features: Union[List[str], FeatureService], - entities: Dict[str, List[Any]], - full_feature_names: bool = False, - ) -> OnlineResponse: - """ - Retrieves the latest online feature data. - - Args: - features: Either a list of feature references or a feature service that - determines which features will be retrieved. Feature references should - be of the form "feature_view:feature". - entity_rows: A list of dictionaries where each key-value pair is an - entity-name or entity-value pair. - full_feature_names: Whether feature names should be returned with feature - view names as prefixes, changing them from the format "feature" to - "feature_view__feature". - - Returns: - An OnlineResponse containing the feature data. - - Raises: - InvalidFeaturesParameterType: If features is not a list or a feature service. - FeatureNameCollisionError: If a feature reference is specified multiple times. - ValueError: If some other error occurs. 
- """ - # Wait for go server subprocess to restart before asking for features - if not self._go_server_started.is_set(): - self._go_server_started.wait() - - request = GetOnlineFeaturesRequest(full_feature_names=full_feature_names) - if isinstance(features, FeatureService): - request.feature_service = features.name - elif isinstance(features, list): - request.features.val.extend(features) - else: - raise InvalidFeaturesParameterType(features) - - for key, values in entities.items(): - request.entities[key].val.extend(python_values_to_proto_values(values)) - - try: - response = self._shared_connection.client.GetOnlineFeatures(request=request) - except grpc.RpcError as rpc_error: - # Socket might not have closed if this is a grpc problem. - if rpc_error.code() == grpc.StatusCode.UNAVAILABLE: - # If the server became unavailable, it could mean that the subprocess died or fell - # into a bad state, so the resolution is to wait for go server to restart in the background - if not self._go_server_started.is_set(): - self._go_server_started.wait() - # Retry request with the new Go subprocess - response = self._shared_connection.client.GetOnlineFeatures( - request=request - ) - else: - error_message = rpc_error.details() - if error_message.lower().startswith( - FeatureNameCollisionError.__name__.lower() - ): - parsed_error_message = error_message.split(": ")[1].split("; ") - collided_feature_refs = parsed_error_message[0].split(", ") - full_feature_names = parsed_error_message[1] == "true" - raise FeatureNameCollisionError( - collided_feature_refs, full_feature_names - ) - elif error_message.lower().startswith(ValueError.__name__.lower()): - parsed_error_message = error_message.split(": ")[1] - raise ValueError(parsed_error_message) - else: - raise - - return OnlineResponse(response) - - def _start_go_server_use_thread(self): - - self._go_server_background_thread = GoServerMonitorThread( - "GoServerMonitorThread", self._shared_connection, self._go_server_started - ) - 
self._go_server_background_thread.start() - atexit.register(lambda: self._go_server_background_thread.stop()) - - # Wait for go server subprocess to start for the first time before returning - self._go_server_started.wait() - - def kill_go_server_explicitly(self): - self._go_server_background_thread._is_cancelled.set() - self._go_server_background_thread.stop() - self._go_server_background_thread.join() - - -class GoServerMonitorThread(threading.Thread): - def __init__( - self, - name: str, - shared_connection: GoServerConnection, - go_server_first_started: threading.Event, - ): - threading.Thread.__init__(self) - self.name = name - self._shared_connection = shared_connection - self._is_cancelled = threading.Event() - self.daemon = True - self._go_server_started = go_server_first_started - - def run(self): - # Target function of the thread class - _logger.debug( - "%s Started monitoring thread to keep go feature server alive", self.ident - ) - try: - while not self._is_cancelled.is_set(): - - # If we fail to connect to grpc stub, terminate subprocess and repeat - _logger.debug("%s Connecting to subprocess", self.ident) - if not self._shared_connection.connect(): - _logger.debug( - "%s Failed to connect, killing and retrying", self.ident - ) - self._shared_connection.kill_process() - continue - else: - _logger.debug( - "%s Go feature server started, process: %s", - self.ident, - self._shared_connection._process.pid, - ) - self._go_server_started.set() - _logger.debug( - "%s is_cancelled status: %s", self.ident, self._is_cancelled - ) - while not self._is_cancelled.is_set(): - try: - # Making a blocking wait by setting timeout to a very long time so we don't waste cpu cycle - self._shared_connection.wait_for_process(3600) - except subprocess.TimeoutExpired: - pass - _logger.debug( - "%s No longer waiting for process: %s, %s, %s", - self.ident, - self._shared_connection._process.pid, - self._shared_connection._process.returncode, - 
self._shared_connection.is_process_alive(), - ) - if not self._shared_connection.is_process_alive(): - break - finally: - # Main thread exits - self._shared_connection.kill_process() - - def stop(self): - _logger.debug( - "%s Stopping monitoring thread and terminating go feature server", - self.ident, - ) - self._is_cancelled.set() - self._shared_connection.kill_process() diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 18f1ece8eba..b4721366066 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -101,6 +101,10 @@ offline_store_creator=SnowflakeDataSourceCreator, online_store=REDIS_CONFIG, ), + # Go implementation for online retrieval + IntegrationTestRepoConfig( + online_store=REDIS_CONFIG, go_feature_server=True, + ), ] ) full_repo_configs_module = os.environ.get(FULL_REPO_CONFIGS_MODULE_ENV_NAME) diff --git a/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py b/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py index 6066009a6d6..dfe14d73f96 100644 --- a/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py +++ b/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py @@ -45,7 +45,7 @@ def test_registration_and_retrieval_from_custom_s3_endpoint(universal_data_sourc fs.materialize(environment.start_date, environment.end_date) out = fs.get_online_features( - features=["driver_stats:conv_rate"], entity_rows=[{"driver": 5001}] + features=["driver_stats:conv_rate"], entity_rows=[{"driver_id": 5001}] ).to_dict() assert out["conv_rate"][0] is not None diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 113fae5b79e..88c413fde46 100644 --- 
a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -397,7 +397,11 @@ def test_online_retrieval(environment, universal_data_sources, full_feature_name feature_service = FeatureService( "convrate_plus100", - features=[feature_views.driver[["conv_rate"]], feature_views.driver_odfv], + features=[ + feature_views.driver[["conv_rate"]], + feature_views.driver_odfv, + feature_views.customer[["current_balance"]], + ], ) feature_service_entity_mapping = FeatureService( name="entity_mapping", diff --git a/sdk/python/tests/integration/registration/test_universal_types.py b/sdk/python/tests/integration/registration/test_universal_types.py index 59ca119f98a..ff68f3fd7f8 100644 --- a/sdk/python/tests/integration/registration/test_universal_types.py +++ b/sdk/python/tests/integration/registration/test_universal_types.py @@ -230,7 +230,7 @@ def test_feature_get_online_features_types_match(online_types_test_fixtures): driver_id_value = "1" if config.entity_type == ValueType.STRING else 1 online_features = fs.get_online_features( - features=features, entity_rows=[{"driver": driver_id_value}], + features=features, entity_rows=[{"driver_id": driver_id_value}], ).to_dict() feature_list_dtype_to_expected_online_response_value_type = { From a5e68a8bc49676b2c91249bbcd8d02d2519e3c90 Mon Sep 17 00:00:00 2001 From: pyalex Date: Fri, 8 Apr 2022 15:45:25 -0700 Subject: [PATCH 2/2] use nil instead of dummy implementation for transformation callback Signed-off-by: pyalex --- go/cmd/server/main.go | 6 +--- go/cmd/server/server_test.go | 2 +- go/internal/feast/featurestore.go | 45 +++++++++++++------------- go/internal/feast/featurestore_test.go | 8 ++--- go/internal/feast/transformation.go | 6 ++++ 5 files changed, 33 insertions(+), 34 deletions(-) diff --git a/go/cmd/server/main.go b/go/cmd/server/main.go index ae0f4dae2e9..c0e735a422b 100644 --- a/go/cmd/server/main.go +++ b/go/cmd/server/main.go @@ 
-17,10 +17,6 @@ const ( feastServerVersion = "0.18.0" ) -func dummyTransformCallback(ODFVName string, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr uintptr, fullFeatureNames bool) int { - return 0 -} - // TODO: Add a proper logging library such as https://github.com/Sirupsen/logrus func main() { repoPath := os.Getenv(flagFeastRepoPath) @@ -45,7 +41,7 @@ func main() { } log.Println("Initializing feature store...") - fs, err := feast.NewFeatureStore(repoConfig, dummyTransformCallback) + fs, err := feast.NewFeatureStore(repoConfig, nil) if err != nil { log.Fatalln(err) } diff --git a/go/cmd/server/server_test.go b/go/cmd/server/server_test.go index 7e19600b38c..e5e448a4656 100644 --- a/go/cmd/server/server_test.go +++ b/go/cmd/server/server_test.go @@ -41,7 +41,7 @@ func getClient(ctx context.Context, basePath string) (serving.ServingServiceClie if err != nil { panic(err) } - fs, err := feast.NewFeatureStore(config, dummyTransformCallback) + fs, err := feast.NewFeatureStore(config, nil) if err != nil { panic(err) } diff --git a/go/internal/feast/featurestore.go b/go/internal/feast/featurestore.go index 6f033b6ab9d..3b196bde3a3 100644 --- a/go/internal/feast/featurestore.go +++ b/go/internal/feast/featurestore.go @@ -19,10 +19,10 @@ import ( ) type FeatureStore struct { - config *RepoConfig - registry *Registry - onlineStore OnlineStore - tranformationCallback TransformationCallback + config *RepoConfig + registry *Registry + onlineStore OnlineStore + transformationCallback TransformationCallback } // A Features struct specifies a list of features to be retrieved from the online store. 
These features @@ -86,10 +86,10 @@ func NewFeatureStore(config *RepoConfig, callback TransformationCallback) (*Feat registry.initializeRegistry() return &FeatureStore{ - config: config, - registry: registry, - onlineStore: onlineStore, - tranformationCallback: callback, + config: config, + registry: registry, + onlineStore: onlineStore, + transformationCallback: callback, }, nil } @@ -189,22 +189,23 @@ func (fs *FeatureStore) GetOnlineFeatures( result = append(result, vectors...) } - onDemandFeatures, err := augmentResponseWithOnDemandTransforms( - requestedOnDemandFeatureViews, - requestData, - joinKeyToEntityValues, - result, - fs.tranformationCallback, - arrowMemory, - numRows, - fullFeatureNames, - ) - if err != nil { - return nil, err + if fs.transformationCallback != nil { + onDemandFeatures, err := augmentResponseWithOnDemandTransforms( + requestedOnDemandFeatureViews, + requestData, + joinKeyToEntityValues, + result, + fs.transformationCallback, + arrowMemory, + numRows, + fullFeatureNames, + ) + if err != nil { + return nil, err + } + result = append(result, onDemandFeatures...) } - result = append(result, onDemandFeatures...) 
- result, err = keepOnlyRequestedFeatures(result, featureRefs, featureService, fullFeatureNames) if err != nil { return nil, err diff --git a/go/internal/feast/featurestore_test.go b/go/internal/feast/featurestore_test.go index 7c393b3d40e..6fdf4fd3b60 100644 --- a/go/internal/feast/featurestore_test.go +++ b/go/internal/feast/featurestore_test.go @@ -22,10 +22,6 @@ func getRegistryPath() map[string]interface{} { return registry } -func dummyTransformCallback(ODFVName string, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr uintptr, fullFeatureNames bool) int { - return 0 -} - func TestNewFeatureStore(t *testing.T) { t.Skip("@todo(achals): feature_repo isn't checked in yet") config := RepoConfig{ @@ -36,7 +32,7 @@ func TestNewFeatureStore(t *testing.T) { "type": "redis", }, } - fs, err := NewFeatureStore(&config, dummyTransformCallback) + fs, err := NewFeatureStore(&config, nil) assert.Nil(t, err) assert.IsType(t, &RedisOnlineStore{}, fs.onlineStore) } @@ -62,7 +58,7 @@ func TestGetOnlineFeaturesRedis(t *testing.T) { {Val: &types.Value_Int64Val{Int64Val: 1003}}}}, } - fs, err := NewFeatureStore(&config, dummyTransformCallback) + fs, err := NewFeatureStore(&config, nil) assert.Nil(t, err) ctx := context.Background() response, err := fs.GetOnlineFeatures( diff --git a/go/internal/feast/transformation.go b/go/internal/feast/transformation.go index d89e241c047..00cc0b42369 100644 --- a/go/internal/feast/transformation.go +++ b/go/internal/feast/transformation.go @@ -15,6 +15,12 @@ import ( "unsafe" ) +/* + TransformationCallback is the signature expected of the Python callback function. + The function should accept the name of the on-demand feature view and pointers to the input and output record batches. + Each record batch is passed as two pointers: a pointer to the array (data) and a pointer to the schema. + The Python function is expected to return the number of rows added to the output record batch.
+*/ type TransformationCallback func(ODFVName string, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr uintptr, fullFeatureNames bool) int func augmentResponseWithOnDemandTransforms(