[Data] Export dataset operator output schema to event logger #60086

Merged
edoakes merged 1 commit into ray-project:master from coqian:coqian/schema
Jan 30, 2026

Conversation

Contributor

@coqian coqian commented Jan 13, 2026

Description

This PR exports the output schema of dataset operators so that output field names and their data types can be inspected, which improves observability.

If DataContext.enforce_schemas is set to False, the schema is exported only once per operator; if it is set to True, the schema is exported whenever the fields are updated.
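
The two modes described above can be sketched roughly as follows. This is only an illustration of the stated behavior, not the code in this PR: `SchemaExportTracker`, `should_export`, and the `enforce_schemas` constructor argument are hypothetical names, and a schema is modeled as a plain dict from field name to type string.

```python
from typing import Dict, Optional


class SchemaExportTracker:
    """Hypothetical helper that decides when an operator's schema is exported."""

    def __init__(self, enforce_schemas: bool) -> None:
        self._enforce_schemas = enforce_schemas
        # Last exported schema, keyed by operator UUID.
        self._last_exported: Dict[str, Dict[str, str]] = {}

    def should_export(self, operator_uuid: str, schema_fields: Dict[str, str]) -> bool:
        previous: Optional[Dict[str, str]] = self._last_exported.get(operator_uuid)
        if previous is None:
            # First schema seen for this operator: export in both modes.
            self._last_exported[operator_uuid] = dict(schema_fields)
            return True
        if self._enforce_schemas and previous != schema_fields:
            # Fields changed and updates are tracked: export again.
            self._last_exported[operator_uuid] = dict(schema_fields)
            return True
        return False
```

With `enforce_schemas=False` the second and later schemas for an operator are ignored; with `enforce_schemas=True` only a changed field set triggers another export.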

Example export event:

{
  "event_id": "83b3A80eAa283CFFBf",
  "timestamp": 1769111404,
  "source_type": "EXPORT_DATASET_OPERATOR_SCHEMA",
  "event_data": {
    "operator_uuid": "a76411d1-5d28-4027-ad19-56cdcb073410",
    "schema_fields": {
      "int_field": "int64",
      "bool_field": "bool",
      "bytes_field": "binary",
      "string_field": "string",
      "date_field": "date32[day]",
      "datetime_field": "timestamp[us]",
      "numpy_int_field": "int32",
      "numpy_float_field": "double",
      "numpy_array_field": "ArrowTensorType(shape=(3,), dtype=int64)",
      "list_float_field": "list<item: double>",
      "list_list_field": "list<item: list<item: double>>",
      "nested_dict_field": "struct<a: struct<b: string>>",
      "none_field": "null"
    }
  }
}
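
As a rough sketch of how a consumer might read events like the one above, the snippet below keeps the most recent schema per operator. It assumes the log holds one JSON object per line; the function name `latest_schema_by_operator` and the shortened sample event are made up for illustration.

```python
import json
from typing import Dict, Iterable


def latest_schema_by_operator(lines: Iterable[str]) -> Dict[str, Dict[str, str]]:
    """Return the last exported schema_fields for each operator_uuid."""
    latest: Dict[str, Dict[str, str]] = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        event = json.loads(line)
        if event.get("source_type") != "EXPORT_DATASET_OPERATOR_SCHEMA":
            continue  # ignore other export event types
        data = event["event_data"]
        # Later events overwrite earlier ones for the same operator.
        latest[data["operator_uuid"]] = data["schema_fields"]
    return latest


# Shortened version of the example event, serialized as a single line.
sample = json.dumps(
    {
        "event_id": "83b3A80eAa283CFFBf",
        "timestamp": 1769111404,
        "source_type": "EXPORT_DATASET_OPERATOR_SCHEMA",
        "event_data": {
            "operator_uuid": "a76411d1-5d28-4027-ad19-56cdcb073410",
            "schema_fields": {"int_field": "int64", "none_field": "null"},
        },
    }
)
schemas = latest_schema_by_operator([sample])
```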

@coqian coqian force-pushed the coqian/schema branch 3 times, most recently from 2fc2b9a to fa41ed2 January 15, 2026 08:02
@coqian coqian marked this pull request as ready for review January 27, 2026 08:20
@coqian coqian requested review from a team as code owners January 27, 2026 08:20
@coqian coqian changed the title from "[WIP][Data] Export dataset operator output schema to event logger" to "[Data] Export dataset operator output schema to event logger" Jan 27, 2026
@ray-gardener ray-gardener bot added the labels data (Ray Data-related issues) and observability (Issues related to the Ray Dashboard, Logging, Metrics, Tracing, and/or Profiling) Jan 27, 2026
@coqian coqian force-pushed the coqian/schema branch 2 times, most recently from b819b1a to 2452647 January 27, 2026 16:50
Contributor

@iamjustinhsu iamjustinhsu left a comment

@coqian thanks for doing this!!

Just some questions for my own understanding

  • Is the event_EXPORT_DATASET_OPERATOR_SCHEMA.log per operator or per dataset?
  • how do you detect changes in the file?

@@ -0,0 +1,145 @@
"""Exporter API for Ray Data operator schema."""
Contributor

I feel like this file and dataset_state.py can be moved under a new parent directory, thoughts?

Contributor

oh also, i was thinking -- metadata_exporter.py exports operators too. Can you explain the purposes of this exporter vs. that one?

Contributor Author

metadata_exporter is mainly for dataset and operator metadata and the relationships between them. After adding operator args and DataContext, the exported file can become too large to be exported reliably.
The operator_schema_exporter is specifically for the operator schema and any updates to it. It can also be large when the schema has many fields or changes many times. And the operator schema is not related to dataset / operator state or other metadata, which is why I created a separate exporter for it.

return LoggerOperatorSchemaExporter.create_if_enabled()


class OperatorSchemaExporter(ABC):
Contributor

You probably know best, but I'm wondering when we would need this, because I can't foresee whether we'll need multiple OperatorSchemaExporter

Contributor Author

This just follows the previous design, such as metadata_exporter.py. Right now we only export to files, but we may add other export methods in the future, e.g. to a database, web_request, etc. Link to previous PR FYI.
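
The design argument above (one abstract exporter, several possible backends) can be sketched as below. All names here are hypothetical and chosen only for illustration (`SchemaExporterSketch`, `InMemorySchemaExporter`, the `export` method); they are not the signatures of the classes added in this PR.

```python
import json
from abc import ABC, abstractmethod
from typing import Dict, List


class SchemaExporterSketch(ABC):
    """Hypothetical abstract interface: callers depend only on export()."""

    @abstractmethod
    def export(self, operator_uuid: str, schema_fields: Dict[str, str]) -> None:
        """Send one operator schema to the backing destination."""


class InMemorySchemaExporter(SchemaExporterSketch):
    """Stand-in backend that collects JSON lines instead of writing a file."""

    def __init__(self) -> None:
        self.lines: List[str] = []

    def export(self, operator_uuid: str, schema_fields: Dict[str, str]) -> None:
        payload = {"operator_uuid": operator_uuid, "schema_fields": schema_fields}
        self.lines.append(json.dumps(payload))


exporter: SchemaExporterSketch = InMemorySchemaExporter()
exporter.export("op-1", {"int_field": "int64"})
```

A database or web-request backend would be another subclass with the same `export` method, so the calling code would not need to change.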

Contributor Author

coqian commented Jan 28, 2026

> @coqian thanks for doing this!!
>
> Just some questions for my own understanding
>
>   • Is the event_EXPORT_DATASET_OPERATOR_SCHEMA.log per operator or per dataset?
>   • how do you detect changes in the file?

  • For datasets and operators in the same session, their schemas are written to the same file
  • We can use other tools, such as Vector, to detect file changes
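
The reply points to an external tool (Vector) for change detection. As a stand-in for illustration only, the polling sketch below shows the same idea in plain Python: remember a byte offset and parse only the complete lines appended since the last read. The function name `read_new_events` is hypothetical, and the file is assumed to contain one JSON object per line.

```python
import json
from typing import List, Tuple


def read_new_events(path: str, offset: int) -> Tuple[List[dict], int]:
    """Parse complete JSON lines appended after `offset`; return them and the new offset."""
    events: List[dict] = []
    with open(path, "rb") as f:
        f.seek(offset)
        for raw in f:
            if not raw.endswith(b"\n"):
                # Partially written line: leave it for the next poll.
                break
            offset += len(raw)
            if raw.strip():
                events.append(json.loads(raw))
    return events, offset
```

A caller would keep the returned offset between polls and pass it back in on the next call.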

@coqian coqian force-pushed the coqian/schema branch 3 times, most recently from 0b5296e to d89646b January 29, 2026 22:51
Contributor

@iamjustinhsu iamjustinhsu left a comment

lgtm!

return [json.loads(line) for line in data]


def test_export_operator_schema():
Contributor

up to u, but do u think it would be good to parameterize based on schema types? Here is an example of the ones we use for sanitization. Just wanted to cover all the cases, but no need to do that if u feel it's redundant

Contributor Author

@coqian coqian Jan 30, 2026

This can introduce a dependency on how ToString() is implemented for different types in PyArrow. If that is updated in the future, it may break our test cases here. Different versions can also give different results: one example is the update of the datetime64[s] dtype in pandas 2.0, which prints differently before and after 2.0 (datetime64[ns] vs datetime64[s]). Our test cases mainly focus on whether the schema can be exported to the log file instead.
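
One way to keep such a test independent of type string formatting is to assert on field names and only require that each type is some non-empty string. The helper below is a sketch under that assumption; `check_schema_event` is a hypothetical name and is not taken from the PR's test file.

```python
from typing import Iterable


def check_schema_event(event: dict, expected_field_names: Iterable[str]) -> None:
    """Assert that an export event carries the expected fields, ignoring exact type strings."""
    assert event["source_type"] == "EXPORT_DATASET_OPERATOR_SCHEMA"
    fields = event["event_data"]["schema_fields"]
    # Field names are stable across library versions, so compare them exactly.
    assert set(fields) == set(expected_field_names)
    # Type strings depend on the library's formatting, so only check that they are present.
    assert all(isinstance(t, str) and t for t in fields.values())


example_event = {
    "source_type": "EXPORT_DATASET_OPERATOR_SCHEMA",
    "event_data": {
        "operator_uuid": "a76411d1-5d28-4027-ad19-56cdcb073410",
        "schema_fields": {"int_field": "int64", "datetime_field": "timestamp[us]"},
    },
}
check_schema_event(example_event, ["int_field", "datetime_field"])
```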

@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

@MengjinYan
Contributor

@edoakes for review & merge of the proto changes, as it seems that the core team owns the proto code.

Looks like the PR updates the old export event schema for data events. I think until core supports the feature for library events, we will need to keep the export event schema.

@edoakes edoakes added the go label (add ONLY when ready to merge, run all tests) Jan 30, 2026
@edoakes edoakes enabled auto-merge (squash) January 30, 2026 20:21
@edoakes edoakes merged commit 216182b into ray-project:master Jan 30, 2026
8 checks passed
liulehui pushed a commit to liulehui/ray that referenced this pull request Jan 31, 2026
Signed-off-by: cong.qian <[email protected]>
simonsays1980 pushed a commit to simonsays1980/ray that referenced this pull request Jan 31, 2026
Signed-off-by: cong.qian <[email protected]>
400Ping pushed a commit to 400Ping/ray that referenced this pull request Feb 1, 2026
Signed-off-by: cong.qian <[email protected]>
Signed-off-by: 400Ping <[email protected]>
rayhhome pushed a commit to rayhhome/ray that referenced this pull request Feb 4, 2026
Signed-off-by: cong.qian <[email protected]>
Signed-off-by: Sirui Huang <[email protected]>
elliot-barn pushed a commit that referenced this pull request Feb 9, 2026
Signed-off-by: cong.qian <[email protected]>
Signed-off-by: elliot-barn <[email protected]>
elliot-barn pushed a commit that referenced this pull request Feb 9, 2026
Signed-off-by: cong.qian <[email protected]>
ans9868 pushed a commit to ans9868/ray that referenced this pull request Feb 18, 2026
Signed-off-by: cong.qian <[email protected]>
Signed-off-by: Adel Nour <[email protected]>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
Signed-off-by: cong.qian <[email protected]>
Signed-off-by: peterxcli <[email protected]>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
Signed-off-by: cong.qian <[email protected]>
Signed-off-by: peterxcli <[email protected]>
Labels

data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests), observability (Issues related to the Ray Dashboard, Logging, Metrics, Tracing, and/or Profiling)

4 participants