[Data] Export dataset operator output schema to event logger#60086
[Data] Export dataset operator output schema to event logger#60086edoakes merged 1 commit intoray-project:masterfrom
Conversation
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
2fc2b9a to
fa41ed2
Compare
b819b1a to
2452647
Compare
iamjustinhsu
left a comment
There was a problem hiding this comment.
@coqian thanks for doing this!!
Just some questions for my own understanding
- Is the
event_EXPORT_DATASET_OPERATOR_SCHEMA.logper operator or per dataset? - how do you detect changes in the file?
python/ray/data/_internal/execution/streaming_executor_state.py
Outdated
Show resolved
Hide resolved
| @@ -0,0 +1,145 @@ | |||
| """Exporter API for Ray Data operator schema.""" | |||
There was a problem hiding this comment.
I feel like this file and dataset_state.py can be moved under a new parent directory, thoughts?
There was a problem hiding this comment.
oh also, i was thinking -- metadata_exporter.py exports operators too. Can you explain the purposes of this exporter vs. that one?
There was a problem hiding this comment.
metadata_exporter is mainly for the dataset and operator metadata and their relationship. After adding operator args and DataContext, the file can be too large to be exported reliably.
The operator_schema_exporter is specifically for the operator schema, as well as potential updates. It can also be large when there are many fields in the schema or it changes many times. And the operator schema is not related with dataset / operator state or other metadata, that's why I create a separate exporter for it.
| return LoggerOperatorSchemaExporter.create_if_enabled() | ||
|
|
||
|
|
||
| class OperatorSchemaExporter(ABC): |
There was a problem hiding this comment.
You probably know best, but wondering when we would need this because I can't forsee if we'll need multiple OperatorSchemaExporter
There was a problem hiding this comment.
This just follows previous design, such as metadata_exporter.py. Right now we only export to files, but we may have some future export methods, e.g. to a database, web_request, etc. link to previous PR FYI.
python/ray/data/_internal/execution/streaming_executor_state.py
Outdated
Show resolved
Hide resolved
|
0b5296e to
d89646b
Compare
| return [json.loads(line) for line in data] | ||
|
|
||
|
|
||
| def test_export_operator_schema(): |
There was a problem hiding this comment.
up to u, but do u think it would be good to parameterize based on schema types? Here is an example of the ones we use for sanitization. Just wanted to cover all the cases, but no need to do that if u feel it's redundant
There was a problem hiding this comment.
This can introduce dependencies on how ToString() of different types are implemented in PyArrow. If it's updated in the future, it may break our test cases here. And different version of them can have different results: an example is the update of datetime64[s] dtype in pandas 2.0, it has different behaviors when we print it before and after 2.0, datetime64[ns] vs datetime64[s]. Our test cases mainly focus on if the schema can be exported to the log file instead.
Signed-off-by: cong.qian <[email protected]>
|
@edoakes for review & merge the proto as it seems that core team owns the proto code. Looks like the PR updates the old export event schema for data events. I think until core support the feature for library events, we will need to keep the export event schema. |
…ject#60086) ## Description In this PR, we export the output schema of dataset operators so that we can check the output field names and their data types for better observability. If `DataContext.enforce_schemas` is set to False, the schema will only be export once for each operator; and if it is set to True, the schema will be exported whenever the fields get updated. Example export event: ``` { "event_id": "83b3A80eAa283CFFBf", "timestamp": 1769111404, "source_type": "EXPORT_DATASET_OPERATOR_SCHEMA", "event_data": { "operator_uuid": "a76411d1-5d28-4027-ad19-56cdcb073410", "schema_fields": { "int_field": "int64", "bool_field": "bool", "bytes_field": "binary", "string_field": "string", "date_field": "date32[day]", "datetime_field": "timestamp[us]", "numpy_int_field": "int32", "numpy_float_field": "double", "numpy_array_field": "ArrowTensorType(shape=(3,), dtype=int64)", "list_float_field": "list<item: double>", "list_list_field": "list<item: list<item: double>>", "nested_dict_field": "struct<a: struct<b: string>>", "none_field": "null", } } } ``` Signed-off-by: cong.qian <[email protected]>
…ject#60086) ## Description In this PR, we export the output schema of dataset operators so that we can check the output field names and their data types for better observability. If `DataContext.enforce_schemas` is set to False, the schema will only be export once for each operator; and if it is set to True, the schema will be exported whenever the fields get updated. Example export event: ``` { "event_id": "83b3A80eAa283CFFBf", "timestamp": 1769111404, "source_type": "EXPORT_DATASET_OPERATOR_SCHEMA", "event_data": { "operator_uuid": "a76411d1-5d28-4027-ad19-56cdcb073410", "schema_fields": { "int_field": "int64", "bool_field": "bool", "bytes_field": "binary", "string_field": "string", "date_field": "date32[day]", "datetime_field": "timestamp[us]", "numpy_int_field": "int32", "numpy_float_field": "double", "numpy_array_field": "ArrowTensorType(shape=(3,), dtype=int64)", "list_float_field": "list<item: double>", "list_list_field": "list<item: list<item: double>>", "nested_dict_field": "struct<a: struct<b: string>>", "none_field": "null", } } } ``` Signed-off-by: cong.qian <[email protected]>
…ject#60086) ## Description In this PR, we export the output schema of dataset operators so that we can check the output field names and their data types for better observability. If `DataContext.enforce_schemas` is set to False, the schema will only be export once for each operator; and if it is set to True, the schema will be exported whenever the fields get updated. Example export event: ``` { "event_id": "83b3A80eAa283CFFBf", "timestamp": 1769111404, "source_type": "EXPORT_DATASET_OPERATOR_SCHEMA", "event_data": { "operator_uuid": "a76411d1-5d28-4027-ad19-56cdcb073410", "schema_fields": { "int_field": "int64", "bool_field": "bool", "bytes_field": "binary", "string_field": "string", "date_field": "date32[day]", "datetime_field": "timestamp[us]", "numpy_int_field": "int32", "numpy_float_field": "double", "numpy_array_field": "ArrowTensorType(shape=(3,), dtype=int64)", "list_float_field": "list<item: double>", "list_list_field": "list<item: list<item: double>>", "nested_dict_field": "struct<a: struct<b: string>>", "none_field": "null", } } } ``` Signed-off-by: cong.qian <[email protected]> Signed-off-by: 400Ping <[email protected]>
…ject#60086) ## Description In this PR, we export the output schema of dataset operators so that we can check the output field names and their data types for better observability. If `DataContext.enforce_schemas` is set to False, the schema will only be export once for each operator; and if it is set to True, the schema will be exported whenever the fields get updated. Example export event: ``` { "event_id": "83b3A80eAa283CFFBf", "timestamp": 1769111404, "source_type": "EXPORT_DATASET_OPERATOR_SCHEMA", "event_data": { "operator_uuid": "a76411d1-5d28-4027-ad19-56cdcb073410", "schema_fields": { "int_field": "int64", "bool_field": "bool", "bytes_field": "binary", "string_field": "string", "date_field": "date32[day]", "datetime_field": "timestamp[us]", "numpy_int_field": "int32", "numpy_float_field": "double", "numpy_array_field": "ArrowTensorType(shape=(3,), dtype=int64)", "list_float_field": "list<item: double>", "list_list_field": "list<item: list<item: double>>", "nested_dict_field": "struct<a: struct<b: string>>", "none_field": "null", } } } ``` Signed-off-by: cong.qian <[email protected]> Signed-off-by: Sirui Huang <[email protected]>
## Description
In this PR, we export the output schema of dataset operators so that we
can check the output field names and their data types for better
observability.
If `DataContext.enforce_schemas` is set to False, the schema will only
be export once for each operator; and if it is set to True, the schema
will be exported whenever the fields get updated.
Example export event:
```
{
"event_id": "83b3A80eAa283CFFBf",
"timestamp": 1769111404,
"source_type": "EXPORT_DATASET_OPERATOR_SCHEMA",
"event_data": {
"operator_uuid": "a76411d1-5d28-4027-ad19-56cdcb073410",
"schema_fields": {
"int_field": "int64",
"bool_field": "bool",
"bytes_field": "binary",
"string_field": "string",
"date_field": "date32[day]",
"datetime_field": "timestamp[us]",
"numpy_int_field": "int32",
"numpy_float_field": "double",
"numpy_array_field": "ArrowTensorType(shape=(3,), dtype=int64)",
"list_float_field": "list<item: double>",
"list_list_field": "list<item: list<item: double>>",
"nested_dict_field": "struct<a: struct<b: string>>",
"none_field": "null",
}
}
}
```
Signed-off-by: cong.qian <[email protected]>
Signed-off-by: elliot-barn <[email protected]>
## Description
In this PR, we export the output schema of dataset operators so that we
can check the output field names and their data types for better
observability.
If `DataContext.enforce_schemas` is set to False, the schema will only
be export once for each operator; and if it is set to True, the schema
will be exported whenever the fields get updated.
Example export event:
```
{
"event_id": "83b3A80eAa283CFFBf",
"timestamp": 1769111404,
"source_type": "EXPORT_DATASET_OPERATOR_SCHEMA",
"event_data": {
"operator_uuid": "a76411d1-5d28-4027-ad19-56cdcb073410",
"schema_fields": {
"int_field": "int64",
"bool_field": "bool",
"bytes_field": "binary",
"string_field": "string",
"date_field": "date32[day]",
"datetime_field": "timestamp[us]",
"numpy_int_field": "int32",
"numpy_float_field": "double",
"numpy_array_field": "ArrowTensorType(shape=(3,), dtype=int64)",
"list_float_field": "list<item: double>",
"list_list_field": "list<item: list<item: double>>",
"nested_dict_field": "struct<a: struct<b: string>>",
"none_field": "null",
}
}
}
```
Signed-off-by: cong.qian <[email protected]>
…ject#60086) ## Description In this PR, we export the output schema of dataset operators so that we can check the output field names and their data types for better observability. If `DataContext.enforce_schemas` is set to False, the schema will only be export once for each operator; and if it is set to True, the schema will be exported whenever the fields get updated. Example export event: ``` { "event_id": "83b3A80eAa283CFFBf", "timestamp": 1769111404, "source_type": "EXPORT_DATASET_OPERATOR_SCHEMA", "event_data": { "operator_uuid": "a76411d1-5d28-4027-ad19-56cdcb073410", "schema_fields": { "int_field": "int64", "bool_field": "bool", "bytes_field": "binary", "string_field": "string", "date_field": "date32[day]", "datetime_field": "timestamp[us]", "numpy_int_field": "int32", "numpy_float_field": "double", "numpy_array_field": "ArrowTensorType(shape=(3,), dtype=int64)", "list_float_field": "list<item: double>", "list_list_field": "list<item: list<item: double>>", "nested_dict_field": "struct<a: struct<b: string>>", "none_field": "null", } } } ``` Signed-off-by: cong.qian <[email protected]> Signed-off-by: Adel Nour <[email protected]>
…ject#60086) ## Description In this PR, we export the output schema of dataset operators so that we can check the output field names and their data types for better observability. If `DataContext.enforce_schemas` is set to False, the schema will only be export once for each operator; and if it is set to True, the schema will be exported whenever the fields get updated. Example export event: ``` { "event_id": "83b3A80eAa283CFFBf", "timestamp": 1769111404, "source_type": "EXPORT_DATASET_OPERATOR_SCHEMA", "event_data": { "operator_uuid": "a76411d1-5d28-4027-ad19-56cdcb073410", "schema_fields": { "int_field": "int64", "bool_field": "bool", "bytes_field": "binary", "string_field": "string", "date_field": "date32[day]", "datetime_field": "timestamp[us]", "numpy_int_field": "int32", "numpy_float_field": "double", "numpy_array_field": "ArrowTensorType(shape=(3,), dtype=int64)", "list_float_field": "list<item: double>", "list_list_field": "list<item: list<item: double>>", "nested_dict_field": "struct<a: struct<b: string>>", "none_field": "null", } } } ``` Signed-off-by: cong.qian <[email protected]> Signed-off-by: peterxcli <[email protected]>
…ject#60086) ## Description In this PR, we export the output schema of dataset operators so that we can check the output field names and their data types for better observability. If `DataContext.enforce_schemas` is set to False, the schema will only be export once for each operator; and if it is set to True, the schema will be exported whenever the fields get updated. Example export event: ``` { "event_id": "83b3A80eAa283CFFBf", "timestamp": 1769111404, "source_type": "EXPORT_DATASET_OPERATOR_SCHEMA", "event_data": { "operator_uuid": "a76411d1-5d28-4027-ad19-56cdcb073410", "schema_fields": { "int_field": "int64", "bool_field": "bool", "bytes_field": "binary", "string_field": "string", "date_field": "date32[day]", "datetime_field": "timestamp[us]", "numpy_int_field": "int32", "numpy_float_field": "double", "numpy_array_field": "ArrowTensorType(shape=(3,), dtype=int64)", "list_float_field": "list<item: double>", "list_list_field": "list<item: list<item: double>>", "nested_dict_field": "struct<a: struct<b: string>>", "none_field": "null", } } } ``` Signed-off-by: cong.qian <[email protected]> Signed-off-by: peterxcli <[email protected]>
Description
In this PR, we export the output schema of dataset operators so that we can check the output field names and their data types for better observability.
If
DataContext.enforce_schemasis set to False, the schema will only be export once for each operator; and if it is set to True, the schema will be exported whenever the fields get updated.Example export event: