Expected Behavior
spark materialize engine working on yarn(not local[*])
Current Behavior
Not working — Spark on YARN cannot share local files between the driver and workers without a common file system (like HDFS, CIFS, ...).
Steps to reproduce
set spark.master config to yarn
Specifications
- Version: 0.26.0
- Platform: centos 7
- Subsystem: python 3.9 / spark 3.3.0
Possible Solution
AS-IS
```python
class _SparkSerializedArtifacts:
    """Class to assist with serializing unpicklable artifacts to the spark workers"""

    feature_view_proto: str
    repo_config_file: str

    @classmethod
    def serialize(cls, feature_view, repo_config):
        # serialize to proto
        feature_view_proto = feature_view.to_proto().SerializeToString()

        # serialize repo_config to disk. Will be used to instantiate the online store
        repo_config_file = tempfile.NamedTemporaryFile(delete=False).name
        with open(repo_config_file, "wb") as f:
            dill.dump(repo_config, f)

        return _SparkSerializedArtifacts(
            feature_view_proto=feature_view_proto, repo_config_file=repo_config_file
        )

    def unserialize(self):
        # unserialize
        proto = FeatureViewProto()
        proto.ParseFromString(self.feature_view_proto)
        feature_view = FeatureView.from_proto(proto)

        # load
        with open(self.repo_config_file, "rb") as f:
            repo_config = dill.load(f)

        provider = PassthroughProvider(repo_config)
        online_store = provider.online_store
        return feature_view, online_store, repo_config
```
TO-BE
class _SparkSerializedArtifacts:
    """Class to assist with serializing unpicklable artifacts to the spark workers.

    Instead of writing the RepoConfig to a temp file on the driver's local disk
    (which YARN workers cannot see without a shared file system), both artifacts
    are kept as in-memory byte strings, which PySpark can ship inside the task
    closure to any worker.
    """

    # FeatureView encoded via protobuf SerializeToString() — bytes, not str
    feature_view_proto: bytes
    # RepoConfig pickled with dill.dumps() — bytes, not str
    repo_config_byte: bytes

    @classmethod
    def serialize(cls, feature_view, repo_config):
        """Serialize a FeatureView and RepoConfig into picklable byte payloads.

        Args:
            feature_view: FeatureView to ship to the workers; must support
                ``to_proto()``.
            repo_config: RepoConfig used to instantiate the online store on
                each worker.

        Returns:
            A ``_SparkSerializedArtifacts`` carrying both payloads as bytes.
        """
        # serialize to proto
        feature_view_proto = feature_view.to_proto().SerializeToString()

        # serialize repo_config in memory — no temp file, so no shared
        # file system is required between the driver and YARN workers
        repo_config_byte = dill.dumps(repo_config)

        return _SparkSerializedArtifacts(
            feature_view_proto=feature_view_proto, repo_config_byte=repo_config_byte
        )

    def unserialize(self):
        """Reconstruct ``(feature_view, online_store, repo_config)`` on a worker."""
        # unserialize the FeatureView from its protobuf bytes
        proto = FeatureViewProto()
        proto.ParseFromString(self.feature_view_proto)
        feature_view = FeatureView.from_proto(proto)

        # dill.loads (not dill.load): the payload is an in-memory bytes
        # object, not an open file handle
        repo_config = dill.loads(self.repo_config_byte)

        provider = PassthroughProvider(repo_config)
        online_store = provider.online_store
        return feature_view, online_store, repo_config
use dill.dumps and dill.loads instead of dill.dump and dill.load
Byte strings can be serialized and deserialized by the PySpark serializer, so you don't need to pass the repo_config file path.
this solution works for me.
Expected Behavior
spark materialize engine working on yarn(not local[*])
Current Behavior
Not working — Spark on YARN cannot share local files between the driver and workers without a common file system (like HDFS, CIFS, ...).
Steps to reproduce
set spark.master config to yarn
Specifications
Possible Solution
AS-IS
feast/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py
Lines 195 to 228 in a59c33a
TO-BE
use dill.dumps and dill.loads instead of dill.dump and dill.load
Byte strings can be serialized and deserialized by the PySpark serializer, so you don't need to pass the repo_config file path.
this solution works for me.