From 9d16da79b1bd51e9d04a58f781479aa603ef1a51 Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Mon, 9 May 2022 16:17:08 -0700 Subject: [PATCH 1/2] reorder fields in log schema Signed-off-by: Oleksii Moskalenko --- sdk/python/feast/feature_logging.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py index 70fab930bbf..c47922b6de2 100644 --- a/sdk/python/feast/feature_logging.py +++ b/sdk/python/feast/feature_logging.py @@ -52,17 +52,6 @@ def get_schema(self, registry: "Registry") -> pa.Schema: fields: Dict[str, pa.DataType] = {} for projection in self._feature_service.feature_view_projections: - for feature in projection.features: - fields[ - f"{projection.name_to_use()}__{feature.name}" - ] = FEAST_TYPE_TO_ARROW_TYPE[feature.dtype] - fields[ - f"{projection.name_to_use()}__{feature.name}__timestamp" - ] = PA_TIMESTAMP_TYPE - fields[ - f"{projection.name_to_use()}__{feature.name}__status" - ] = pa.int32() - try: feature_view = registry.get_feature_view(projection.name, self._project) except FeatureViewNotFoundException: @@ -91,9 +80,21 @@ def get_schema(self, registry: "Registry") -> pa.Schema: from_value_type(entity.value_type) ] + for feature in projection.features: + fields[ + f"{projection.name_to_use()}__{feature.name}" + ] = FEAST_TYPE_TO_ARROW_TYPE[feature.dtype] + fields[ + f"{projection.name_to_use()}__{feature.name}__timestamp" + ] = PA_TIMESTAMP_TYPE + fields[ + f"{projection.name_to_use()}__{feature.name}__status" + ] = pa.int32() + # system columns - fields[REQUEST_ID_FIELD] = pa.string() fields[LOG_TIMESTAMP_FIELD] = pa.timestamp("us", tz=UTC) + fields[LOG_DATE_FIELD] = pa.date32() + fields[REQUEST_ID_FIELD] = pa.string() return pa.schema( [pa.field(name, data_type) for name, data_type in fields.items()] From a19a1fa4f6e10f8d6a6795d44d1143601c53d027 Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Tue, 10 May 2022 09:43:06 -0700 Subject: [PATCH 2/2] add comment Signed-off-by: Oleksii Moskalenko --- sdk/python/feast/feature_logging.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py index c47922b6de2..e2982988366 100644 --- a/sdk/python/feast/feature_logging.py +++ b/sdk/python/feast/feature_logging.py @@ -52,6 +52,11 @@ def get_schema(self, registry: "Registry") -> pa.Schema: fields: Dict[str, pa.DataType] = {} for projection in self._feature_service.feature_view_projections: + # The order of fields in the generated schema should match + # the order created on the other side (inside Go logger). + # Otherwise, some offline stores might not accept parquet files (produced by Go). + # Go code can be found here: + # https://github.com/feast-dev/feast/blob/master/go/internal/feast/server/logging/memorybuffer.go#L51 try: feature_view = registry.get_feature_view(projection.name, self._project) except FeatureViewNotFoundException: