Skip to content

[Core][Serve] Fix tracing signature mismatch when calling actors from different processes#59634

Merged
abrarsheikh merged 10 commits intomasterfrom
57803-abrar-trace
Jan 10, 2026
Merged

[Core][Serve] Fix tracing signature mismatch when calling actors from different processes#59634
abrarsheikh merged 10 commits intomasterfrom
57803-abrar-trace

Conversation

@abrarsheikh
Copy link
Contributor

@abrarsheikh abrarsheikh commented Dec 23, 2025

Problem:

fixes #57803

When tracing is enabled, calling an actor method from a different process than the one that created the actor fails with:

TypeError: got an unexpected keyword argument '_ray_trace_ctx'

This commonly occurs with Ray Serve, where:

  • serve start creates the controller actor (process A)
  • Dashboard calls ray.get_actor() to interact with it (process B)

Repo

Simplest way to repro is to run the following

ray start --head --tracing-startup-hook="ray.util.tracing.setup_local_tmp_tracing:setup_tracing"                                                                                                        ✔ │ ray_310 Py │ with ubuntu@devbox │ at 06:22:12
serve start

But here is a core specific repro script

repro_actor_module.py

class MyActor:
    """A simple actor class that will be decorated dynamically."""
    
    def __init__(self):
        self.value = 0
    
    def my_method(self, x):
        """A simple method."""
        return x * 2
    
    def check_alive(self):
        """Health check method."""
        return True
    
    def increment(self, amount=1):
        """Method with a default parameter."""
        self.value += amount
        return self.value

repro_tracing_issue.py

import multiprocessing
import subprocess
import sys


NAMESPACE = "test_ns"


def creator_process(ready_event, done_event):
    import ray
    from ray.util.tracing.tracing_helper import _is_tracing_enabled
    
    # Import the actor class from module (NOT decorated yet)
    from repro_actor_module import MyActor
    
    setup_tracing_path = "ray.util.tracing.setup_local_tmp_tracing:setup_tracing"
    ray.init(_tracing_startup_hook=setup_tracing_path, namespace=NAMESPACE)
    
    print(f"[CREATOR] Tracing enabled: {_is_tracing_enabled()}")
    
    # Dynamically decorate and create the test actor (like Serve does)
    MyActorRemote = ray.remote(
        name="my_test_actor",
        namespace=NAMESPACE,
        num_cpus=0,
        lifetime="detached",
    )(MyActor)
    
    actor = MyActorRemote.remote()
    
    # Print signatures from creator's handle
    print(f"[CREATOR] Signatures in handle from creation:")
    for method_name, sig in actor._ray_method_signatures.items():
        param_names = [p.name for p in sig]
        print(f"  {method_name}: {param_names}")
    
    my_method_sig = actor._ray_method_signatures.get("my_method", [])
    has_trace = "_ray_trace_ctx" in [p.name for p in my_method_sig]
    print(f"[CREATOR] my_method has _ray_trace_ctx: {has_trace}")
    
    # Verify the method works from creator
    result = ray.get(actor.my_method.remote(5))
    print(f"[CREATOR] Test call result: {result}")
    
    # Signal that actor is ready
    print("[CREATOR] Actor created, signaling getter...")
    sys.stdout.flush()
    ready_event.set()
    
    # Wait for getter to finish
    done_event.wait(timeout=30)
    print("[CREATOR] Getter finished, shutting down...")
    
    # Cleanup
    ray.kill(actor)
    ray.shutdown()


def getter_process(ready_event, done_event):
    import ray
    from ray.util.tracing.tracing_helper import _is_tracing_enabled
    
    # Wait for creator to signal ready
    print("[GETTER] Waiting for creator to set up actor...")
    if not ready_event.wait(timeout=30):
        print("[GETTER] Timeout waiting for creator!")
        done_event.set()
        return
    
    # Connect to the existing cluster (this will also enable tracing from GCS hook)
    ray.init(address="auto", namespace=NAMESPACE)
    
    print(f"\n[GETTER] Tracing enabled: {_is_tracing_enabled()}")
    
    # Get the actor by name - this will RELOAD the class fresh in this process
    # The class loaded here was NEVER processed by _inject_tracing_into_class
    actor = ray.get_actor("my_test_actor", namespace=NAMESPACE)
    
    # Print signatures from getter's handle
    print(f"[GETTER] Signatures in handle from get_actor():")
    for method_name, sig in actor._ray_method_signatures.items():
        param_names = [p.name for p in sig]
        print(f"  {method_name}: {param_names}")
    
    my_method_sig = actor._ray_method_signatures.get("my_method", [])
    has_trace = "_ray_trace_ctx" in [p.name for p in my_method_sig]
    print(f"[GETTER] my_method has _ray_trace_ctx: {has_trace}")
    
    # Try calling a method
    print(f"\n[GETTER] Attempting to call my_method.remote(5)...")
    sys.stdout.flush()
    try:
        result = ray.get(actor.my_method.remote(5))
        print(f"[GETTER] Method call SUCCEEDED! Result: {result}")
    except TypeError as e:
        print(f"[GETTER] Method call FAILED with TypeError: {e}")
    
    # Signal done
    done_event.set()
    ray.shutdown()


def main():
    # Stop any existing Ray cluster
    print("Stopping any existing Ray cluster...")
    subprocess.run(["ray", "stop", "--force"], capture_output=True)
    
    # Create synchronization events
    ready_event = multiprocessing.Event()
    done_event = multiprocessing.Event()
    
    # Start creator process
    creator = multiprocessing.Process(target=creator_process, args=(ready_event, done_event))
    creator.start()
    
    # Start getter process (will connect to existing cluster)
    getter = multiprocessing.Process(target=getter_process, args=(ready_event, done_event))
    getter.start()
    
    # Wait for both to complete
    getter.join(timeout=60)
    creator.join(timeout=10)
    
    # Clean up any hung processes
    if creator.is_alive():
        creator.terminate()
        creator.join(timeout=5)
    if getter.is_alive():
        getter.terminate()
        getter.join(timeout=5)
    
    # Cleanup Ray
    print("\nCleaning up...")
    subprocess.run(["ray", "stop", "--force"], capture_output=True)
    print("Done.")


if __name__ == "__main__":
    main()
output from master
❯ python repro_tracing_issue.py
Stopping any existing Ray cluster...
[GETTER] Waiting for creator to set up actor...
2025-12-24 07:05:02,215 INFO worker.py:1991 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
[CREATOR] Tracing enabled: True
[CREATOR] Signatures in handle from creation:
  __init__: ['_ray_trace_ctx']
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: ['_ray_trace_ctx']
  increment: ['amount', '_ray_trace_ctx']
  my_method: ['x', '_ray_trace_ctx']
[CREATOR] my_method has _ray_trace_ctx: True
[CREATOR] Test call result: 10
[CREATOR] Actor created, signaling getter...
2025-12-24 07:05:02,953 INFO worker.py:1811 -- Connecting to existing Ray cluster at address: 172.31.7.228:38871...
2025-12-24 07:05:02,984 INFO worker.py:1991 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265 
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(

[GETTER] Tracing enabled: True
[GETTER] Signatures in handle from get_actor():
  __init__: []
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: []
  increment: ['amount']
  my_method: ['x']
[GETTER] my_method has _ray_trace_ctx: False

[GETTER] Attempting to call my_method.remote(5)...
[GETTER] Method call FAILED with TypeError: got an unexpected keyword argument '_ray_trace_ctx'
[CREATOR] Getter finished, shutting down...

Cleaning up...
Done.
output from this PR
❯ python repro_tracing_issue.py
Stopping any existing Ray cluster...
[GETTER] Waiting for creator to set up actor...
2025-12-24 07:04:03,758 INFO worker.py:1991 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
[CREATOR] Tracing enabled: True
[CREATOR] Signatures in handle from creation:
  __init__: ['_ray_trace_ctx']
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: ['_ray_trace_ctx']
  increment: ['amount', '_ray_trace_ctx']
  my_method: ['x', '_ray_trace_ctx']
[CREATOR] my_method has _ray_trace_ctx: True
[CREATOR] Test call result: 10
[CREATOR] Actor created, signaling getter...
2025-12-24 07:04:04,476 INFO worker.py:1811 -- Connecting to existing Ray cluster at address: 172.31.7.228:37231...
2025-12-24 07:04:04,504 INFO worker.py:1991 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265 
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(

[GETTER] Tracing enabled: True
[GETTER] Signatures in handle from get_actor():
  __init__: ['_ray_trace_ctx']
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: ['_ray_trace_ctx']
  increment: ['amount', '_ray_trace_ctx']
  my_method: ['x', '_ray_trace_ctx']
[GETTER] my_method has _ray_trace_ctx: True

[GETTER] Attempting to call my_method.remote(5)...
[GETTER] Method call SUCCEEDED! Result: 10
[CREATOR] Getter finished, shutting down...

Cleaning up...
Done.

Root Cause:

_inject_tracing_into_class sets __signature__ (including _ray_trace_ctx) on the method object during actor creation. However:

  1. When the actor class is serialized (cloudpickle) and loaded in another process, __signature__ is not preserved on module-level functions. See repro script at the end of PR description as proof
  2. _ActorClassMethodMetadata.create() uses inspect.unwrap() which follows the __wrapped__ chain to the deeply unwrapped original method
  3. The original method's __signature__ was lost during serialization → signatures extracted without _ray_trace_ctx
  4. When calling the method, _tracing_actor_method_invocation adds _ray_trace_ctx to kwargs → signature validation fails

Fix:

  1. In _inject_tracing_into_class: Set __signature__ on the deeply unwrapped method (via inspect.unwrap) rather than the immediate method. This ensures _ActorClassMethodMetadata.create() finds it after unwrapping.

  2. In load_actor_class: Call _inject_tracing_into_class after loading to re-inject the lost __signature__ attributes.

Testing:

  • Added reproduction script demonstrating cross-process actor method calls with tracing
  • All existing tracing tests pass
  • Add a new test for serve with tracing

repro_cloudpickle_signature.py

import inspect
import cloudpickle
import pickle
import multiprocessing


def check_signature_in_subprocess(pickled_func_bytes):
    func = pickle.loads(pickled_func_bytes)
    
    print(f"[SUBPROCESS] Unpickled function: {func}")
    print(f"[SUBPROCESS] Module: {func.__module__}")
    
    sig = getattr(func, '__signature__', None)
    if sig is not None:
        params = list(sig.parameters.keys())
        print(f"[SUBPROCESS] __signature__: {sig}")
        if '_ray_trace_ctx' in params:
            print(f"[SUBPROCESS] __signature__ WAS preserved")
            return True
        else:
            print(f"[SUBPROCESS] __signature__ NOT preserved (missing _ray_trace_ctx)")
            return False
    else:
        print(f"[SUBPROCESS] __signature__ NOT preserved (attribute missing)")
        return False


def main():
    from repro_actor_module import MyActor
    func = MyActor.my_method
    
    print(f"\n[MAIN] Function: {func}")
    print(f"[MAIN] Module: {func.__module__}")
    print(f"[MAIN] __signature__ before: {getattr(func, '__signature__', 'NOT SET')}")
    
    # Set a custom __signature__ with _ray_trace_ctx
    custom_sig = inspect.signature(func)
    new_params = list(custom_sig.parameters.values()) + [
        inspect.Parameter("_ray_trace_ctx", inspect.Parameter.KEYWORD_ONLY, default=None)
    ]
    func.__signature__ = custom_sig.replace(parameters=new_params)
    
    print(f"[MAIN] __signature__ after: {func.__signature__}")
    print(f"[MAIN] Parameters: {list(func.__signature__.parameters.keys())}")
    
    # Pickle
    print(f"\n[MAIN] Pickling with cloudpickle...")
    pickled = cloudpickle.dumps(func)
    
    # Test 1: Same process
    print(f"\n{'='*70}")
    print("TEST 1: Unpickle in SAME process")
    print(f"{'='*70}")
    same_func = pickle.loads(pickled)
    same_sig = getattr(same_func, '__signature__', None)
    if same_sig and '_ray_trace_ctx' in list(same_sig.parameters.keys()):
        print(f"Same process: __signature__ preserved")
    else:
        print(f"Same process: __signature__ NOT preserved")
    
    # Test 2: Different process
    print(f"\n{'='*70}")
    print("TEST 2: Unpickle in DIFFERENT process")
    print(f"{'='*70}")
    
    ctx = multiprocessing.get_context('spawn')
    with ctx.Pool(1) as pool:
        result = pool.apply(check_signature_in_subprocess, (pickled,))
    
    if result:
        print("__signature__ IS preserved (unexpected)")
    else:
        print("__signature__ is NOT preserved for functions from imported modules!")


if __name__ == "__main__":
    main()

@abrarsheikh abrarsheikh added the go add ONLY when ready to merge, run all tests label Dec 23, 2025
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a workaround to make Ray tracing compatible with Ray Serve, addressing a TypeError related to _ray_trace_ctx. The core of the solution is to dynamically add _ray_trace_ctx to actor method signatures when tracing is enabled. The changes are well-contained and include corresponding adjustments in Ray Serve to correctly handle and expose the tracing context. The addition of a dedicated test file with a comprehensive test case is a great way to ensure the fix is working as expected.

My review focuses on improving code clarity and type safety. I've suggested a minor refinement in ray/actor.py for more idiomatic list manipulation and proposed tightening the type hints for the tracing context in ray/serve/_private/replica.py for better consistency and maintainability.

I am having trouble creating individual review comments. Click here to see my feedback.

python/ray/actor.py (974-978)

medium

For better readability and robustness, it's more idiomatic to use a slice for the remainder of the list, even if it currently contains only one element. This also makes the code more resilient to future changes.

        method_signature = (
            method_signature[:var_keyword_idx]
            + [new_param]
            + method_signature[var_keyword_idx:]
        )

python/ray/serve/_private/replica.py (817)

medium

For better type safety and consistency, the return type for ray_trace_ctx should be more specific. In serve.context, it's defined as Optional[dict]. Using Optional[dict] here would improve clarity and allow static analysis tools to catch potential type errors.

    ) -> Tuple[Tuple[Any], Dict[str, Any], Optional[dict]]:

python/ray/serve/_private/replica.py (1196)

medium

To maintain type consistency with _RequestContext and the suggested change in _unpack_proxy_args, it's better to use Optional[dict] for ray_trace_ctx instead of Optional[Any]. This improves type safety and code clarity.

        self, request_metadata: RequestMetadata, ray_trace_ctx: Optional[dict] = None

@abrarsheikh abrarsheikh changed the title [Core][Serve] make trace_ctx work with serve [Core][Serve] Fix tracing signature mismatch when calling actors from different processes Dec 24, 2025
@abrarsheikh abrarsheikh marked this pull request as ready for review December 24, 2025 18:30
@abrarsheikh abrarsheikh requested review from a team as code owners December 24, 2025 18:30
@ray-gardener ray-gardener bot added serve Ray Serve Related Issue core Issues that should be addressed in Ray Core labels Dec 24, 2025
Copy link
Contributor

@ZacAttack ZacAttack left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good! Thanks for the deep dive!

@ZacAttack
Copy link
Contributor

At least the core test failure you're hitting:
[2025-12-24T08:21:33Z] > "/": psutil._common.sdiskusage(total=1, used=0, free=1, percent=0.0)

[2025-12-24T08:21:33Z] }
[2025-12-24T08:21:33Z] E AttributeError: module 'psutil._common' has no attribute 'sdiskusage'

I know happens when you end up with a newer version of psutil (I have another PR for making this version agnostic) but I'm not sure why you'd be hitting it here.....

@abrarsheikh
Copy link
Contributor Author

At least the core test failure you're hitting:

[2025-12-24T08:21:33Z] > "/": psutil._common.sdiskusage(total=1, used=0, free=1, percent=0.0)
[2025-12-24T08:21:33Z] } [2025-12-24T08:21:33Z] E AttributeError: module 'psutil._common' has no attribute 'sdiskusage'

I know happens when you end up with a newer version of psutil (I have another PR for making this version agnostic) but I'm not sure why you'd be hitting it here.....

cc @elliot-barn

@abrarsheikh abrarsheikh merged commit ac15dba into master Jan 10, 2026
6 checks passed
@abrarsheikh abrarsheikh deleted the 57803-abrar-trace branch January 10, 2026 01:36
AYou0207 pushed a commit to AYou0207/ray that referenced this pull request Jan 13, 2026
… different processes (ray-project#59634)

**Problem:**

fixes ray-project#57803

When tracing is enabled, calling an actor method from a different
process than the one that created the actor fails with:
```
TypeError: got an unexpected keyword argument '_ray_trace_ctx'
```

This commonly occurs with Ray Serve, where:
- `serve start` creates the controller actor (process A)
- Dashboard calls `ray.get_actor()` to interact with it (process B)

## Repo
Simplest way to repro is to run the following
```bash
ray start --head --tracing-startup-hook="ray.util.tracing.setup_local_tmp_tracing:setup_tracing"                                                                                                        ✔ │ ray_310 Py │ with ubuntu@devbox │ at 06:22:12
serve start
```

But here is a core specific repro script

`repro_actor_module.py`
```python
class MyActor:
    """A simple actor class that will be decorated dynamically."""

    def __init__(self):
        self.value = 0

    def my_method(self, x):
        """A simple method."""
        return x * 2

    def check_alive(self):
        """Health check method."""
        return True

    def increment(self, amount=1):
        """Method with a default parameter."""
        self.value += amount
        return self.value
```

`repro_tracing_issue.py`
```python
import multiprocessing
import subprocess
import sys

NAMESPACE = "test_ns"

def creator_process(ready_event, done_event):
    import ray
    from ray.util.tracing.tracing_helper import _is_tracing_enabled

    # Import the actor class from module (NOT decorated yet)
    from repro_actor_module import MyActor

    setup_tracing_path = "ray.util.tracing.setup_local_tmp_tracing:setup_tracing"
    ray.init(_tracing_startup_hook=setup_tracing_path, namespace=NAMESPACE)

    print(f"[CREATOR] Tracing enabled: {_is_tracing_enabled()}")

    # Dynamically decorate and create the test actor (like Serve does)
    MyActorRemote = ray.remote(
        name="my_test_actor",
        namespace=NAMESPACE,
        num_cpus=0,
        lifetime="detached",
    )(MyActor)

    actor = MyActorRemote.remote()

    # Print signatures from creator's handle
    print(f"[CREATOR] Signatures in handle from creation:")
    for method_name, sig in actor._ray_method_signatures.items():
        param_names = [p.name for p in sig]
        print(f"  {method_name}: {param_names}")

    my_method_sig = actor._ray_method_signatures.get("my_method", [])
    has_trace = "_ray_trace_ctx" in [p.name for p in my_method_sig]
    print(f"[CREATOR] my_method has _ray_trace_ctx: {has_trace}")

    # Verify the method works from creator
    result = ray.get(actor.my_method.remote(5))
    print(f"[CREATOR] Test call result: {result}")

    # Signal that actor is ready
    print("[CREATOR] Actor created, signaling getter...")
    sys.stdout.flush()
    ready_event.set()

    # Wait for getter to finish
    done_event.wait(timeout=30)
    print("[CREATOR] Getter finished, shutting down...")

    # Cleanup
    ray.kill(actor)
    ray.shutdown()

def getter_process(ready_event, done_event):
    import ray
    from ray.util.tracing.tracing_helper import _is_tracing_enabled

    # Wait for creator to signal ready
    print("[GETTER] Waiting for creator to set up actor...")
    if not ready_event.wait(timeout=30):
        print("[GETTER] Timeout waiting for creator!")
        done_event.set()
        return

    # Connect to the existing cluster (this will also enable tracing from GCS hook)
    ray.init(address="auto", namespace=NAMESPACE)

    print(f"\n[GETTER] Tracing enabled: {_is_tracing_enabled()}")

    # Get the actor by name - this will RELOAD the class fresh in this process
    # The class loaded here was NEVER processed by _inject_tracing_into_class
    actor = ray.get_actor("my_test_actor", namespace=NAMESPACE)

    # Print signatures from getter's handle
    print(f"[GETTER] Signatures in handle from get_actor():")
    for method_name, sig in actor._ray_method_signatures.items():
        param_names = [p.name for p in sig]
        print(f"  {method_name}: {param_names}")

    my_method_sig = actor._ray_method_signatures.get("my_method", [])
    has_trace = "_ray_trace_ctx" in [p.name for p in my_method_sig]
    print(f"[GETTER] my_method has _ray_trace_ctx: {has_trace}")

    # Try calling a method
    print(f"\n[GETTER] Attempting to call my_method.remote(5)...")
    sys.stdout.flush()
    try:
        result = ray.get(actor.my_method.remote(5))
        print(f"[GETTER] Method call SUCCEEDED! Result: {result}")
    except TypeError as e:
        print(f"[GETTER] Method call FAILED with TypeError: {e}")

    # Signal done
    done_event.set()
    ray.shutdown()

def main():
    # Stop any existing Ray cluster
    print("Stopping any existing Ray cluster...")
    subprocess.run(["ray", "stop", "--force"], capture_output=True)

    # Create synchronization events
    ready_event = multiprocessing.Event()
    done_event = multiprocessing.Event()

    # Start creator process
    creator = multiprocessing.Process(target=creator_process, args=(ready_event, done_event))
    creator.start()

    # Start getter process (will connect to existing cluster)
    getter = multiprocessing.Process(target=getter_process, args=(ready_event, done_event))
    getter.start()

    # Wait for both to complete
    getter.join(timeout=60)
    creator.join(timeout=10)

    # Clean up any hung processes
    if creator.is_alive():
        creator.terminate()
        creator.join(timeout=5)
    if getter.is_alive():
        getter.terminate()
        getter.join(timeout=5)

    # Cleanup Ray
    print("\nCleaning up...")
    subprocess.run(["ray", "stop", "--force"], capture_output=True)
    print("Done.")

if __name__ == "__main__":
    main()
```

<details>

<summary> output from master </summary>

```bash
❯ python repro_tracing_issue.py
Stopping any existing Ray cluster...
[GETTER] Waiting for creator to set up actor...
2025-12-24 07:05:02,215 INFO worker.py:1991 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
[CREATOR] Tracing enabled: True
[CREATOR] Signatures in handle from creation:
  __init__: ['_ray_trace_ctx']
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: ['_ray_trace_ctx']
  increment: ['amount', '_ray_trace_ctx']
  my_method: ['x', '_ray_trace_ctx']
[CREATOR] my_method has _ray_trace_ctx: True
[CREATOR] Test call result: 10
[CREATOR] Actor created, signaling getter...
2025-12-24 07:05:02,953 INFO worker.py:1811 -- Connecting to existing Ray cluster at address: 172.31.7.228:38871...
2025-12-24 07:05:02,984 INFO worker.py:1991 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(

[GETTER] Tracing enabled: True
[GETTER] Signatures in handle from get_actor():
  __init__: []
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: []
  increment: ['amount']
  my_method: ['x']
[GETTER] my_method has _ray_trace_ctx: False

[GETTER] Attempting to call my_method.remote(5)...
[GETTER] Method call FAILED with TypeError: got an unexpected keyword argument '_ray_trace_ctx'
[CREATOR] Getter finished, shutting down...

Cleaning up...
Done.
```

</details>

<details>

<summary>output from this PR</summary>

```bash
❯ python repro_tracing_issue.py
Stopping any existing Ray cluster...
[GETTER] Waiting for creator to set up actor...
2025-12-24 07:04:03,758 INFO worker.py:1991 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
[CREATOR] Tracing enabled: True
[CREATOR] Signatures in handle from creation:
  __init__: ['_ray_trace_ctx']
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: ['_ray_trace_ctx']
  increment: ['amount', '_ray_trace_ctx']
  my_method: ['x', '_ray_trace_ctx']
[CREATOR] my_method has _ray_trace_ctx: True
[CREATOR] Test call result: 10
[CREATOR] Actor created, signaling getter...
2025-12-24 07:04:04,476 INFO worker.py:1811 -- Connecting to existing Ray cluster at address: 172.31.7.228:37231...
2025-12-24 07:04:04,504 INFO worker.py:1991 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(

[GETTER] Tracing enabled: True
[GETTER] Signatures in handle from get_actor():
  __init__: ['_ray_trace_ctx']
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: ['_ray_trace_ctx']
  increment: ['amount', '_ray_trace_ctx']
  my_method: ['x', '_ray_trace_ctx']
[GETTER] my_method has _ray_trace_ctx: True

[GETTER] Attempting to call my_method.remote(5)...
[GETTER] Method call SUCCEEDED! Result: 10
[CREATOR] Getter finished, shutting down...

Cleaning up...
Done.
```

</details>

**Root Cause:**

`_inject_tracing_into_class` sets `__signature__` (including
`_ray_trace_ctx`) on the method object during actor creation. However:

1. When the actor class is serialized (cloudpickle) and loaded in
another process, `__signature__` is **not preserved** on module-level
functions. See repro script at the end of PR description as proof
2. `_ActorClassMethodMetadata.create()` uses `inspect.unwrap()` which
follows the `__wrapped__` chain to the **deeply unwrapped original
method**
3. The original method's `__signature__` was lost during serialization →
signatures extracted **without** `_ray_trace_ctx`
4. When calling the method, `_tracing_actor_method_invocation` adds
`_ray_trace_ctx` to kwargs → **signature validation fails**

**Fix:**

1. In `_inject_tracing_into_class`: Set `__signature__` on the **deeply
unwrapped** method (via `inspect.unwrap`) rather than the immediate
method. This ensures `_ActorClassMethodMetadata.create()` finds it after
unwrapping.

2. In `load_actor_class`: Call `_inject_tracing_into_class` after
loading to re-inject the lost `__signature__` attributes.

**Testing:**
- Added reproduction script demonstrating cross-process actor method
calls with tracing
- All existing tracing tests pass
- Add a new test for serve with tracing

`repro_cloudpickle_signature.py`
```python
import inspect
import cloudpickle
import pickle
import multiprocessing

def check_signature_in_subprocess(pickled_func_bytes):
    func = pickle.loads(pickled_func_bytes)

    print(f"[SUBPROCESS] Unpickled function: {func}")
    print(f"[SUBPROCESS] Module: {func.__module__}")

    sig = getattr(func, '__signature__', None)
    if sig is not None:
        params = list(sig.parameters.keys())
        print(f"[SUBPROCESS] __signature__: {sig}")
        if '_ray_trace_ctx' in params:
            print(f"[SUBPROCESS] __signature__ WAS preserved")
            return True
        else:
            print(f"[SUBPROCESS] __signature__ NOT preserved (missing _ray_trace_ctx)")
            return False
    else:
        print(f"[SUBPROCESS] __signature__ NOT preserved (attribute missing)")
        return False

def main():
    from repro_actor_module import MyActor
    func = MyActor.my_method

    print(f"\n[MAIN] Function: {func}")
    print(f"[MAIN] Module: {func.__module__}")
    print(f"[MAIN] __signature__ before: {getattr(func, '__signature__', 'NOT SET')}")

    # Set a custom __signature__ with _ray_trace_ctx
    custom_sig = inspect.signature(func)
    new_params = list(custom_sig.parameters.values()) + [
        inspect.Parameter("_ray_trace_ctx", inspect.Parameter.KEYWORD_ONLY, default=None)
    ]
    func.__signature__ = custom_sig.replace(parameters=new_params)

    print(f"[MAIN] __signature__ after: {func.__signature__}")
    print(f"[MAIN] Parameters: {list(func.__signature__.parameters.keys())}")

    # Pickle
    print(f"\n[MAIN] Pickling with cloudpickle...")
    pickled = cloudpickle.dumps(func)

    # Test 1: Same process
    print(f"\n{'='*70}")
    print("TEST 1: Unpickle in SAME process")
    print(f"{'='*70}")
    same_func = pickle.loads(pickled)
    same_sig = getattr(same_func, '__signature__', None)
    if same_sig and '_ray_trace_ctx' in list(same_sig.parameters.keys()):
        print(f"Same process: __signature__ preserved")
    else:
        print(f"Same process: __signature__ NOT preserved")

    # Test 2: Different process
    print(f"\n{'='*70}")
    print("TEST 2: Unpickle in DIFFERENT process")
    print(f"{'='*70}")

    ctx = multiprocessing.get_context('spawn')
    with ctx.Pool(1) as pool:
        result = pool.apply(check_signature_in_subprocess, (pickled,))

    if result:
        print("__signature__ IS preserved (unexpected)")
    else:
        print("__signature__ is NOT preserved for functions from imported modules!")

if __name__ == "__main__":
    main()

```

---------

Signed-off-by: abrar <[email protected]>
Signed-off-by: jasonwrwang <[email protected]>
# uses inspect.unwrap which goes all the way to the original method.
unwrapped_method = inspect.unwrap(method)

# Add _ray_trace_ctx to the UNWRAPPED method's signature.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Image

lee1258561 pushed a commit to pinterest/ray that referenced this pull request Feb 3, 2026
… different processes (ray-project#59634)

**Problem:**

fixes ray-project#57803

When tracing is enabled, calling an actor method from a different
process than the one that created the actor fails with:
```
TypeError: got an unexpected keyword argument '_ray_trace_ctx'
```

This commonly occurs with Ray Serve, where:
- `serve start` creates the controller actor (process A)
- Dashboard calls `ray.get_actor()` to interact with it (process B)

## Repo
Simplest way to repro is to run the following
```bash
ray start --head --tracing-startup-hook="ray.util.tracing.setup_local_tmp_tracing:setup_tracing"                                                                                                        ✔ │ ray_310 Py │ with ubuntu@devbox │ at 06:22:12
serve start
```

But here is a core specific repro script

`repro_actor_module.py`
```python
class MyActor:
    """A simple actor class that will be decorated dynamically."""
    
    def __init__(self):
        self.value = 0
    
    def my_method(self, x):
        """A simple method."""
        return x * 2
    
    def check_alive(self):
        """Health check method."""
        return True
    
    def increment(self, amount=1):
        """Method with a default parameter."""
        self.value += amount
        return self.value
```

`repro_tracing_issue.py`
```python
import multiprocessing
import subprocess
import sys


NAMESPACE = "test_ns"


def creator_process(ready_event, done_event):
    import ray
    from ray.util.tracing.tracing_helper import _is_tracing_enabled
    
    # Import the actor class from module (NOT decorated yet)
    from repro_actor_module import MyActor
    
    setup_tracing_path = "ray.util.tracing.setup_local_tmp_tracing:setup_tracing"
    ray.init(_tracing_startup_hook=setup_tracing_path, namespace=NAMESPACE)
    
    print(f"[CREATOR] Tracing enabled: {_is_tracing_enabled()}")
    
    # Dynamically decorate and create the test actor (like Serve does)
    MyActorRemote = ray.remote(
        name="my_test_actor",
        namespace=NAMESPACE,
        num_cpus=0,
        lifetime="detached",
    )(MyActor)
    
    actor = MyActorRemote.remote()
    
    # Print signatures from creator's handle
    print(f"[CREATOR] Signatures in handle from creation:")
    for method_name, sig in actor._ray_method_signatures.items():
        param_names = [p.name for p in sig]
        print(f"  {method_name}: {param_names}")
    
    my_method_sig = actor._ray_method_signatures.get("my_method", [])
    has_trace = "_ray_trace_ctx" in [p.name for p in my_method_sig]
    print(f"[CREATOR] my_method has _ray_trace_ctx: {has_trace}")
    
    # Verify the method works from creator
    result = ray.get(actor.my_method.remote(5))
    print(f"[CREATOR] Test call result: {result}")
    
    # Signal that actor is ready
    print("[CREATOR] Actor created, signaling getter...")
    sys.stdout.flush()
    ready_event.set()
    
    # Wait for getter to finish
    done_event.wait(timeout=30)
    print("[CREATOR] Getter finished, shutting down...")
    
    # Cleanup
    ray.kill(actor)
    ray.shutdown()


def getter_process(ready_event, done_event):
    import ray
    from ray.util.tracing.tracing_helper import _is_tracing_enabled
    
    # Wait for creator to signal ready
    print("[GETTER] Waiting for creator to set up actor...")
    if not ready_event.wait(timeout=30):
        print("[GETTER] Timeout waiting for creator!")
        done_event.set()
        return
    
    # Connect to the existing cluster (this will also enable tracing from GCS hook)
    ray.init(address="auto", namespace=NAMESPACE)
    
    print(f"\n[GETTER] Tracing enabled: {_is_tracing_enabled()}")
    
    # Get the actor by name - this will RELOAD the class fresh in this process
    # The class loaded here was NEVER processed by _inject_tracing_into_class
    actor = ray.get_actor("my_test_actor", namespace=NAMESPACE)
    
    # Print signatures from getter's handle
    print(f"[GETTER] Signatures in handle from get_actor():")
    for method_name, sig in actor._ray_method_signatures.items():
        param_names = [p.name for p in sig]
        print(f"  {method_name}: {param_names}")
    
    my_method_sig = actor._ray_method_signatures.get("my_method", [])
    has_trace = "_ray_trace_ctx" in [p.name for p in my_method_sig]
    print(f"[GETTER] my_method has _ray_trace_ctx: {has_trace}")
    
    # Try calling a method
    print(f"\n[GETTER] Attempting to call my_method.remote(5)...")
    sys.stdout.flush()
    try:
        result = ray.get(actor.my_method.remote(5))
        print(f"[GETTER] Method call SUCCEEDED! Result: {result}")
    except TypeError as e:
        print(f"[GETTER] Method call FAILED with TypeError: {e}")
    
    # Signal done
    done_event.set()
    ray.shutdown()


def main():
    # Stop any existing Ray cluster
    print("Stopping any existing Ray cluster...")
    subprocess.run(["ray", "stop", "--force"], capture_output=True)
    
    # Create synchronization events
    ready_event = multiprocessing.Event()
    done_event = multiprocessing.Event()
    
    # Start creator process
    creator = multiprocessing.Process(target=creator_process, args=(ready_event, done_event))
    creator.start()
    
    # Start getter process (will connect to existing cluster)
    getter = multiprocessing.Process(target=getter_process, args=(ready_event, done_event))
    getter.start()
    
    # Wait for both to complete
    getter.join(timeout=60)
    creator.join(timeout=10)
    
    # Clean up any hung processes
    if creator.is_alive():
        creator.terminate()
        creator.join(timeout=5)
    if getter.is_alive():
        getter.terminate()
        getter.join(timeout=5)
    
    # Cleanup Ray
    print("\nCleaning up...")
    subprocess.run(["ray", "stop", "--force"], capture_output=True)
    print("Done.")


if __name__ == "__main__":
    main()
```

<details>

<summary> output from master </summary>

```bash
❯ python repro_tracing_issue.py
Stopping any existing Ray cluster...
[GETTER] Waiting for creator to set up actor...
2025-12-24 07:05:02,215 INFO worker.py:1991 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
[CREATOR] Tracing enabled: True
[CREATOR] Signatures in handle from creation:
  __init__: ['_ray_trace_ctx']
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: ['_ray_trace_ctx']
  increment: ['amount', '_ray_trace_ctx']
  my_method: ['x', '_ray_trace_ctx']
[CREATOR] my_method has _ray_trace_ctx: True
[CREATOR] Test call result: 10
[CREATOR] Actor created, signaling getter...
2025-12-24 07:05:02,953 INFO worker.py:1811 -- Connecting to existing Ray cluster at address: 172.31.7.228:38871...
2025-12-24 07:05:02,984 INFO worker.py:1991 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265 
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(

[GETTER] Tracing enabled: True
[GETTER] Signatures in handle from get_actor():
  __init__: []
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: []
  increment: ['amount']
  my_method: ['x']
[GETTER] my_method has _ray_trace_ctx: False

[GETTER] Attempting to call my_method.remote(5)...
[GETTER] Method call FAILED with TypeError: got an unexpected keyword argument '_ray_trace_ctx'
[CREATOR] Getter finished, shutting down...

Cleaning up...
Done.
```

</details>

<details>

<summary>output from this PR</summary>

```bash
❯ python repro_tracing_issue.py
Stopping any existing Ray cluster...
[GETTER] Waiting for creator to set up actor...
2025-12-24 07:04:03,758 INFO worker.py:1991 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
[CREATOR] Tracing enabled: True
[CREATOR] Signatures in handle from creation:
  __init__: ['_ray_trace_ctx']
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: ['_ray_trace_ctx']
  increment: ['amount', '_ray_trace_ctx']
  my_method: ['x', '_ray_trace_ctx']
[CREATOR] my_method has _ray_trace_ctx: True
[CREATOR] Test call result: 10
[CREATOR] Actor created, signaling getter...
2025-12-24 07:04:04,476 INFO worker.py:1811 -- Connecting to existing Ray cluster at address: 172.31.7.228:37231...
2025-12-24 07:04:04,504 INFO worker.py:1991 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265 
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(

[GETTER] Tracing enabled: True
[GETTER] Signatures in handle from get_actor():
  __init__: ['_ray_trace_ctx']
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: ['_ray_trace_ctx']
  increment: ['amount', '_ray_trace_ctx']
  my_method: ['x', '_ray_trace_ctx']
[GETTER] my_method has _ray_trace_ctx: True

[GETTER] Attempting to call my_method.remote(5)...
[GETTER] Method call SUCCEEDED! Result: 10
[CREATOR] Getter finished, shutting down...

Cleaning up...
Done.
```

</details>

**Root Cause:**

`_inject_tracing_into_class` sets `__signature__` (including
`_ray_trace_ctx`) on the method object during actor creation. However:

1. When the actor class is serialized (cloudpickle) and loaded in
another process, `__signature__` is **not preserved** on module-level
functions. See repro script at the end of PR description as proof
2. `_ActorClassMethodMetadata.create()` uses `inspect.unwrap()` which
follows the `__wrapped__` chain to the **deeply unwrapped original
method**
3. The original method's `__signature__` was lost during serialization →
signatures extracted **without** `_ray_trace_ctx`
4. When calling the method, `_tracing_actor_method_invocation` adds
`_ray_trace_ctx` to kwargs → **signature validation fails**

**Fix:**

1. In `_inject_tracing_into_class`: Set `__signature__` on the **deeply
unwrapped** method (via `inspect.unwrap`) rather than the immediate
method. This ensures `_ActorClassMethodMetadata.create()` finds it after
unwrapping.

2. In `load_actor_class`: Call `_inject_tracing_into_class` after
loading to re-inject the lost `__signature__` attributes.

**Testing:**
- Added reproduction script demonstrating cross-process actor method
calls with tracing
- All existing tracing tests pass
- Add a new test for serve with tracing

`repro_cloudpickle_signature.py`
```python
import inspect
import cloudpickle
import pickle
import multiprocessing


def check_signature_in_subprocess(pickled_func_bytes):
    func = pickle.loads(pickled_func_bytes)
    
    print(f"[SUBPROCESS] Unpickled function: {func}")
    print(f"[SUBPROCESS] Module: {func.__module__}")
    
    sig = getattr(func, '__signature__', None)
    if sig is not None:
        params = list(sig.parameters.keys())
        print(f"[SUBPROCESS] __signature__: {sig}")
        if '_ray_trace_ctx' in params:
            print(f"[SUBPROCESS] __signature__ WAS preserved")
            return True
        else:
            print(f"[SUBPROCESS] __signature__ NOT preserved (missing _ray_trace_ctx)")
            return False
    else:
        print(f"[SUBPROCESS] __signature__ NOT preserved (attribute missing)")
        return False


def main():
    from repro_actor_module import MyActor
    func = MyActor.my_method
    
    print(f"\n[MAIN] Function: {func}")
    print(f"[MAIN] Module: {func.__module__}")
    print(f"[MAIN] __signature__ before: {getattr(func, '__signature__', 'NOT SET')}")
    
    # Set a custom __signature__ with _ray_trace_ctx
    custom_sig = inspect.signature(func)
    new_params = list(custom_sig.parameters.values()) + [
        inspect.Parameter("_ray_trace_ctx", inspect.Parameter.KEYWORD_ONLY, default=None)
    ]
    func.__signature__ = custom_sig.replace(parameters=new_params)
    
    print(f"[MAIN] __signature__ after: {func.__signature__}")
    print(f"[MAIN] Parameters: {list(func.__signature__.parameters.keys())}")
    
    # Pickle
    print(f"\n[MAIN] Pickling with cloudpickle...")
    pickled = cloudpickle.dumps(func)
    
    # Test 1: Same process
    print(f"\n{'='*70}")
    print("TEST 1: Unpickle in SAME process")
    print(f"{'='*70}")
    same_func = pickle.loads(pickled)
    same_sig = getattr(same_func, '__signature__', None)
    if same_sig and '_ray_trace_ctx' in list(same_sig.parameters.keys()):
        print(f"Same process: __signature__ preserved")
    else:
        print(f"Same process: __signature__ NOT preserved")
    
    # Test 2: Different process
    print(f"\n{'='*70}")
    print("TEST 2: Unpickle in DIFFERENT process")
    print(f"{'='*70}")
    
    ctx = multiprocessing.get_context('spawn')
    with ctx.Pool(1) as pool:
        result = pool.apply(check_signature_in_subprocess, (pickled,))
    
    if result:
        print("__signature__ IS preserved (unexpected)")
    else:
        print("__signature__ is NOT preserved for functions from imported modules!")


if __name__ == "__main__":
    main()

```

---------

Signed-off-by: abrar <[email protected]>
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Feb 3, 2026
… different processes (ray-project#59634)

**Problem:**

fixes ray-project#57803

When tracing is enabled, calling an actor method from a different
process than the one that created the actor fails with:
```
TypeError: got an unexpected keyword argument '_ray_trace_ctx'
```

This commonly occurs with Ray Serve, where:
- `serve start` creates the controller actor (process A)
- Dashboard calls `ray.get_actor()` to interact with it (process B)

## Repo
Simplest way to repro is to run the following
```bash
ray start --head --tracing-startup-hook="ray.util.tracing.setup_local_tmp_tracing:setup_tracing"                                                                                                        ✔ │ ray_310 Py │ with ubuntu@devbox │ at 06:22:12
serve start
```

But here is a core specific repro script

`repro_actor_module.py`
```python
class MyActor:
    """A simple actor class that will be decorated dynamically."""
    
    def __init__(self):
        self.value = 0
    
    def my_method(self, x):
        """A simple method."""
        return x * 2
    
    def check_alive(self):
        """Health check method."""
        return True
    
    def increment(self, amount=1):
        """Method with a default parameter."""
        self.value += amount
        return self.value
```

`repro_tracing_issue.py`
```python
import multiprocessing
import subprocess
import sys


NAMESPACE = "test_ns"


def creator_process(ready_event, done_event):
    import ray
    from ray.util.tracing.tracing_helper import _is_tracing_enabled
    
    # Import the actor class from module (NOT decorated yet)
    from repro_actor_module import MyActor
    
    setup_tracing_path = "ray.util.tracing.setup_local_tmp_tracing:setup_tracing"
    ray.init(_tracing_startup_hook=setup_tracing_path, namespace=NAMESPACE)
    
    print(f"[CREATOR] Tracing enabled: {_is_tracing_enabled()}")
    
    # Dynamically decorate and create the test actor (like Serve does)
    MyActorRemote = ray.remote(
        name="my_test_actor",
        namespace=NAMESPACE,
        num_cpus=0,
        lifetime="detached",
    )(MyActor)
    
    actor = MyActorRemote.remote()
    
    # Print signatures from creator's handle
    print(f"[CREATOR] Signatures in handle from creation:")
    for method_name, sig in actor._ray_method_signatures.items():
        param_names = [p.name for p in sig]
        print(f"  {method_name}: {param_names}")
    
    my_method_sig = actor._ray_method_signatures.get("my_method", [])
    has_trace = "_ray_trace_ctx" in [p.name for p in my_method_sig]
    print(f"[CREATOR] my_method has _ray_trace_ctx: {has_trace}")
    
    # Verify the method works from creator
    result = ray.get(actor.my_method.remote(5))
    print(f"[CREATOR] Test call result: {result}")
    
    # Signal that actor is ready
    print("[CREATOR] Actor created, signaling getter...")
    sys.stdout.flush()
    ready_event.set()
    
    # Wait for getter to finish
    done_event.wait(timeout=30)
    print("[CREATOR] Getter finished, shutting down...")
    
    # Cleanup
    ray.kill(actor)
    ray.shutdown()


def getter_process(ready_event, done_event):
    import ray
    from ray.util.tracing.tracing_helper import _is_tracing_enabled
    
    # Wait for creator to signal ready
    print("[GETTER] Waiting for creator to set up actor...")
    if not ready_event.wait(timeout=30):
        print("[GETTER] Timeout waiting for creator!")
        done_event.set()
        return
    
    # Connect to the existing cluster (this will also enable tracing from GCS hook)
    ray.init(address="auto", namespace=NAMESPACE)
    
    print(f"\n[GETTER] Tracing enabled: {_is_tracing_enabled()}")
    
    # Get the actor by name - this will RELOAD the class fresh in this process
    # The class loaded here was NEVER processed by _inject_tracing_into_class
    actor = ray.get_actor("my_test_actor", namespace=NAMESPACE)
    
    # Print signatures from getter's handle
    print(f"[GETTER] Signatures in handle from get_actor():")
    for method_name, sig in actor._ray_method_signatures.items():
        param_names = [p.name for p in sig]
        print(f"  {method_name}: {param_names}")
    
    my_method_sig = actor._ray_method_signatures.get("my_method", [])
    has_trace = "_ray_trace_ctx" in [p.name for p in my_method_sig]
    print(f"[GETTER] my_method has _ray_trace_ctx: {has_trace}")
    
    # Try calling a method
    print(f"\n[GETTER] Attempting to call my_method.remote(5)...")
    sys.stdout.flush()
    try:
        result = ray.get(actor.my_method.remote(5))
        print(f"[GETTER] Method call SUCCEEDED! Result: {result}")
    except TypeError as e:
        print(f"[GETTER] Method call FAILED with TypeError: {e}")
    
    # Signal done
    done_event.set()
    ray.shutdown()


def main():
    # Stop any existing Ray cluster
    print("Stopping any existing Ray cluster...")
    subprocess.run(["ray", "stop", "--force"], capture_output=True)
    
    # Create synchronization events
    ready_event = multiprocessing.Event()
    done_event = multiprocessing.Event()
    
    # Start creator process
    creator = multiprocessing.Process(target=creator_process, args=(ready_event, done_event))
    creator.start()
    
    # Start getter process (will connect to existing cluster)
    getter = multiprocessing.Process(target=getter_process, args=(ready_event, done_event))
    getter.start()
    
    # Wait for both to complete
    getter.join(timeout=60)
    creator.join(timeout=10)
    
    # Clean up any hung processes
    if creator.is_alive():
        creator.terminate()
        creator.join(timeout=5)
    if getter.is_alive():
        getter.terminate()
        getter.join(timeout=5)
    
    # Cleanup Ray
    print("\nCleaning up...")
    subprocess.run(["ray", "stop", "--force"], capture_output=True)
    print("Done.")


if __name__ == "__main__":
    main()
```

<details>

<summary> output from master </summary>

```bash
❯ python repro_tracing_issue.py
Stopping any existing Ray cluster...
[GETTER] Waiting for creator to set up actor...
2025-12-24 07:05:02,215 INFO worker.py:1991 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
[CREATOR] Tracing enabled: True
[CREATOR] Signatures in handle from creation:
  __init__: ['_ray_trace_ctx']
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: ['_ray_trace_ctx']
  increment: ['amount', '_ray_trace_ctx']
  my_method: ['x', '_ray_trace_ctx']
[CREATOR] my_method has _ray_trace_ctx: True
[CREATOR] Test call result: 10
[CREATOR] Actor created, signaling getter...
2025-12-24 07:05:02,953 INFO worker.py:1811 -- Connecting to existing Ray cluster at address: 172.31.7.228:38871...
2025-12-24 07:05:02,984 INFO worker.py:1991 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265 
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(

[GETTER] Tracing enabled: True
[GETTER] Signatures in handle from get_actor():
  __init__: []
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: []
  increment: ['amount']
  my_method: ['x']
[GETTER] my_method has _ray_trace_ctx: False

[GETTER] Attempting to call my_method.remote(5)...
[GETTER] Method call FAILED with TypeError: got an unexpected keyword argument '_ray_trace_ctx'
[CREATOR] Getter finished, shutting down...

Cleaning up...
Done.
```

</details>

<details>

<summary>output from this PR</summary>

```bash
❯ python repro_tracing_issue.py
Stopping any existing Ray cluster...
[GETTER] Waiting for creator to set up actor...
2025-12-24 07:04:03,758 INFO worker.py:1991 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
[CREATOR] Tracing enabled: True
[CREATOR] Signatures in handle from creation:
  __init__: ['_ray_trace_ctx']
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: ['_ray_trace_ctx']
  increment: ['amount', '_ray_trace_ctx']
  my_method: ['x', '_ray_trace_ctx']
[CREATOR] my_method has _ray_trace_ctx: True
[CREATOR] Test call result: 10
[CREATOR] Actor created, signaling getter...
2025-12-24 07:04:04,476 INFO worker.py:1811 -- Connecting to existing Ray cluster at address: 172.31.7.228:37231...
2025-12-24 07:04:04,504 INFO worker.py:1991 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265 
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(

[GETTER] Tracing enabled: True
[GETTER] Signatures in handle from get_actor():
  __init__: ['_ray_trace_ctx']
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: ['_ray_trace_ctx']
  increment: ['amount', '_ray_trace_ctx']
  my_method: ['x', '_ray_trace_ctx']
[GETTER] my_method has _ray_trace_ctx: True

[GETTER] Attempting to call my_method.remote(5)...
[GETTER] Method call SUCCEEDED! Result: 10
[CREATOR] Getter finished, shutting down...

Cleaning up...
Done.
```

</details>

**Root Cause:**

`_inject_tracing_into_class` sets `__signature__` (including
`_ray_trace_ctx`) on the method object during actor creation. However:

1. When the actor class is serialized (cloudpickle) and loaded in
another process, `__signature__` is **not preserved** on module-level
functions. See repro script at the end of PR description as proof
2. `_ActorClassMethodMetadata.create()` uses `inspect.unwrap()` which
follows the `__wrapped__` chain to the **deeply unwrapped original
method**
3. The original method's `__signature__` was lost during serialization →
signatures extracted **without** `_ray_trace_ctx`
4. When calling the method, `_tracing_actor_method_invocation` adds
`_ray_trace_ctx` to kwargs → **signature validation fails**

**Fix:**

1. In `_inject_tracing_into_class`: Set `__signature__` on the **deeply
unwrapped** method (via `inspect.unwrap`) rather than the immediate
method. This ensures `_ActorClassMethodMetadata.create()` finds it after
unwrapping.

2. In `load_actor_class`: Call `_inject_tracing_into_class` after
loading to re-inject the lost `__signature__` attributes.

**Testing:**
- Added reproduction script demonstrating cross-process actor method
calls with tracing
- All existing tracing tests pass
- Add a new test for serve with tracing

`repro_cloudpickle_signature.py`
```python
import inspect
import cloudpickle
import pickle
import multiprocessing


def check_signature_in_subprocess(pickled_func_bytes):
    func = pickle.loads(pickled_func_bytes)
    
    print(f"[SUBPROCESS] Unpickled function: {func}")
    print(f"[SUBPROCESS] Module: {func.__module__}")
    
    sig = getattr(func, '__signature__', None)
    if sig is not None:
        params = list(sig.parameters.keys())
        print(f"[SUBPROCESS] __signature__: {sig}")
        if '_ray_trace_ctx' in params:
            print(f"[SUBPROCESS] __signature__ WAS preserved")
            return True
        else:
            print(f"[SUBPROCESS] __signature__ NOT preserved (missing _ray_trace_ctx)")
            return False
    else:
        print(f"[SUBPROCESS] __signature__ NOT preserved (attribute missing)")
        return False


def main():
    from repro_actor_module import MyActor
    func = MyActor.my_method
    
    print(f"\n[MAIN] Function: {func}")
    print(f"[MAIN] Module: {func.__module__}")
    print(f"[MAIN] __signature__ before: {getattr(func, '__signature__', 'NOT SET')}")
    
    # Set a custom __signature__ with _ray_trace_ctx
    custom_sig = inspect.signature(func)
    new_params = list(custom_sig.parameters.values()) + [
        inspect.Parameter("_ray_trace_ctx", inspect.Parameter.KEYWORD_ONLY, default=None)
    ]
    func.__signature__ = custom_sig.replace(parameters=new_params)
    
    print(f"[MAIN] __signature__ after: {func.__signature__}")
    print(f"[MAIN] Parameters: {list(func.__signature__.parameters.keys())}")
    
    # Pickle
    print(f"\n[MAIN] Pickling with cloudpickle...")
    pickled = cloudpickle.dumps(func)
    
    # Test 1: Same process
    print(f"\n{'='*70}")
    print("TEST 1: Unpickle in SAME process")
    print(f"{'='*70}")
    same_func = pickle.loads(pickled)
    same_sig = getattr(same_func, '__signature__', None)
    if same_sig and '_ray_trace_ctx' in list(same_sig.parameters.keys()):
        print(f"Same process: __signature__ preserved")
    else:
        print(f"Same process: __signature__ NOT preserved")
    
    # Test 2: Different process
    print(f"\n{'='*70}")
    print("TEST 2: Unpickle in DIFFERENT process")
    print(f"{'='*70}")
    
    ctx = multiprocessing.get_context('spawn')
    with ctx.Pool(1) as pool:
        result = pool.apply(check_signature_in_subprocess, (pickled,))
    
    if result:
        print("__signature__ IS preserved (unexpected)")
    else:
        print("__signature__ is NOT preserved for functions from imported modules!")


if __name__ == "__main__":
    main()

```

---------

Signed-off-by: abrar <[email protected]>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
… different processes (ray-project#59634)

**Problem:**

fixes ray-project#57803

When tracing is enabled, calling an actor method from a different
process than the one that created the actor fails with:
```
TypeError: got an unexpected keyword argument '_ray_trace_ctx'
```

This commonly occurs with Ray Serve, where:
- `serve start` creates the controller actor (process A)
- Dashboard calls `ray.get_actor()` to interact with it (process B)

## Repo
Simplest way to repro is to run the following
```bash
ray start --head --tracing-startup-hook="ray.util.tracing.setup_local_tmp_tracing:setup_tracing"                                                                                                        ✔ │ ray_310 Py │ with ubuntu@devbox │ at 06:22:12
serve start
```

But here is a core specific repro script

`repro_actor_module.py`
```python
class MyActor:
    """A simple actor class that will be decorated dynamically."""

    def __init__(self):
        self.value = 0

    def my_method(self, x):
        """A simple method."""
        return x * 2

    def check_alive(self):
        """Health check method."""
        return True

    def increment(self, amount=1):
        """Method with a default parameter."""
        self.value += amount
        return self.value
```

`repro_tracing_issue.py`
```python
import multiprocessing
import subprocess
import sys

NAMESPACE = "test_ns"

def creator_process(ready_event, done_event):
    import ray
    from ray.util.tracing.tracing_helper import _is_tracing_enabled

    # Import the actor class from module (NOT decorated yet)
    from repro_actor_module import MyActor

    setup_tracing_path = "ray.util.tracing.setup_local_tmp_tracing:setup_tracing"
    ray.init(_tracing_startup_hook=setup_tracing_path, namespace=NAMESPACE)

    print(f"[CREATOR] Tracing enabled: {_is_tracing_enabled()}")

    # Dynamically decorate and create the test actor (like Serve does)
    MyActorRemote = ray.remote(
        name="my_test_actor",
        namespace=NAMESPACE,
        num_cpus=0,
        lifetime="detached",
    )(MyActor)

    actor = MyActorRemote.remote()

    # Print signatures from creator's handle
    print(f"[CREATOR] Signatures in handle from creation:")
    for method_name, sig in actor._ray_method_signatures.items():
        param_names = [p.name for p in sig]
        print(f"  {method_name}: {param_names}")

    my_method_sig = actor._ray_method_signatures.get("my_method", [])
    has_trace = "_ray_trace_ctx" in [p.name for p in my_method_sig]
    print(f"[CREATOR] my_method has _ray_trace_ctx: {has_trace}")

    # Verify the method works from creator
    result = ray.get(actor.my_method.remote(5))
    print(f"[CREATOR] Test call result: {result}")

    # Signal that actor is ready
    print("[CREATOR] Actor created, signaling getter...")
    sys.stdout.flush()
    ready_event.set()

    # Wait for getter to finish
    done_event.wait(timeout=30)
    print("[CREATOR] Getter finished, shutting down...")

    # Cleanup
    ray.kill(actor)
    ray.shutdown()

def getter_process(ready_event, done_event):
    import ray
    from ray.util.tracing.tracing_helper import _is_tracing_enabled

    # Wait for creator to signal ready
    print("[GETTER] Waiting for creator to set up actor...")
    if not ready_event.wait(timeout=30):
        print("[GETTER] Timeout waiting for creator!")
        done_event.set()
        return

    # Connect to the existing cluster (this will also enable tracing from GCS hook)
    ray.init(address="auto", namespace=NAMESPACE)

    print(f"\n[GETTER] Tracing enabled: {_is_tracing_enabled()}")

    # Get the actor by name - this will RELOAD the class fresh in this process
    # The class loaded here was NEVER processed by _inject_tracing_into_class
    actor = ray.get_actor("my_test_actor", namespace=NAMESPACE)

    # Print signatures from getter's handle
    print(f"[GETTER] Signatures in handle from get_actor():")
    for method_name, sig in actor._ray_method_signatures.items():
        param_names = [p.name for p in sig]
        print(f"  {method_name}: {param_names}")

    my_method_sig = actor._ray_method_signatures.get("my_method", [])
    has_trace = "_ray_trace_ctx" in [p.name for p in my_method_sig]
    print(f"[GETTER] my_method has _ray_trace_ctx: {has_trace}")

    # Try calling a method
    print(f"\n[GETTER] Attempting to call my_method.remote(5)...")
    sys.stdout.flush()
    try:
        result = ray.get(actor.my_method.remote(5))
        print(f"[GETTER] Method call SUCCEEDED! Result: {result}")
    except TypeError as e:
        print(f"[GETTER] Method call FAILED with TypeError: {e}")

    # Signal done
    done_event.set()
    ray.shutdown()

def main():
    # Stop any existing Ray cluster
    print("Stopping any existing Ray cluster...")
    subprocess.run(["ray", "stop", "--force"], capture_output=True)

    # Create synchronization events
    ready_event = multiprocessing.Event()
    done_event = multiprocessing.Event()

    # Start creator process
    creator = multiprocessing.Process(target=creator_process, args=(ready_event, done_event))
    creator.start()

    # Start getter process (will connect to existing cluster)
    getter = multiprocessing.Process(target=getter_process, args=(ready_event, done_event))
    getter.start()

    # Wait for both to complete
    getter.join(timeout=60)
    creator.join(timeout=10)

    # Clean up any hung processes
    if creator.is_alive():
        creator.terminate()
        creator.join(timeout=5)
    if getter.is_alive():
        getter.terminate()
        getter.join(timeout=5)

    # Cleanup Ray
    print("\nCleaning up...")
    subprocess.run(["ray", "stop", "--force"], capture_output=True)
    print("Done.")

if __name__ == "__main__":
    main()
```

<details>

<summary> output from master </summary>

```bash
❯ python repro_tracing_issue.py
Stopping any existing Ray cluster...
[GETTER] Waiting for creator to set up actor...
2025-12-24 07:05:02,215 INFO worker.py:1991 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
[CREATOR] Tracing enabled: True
[CREATOR] Signatures in handle from creation:
  __init__: ['_ray_trace_ctx']
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: ['_ray_trace_ctx']
  increment: ['amount', '_ray_trace_ctx']
  my_method: ['x', '_ray_trace_ctx']
[CREATOR] my_method has _ray_trace_ctx: True
[CREATOR] Test call result: 10
[CREATOR] Actor created, signaling getter...
2025-12-24 07:05:02,953 INFO worker.py:1811 -- Connecting to existing Ray cluster at address: 172.31.7.228:38871...
2025-12-24 07:05:02,984 INFO worker.py:1991 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(

[GETTER] Tracing enabled: True
[GETTER] Signatures in handle from get_actor():
  __init__: []
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: []
  increment: ['amount']
  my_method: ['x']
[GETTER] my_method has _ray_trace_ctx: False

[GETTER] Attempting to call my_method.remote(5)...
[GETTER] Method call FAILED with TypeError: got an unexpected keyword argument '_ray_trace_ctx'
[CREATOR] Getter finished, shutting down...

Cleaning up...
Done.
```

</details>

<details>

<summary>output from this PR</summary>

```bash
❯ python repro_tracing_issue.py
Stopping any existing Ray cluster...
[GETTER] Waiting for creator to set up actor...
2025-12-24 07:04:03,758 INFO worker.py:1991 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
[CREATOR] Tracing enabled: True
[CREATOR] Signatures in handle from creation:
  __init__: ['_ray_trace_ctx']
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: ['_ray_trace_ctx']
  increment: ['amount', '_ray_trace_ctx']
  my_method: ['x', '_ray_trace_ctx']
[CREATOR] my_method has _ray_trace_ctx: True
[CREATOR] Test call result: 10
[CREATOR] Actor created, signaling getter...
2025-12-24 07:04:04,476 INFO worker.py:1811 -- Connecting to existing Ray cluster at address: 172.31.7.228:37231...
2025-12-24 07:04:04,504 INFO worker.py:1991 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(

[GETTER] Tracing enabled: True
[GETTER] Signatures in handle from get_actor():
  __init__: ['_ray_trace_ctx']
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: ['_ray_trace_ctx']
  increment: ['amount', '_ray_trace_ctx']
  my_method: ['x', '_ray_trace_ctx']
[GETTER] my_method has _ray_trace_ctx: True

[GETTER] Attempting to call my_method.remote(5)...
[GETTER] Method call SUCCEEDED! Result: 10
[CREATOR] Getter finished, shutting down...

Cleaning up...
Done.
```

</details>

**Root Cause:**

`_inject_tracing_into_class` sets `__signature__` (including
`_ray_trace_ctx`) on the method object during actor creation. However:

1. When the actor class is serialized (cloudpickle) and loaded in
another process, `__signature__` is **not preserved** on module-level
functions. See repro script at the end of PR description as proof
2. `_ActorClassMethodMetadata.create()` uses `inspect.unwrap()` which
follows the `__wrapped__` chain to the **deeply unwrapped original
method**
3. The original method's `__signature__` was lost during serialization →
signatures extracted **without** `_ray_trace_ctx`
4. When calling the method, `_tracing_actor_method_invocation` adds
`_ray_trace_ctx` to kwargs → **signature validation fails**

**Fix:**

1. In `_inject_tracing_into_class`: Set `__signature__` on the **deeply
unwrapped** method (via `inspect.unwrap`) rather than the immediate
method. This ensures `_ActorClassMethodMetadata.create()` finds it after
unwrapping.

2. In `load_actor_class`: Call `_inject_tracing_into_class` after
loading to re-inject the lost `__signature__` attributes.

**Testing:**
- Added reproduction script demonstrating cross-process actor method
calls with tracing
- All existing tracing tests pass
- Add a new test for serve with tracing

`repro_cloudpickle_signature.py`
```python
import inspect
import cloudpickle
import pickle
import multiprocessing

def check_signature_in_subprocess(pickled_func_bytes):
    func = pickle.loads(pickled_func_bytes)

    print(f"[SUBPROCESS] Unpickled function: {func}")
    print(f"[SUBPROCESS] Module: {func.__module__}")

    sig = getattr(func, '__signature__', None)
    if sig is not None:
        params = list(sig.parameters.keys())
        print(f"[SUBPROCESS] __signature__: {sig}")
        if '_ray_trace_ctx' in params:
            print(f"[SUBPROCESS] __signature__ WAS preserved")
            return True
        else:
            print(f"[SUBPROCESS] __signature__ NOT preserved (missing _ray_trace_ctx)")
            return False
    else:
        print(f"[SUBPROCESS] __signature__ NOT preserved (attribute missing)")
        return False

def main():
    from repro_actor_module import MyActor
    func = MyActor.my_method

    print(f"\n[MAIN] Function: {func}")
    print(f"[MAIN] Module: {func.__module__}")
    print(f"[MAIN] __signature__ before: {getattr(func, '__signature__', 'NOT SET')}")

    # Set a custom __signature__ with _ray_trace_ctx
    custom_sig = inspect.signature(func)
    new_params = list(custom_sig.parameters.values()) + [
        inspect.Parameter("_ray_trace_ctx", inspect.Parameter.KEYWORD_ONLY, default=None)
    ]
    func.__signature__ = custom_sig.replace(parameters=new_params)

    print(f"[MAIN] __signature__ after: {func.__signature__}")
    print(f"[MAIN] Parameters: {list(func.__signature__.parameters.keys())}")

    # Pickle
    print(f"\n[MAIN] Pickling with cloudpickle...")
    pickled = cloudpickle.dumps(func)

    # Test 1: Same process
    print(f"\n{'='*70}")
    print("TEST 1: Unpickle in SAME process")
    print(f"{'='*70}")
    same_func = pickle.loads(pickled)
    same_sig = getattr(same_func, '__signature__', None)
    if same_sig and '_ray_trace_ctx' in list(same_sig.parameters.keys()):
        print(f"Same process: __signature__ preserved")
    else:
        print(f"Same process: __signature__ NOT preserved")

    # Test 2: Different process
    print(f"\n{'='*70}")
    print("TEST 2: Unpickle in DIFFERENT process")
    print(f"{'='*70}")

    ctx = multiprocessing.get_context('spawn')
    with ctx.Pool(1) as pool:
        result = pool.apply(check_signature_in_subprocess, (pickled,))

    if result:
        print("__signature__ IS preserved (unexpected)")
    else:
        print("__signature__ is NOT preserved for functions from imported modules!")

if __name__ == "__main__":
    main()

```

---------

Signed-off-by: abrar <[email protected]>
Signed-off-by: peterxcli <[email protected]>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
… different processes (ray-project#59634)

**Problem:**

fixes ray-project#57803

When tracing is enabled, calling an actor method from a different
process than the one that created the actor fails with:
```
TypeError: got an unexpected keyword argument '_ray_trace_ctx'
```

This commonly occurs with Ray Serve, where:
- `serve start` creates the controller actor (process A)
- Dashboard calls `ray.get_actor()` to interact with it (process B)

## Repo
Simplest way to repro is to run the following
```bash
ray start --head --tracing-startup-hook="ray.util.tracing.setup_local_tmp_tracing:setup_tracing"                                                                                                        ✔ │ ray_310 Py │ with ubuntu@devbox │ at 06:22:12
serve start
```

But here is a core specific repro script

`repro_actor_module.py`
```python
class MyActor:
    """A simple actor class that will be decorated dynamically."""

    def __init__(self):
        self.value = 0

    def my_method(self, x):
        """A simple method."""
        return x * 2

    def check_alive(self):
        """Health check method."""
        return True

    def increment(self, amount=1):
        """Method with a default parameter."""
        self.value += amount
        return self.value
```

`repro_tracing_issue.py`
```python
import multiprocessing
import subprocess
import sys

NAMESPACE = "test_ns"

def creator_process(ready_event, done_event):
    import ray
    from ray.util.tracing.tracing_helper import _is_tracing_enabled

    # Import the actor class from module (NOT decorated yet)
    from repro_actor_module import MyActor

    setup_tracing_path = "ray.util.tracing.setup_local_tmp_tracing:setup_tracing"
    ray.init(_tracing_startup_hook=setup_tracing_path, namespace=NAMESPACE)

    print(f"[CREATOR] Tracing enabled: {_is_tracing_enabled()}")

    # Dynamically decorate and create the test actor (like Serve does)
    MyActorRemote = ray.remote(
        name="my_test_actor",
        namespace=NAMESPACE,
        num_cpus=0,
        lifetime="detached",
    )(MyActor)

    actor = MyActorRemote.remote()

    # Print signatures from creator's handle
    print(f"[CREATOR] Signatures in handle from creation:")
    for method_name, sig in actor._ray_method_signatures.items():
        param_names = [p.name for p in sig]
        print(f"  {method_name}: {param_names}")

    my_method_sig = actor._ray_method_signatures.get("my_method", [])
    has_trace = "_ray_trace_ctx" in [p.name for p in my_method_sig]
    print(f"[CREATOR] my_method has _ray_trace_ctx: {has_trace}")

    # Verify the method works from creator
    result = ray.get(actor.my_method.remote(5))
    print(f"[CREATOR] Test call result: {result}")

    # Signal that actor is ready
    print("[CREATOR] Actor created, signaling getter...")
    sys.stdout.flush()
    ready_event.set()

    # Wait for getter to finish
    done_event.wait(timeout=30)
    print("[CREATOR] Getter finished, shutting down...")

    # Cleanup
    ray.kill(actor)
    ray.shutdown()

def getter_process(ready_event, done_event):
    import ray
    from ray.util.tracing.tracing_helper import _is_tracing_enabled

    # Wait for creator to signal ready
    print("[GETTER] Waiting for creator to set up actor...")
    if not ready_event.wait(timeout=30):
        print("[GETTER] Timeout waiting for creator!")
        done_event.set()
        return

    # Connect to the existing cluster (this will also enable tracing from GCS hook)
    ray.init(address="auto", namespace=NAMESPACE)

    print(f"\n[GETTER] Tracing enabled: {_is_tracing_enabled()}")

    # Get the actor by name - this will RELOAD the class fresh in this process
    # The class loaded here was NEVER processed by _inject_tracing_into_class
    actor = ray.get_actor("my_test_actor", namespace=NAMESPACE)

    # Print signatures from getter's handle
    print(f"[GETTER] Signatures in handle from get_actor():")
    for method_name, sig in actor._ray_method_signatures.items():
        param_names = [p.name for p in sig]
        print(f"  {method_name}: {param_names}")

    my_method_sig = actor._ray_method_signatures.get("my_method", [])
    has_trace = "_ray_trace_ctx" in [p.name for p in my_method_sig]
    print(f"[GETTER] my_method has _ray_trace_ctx: {has_trace}")

    # Try calling a method
    print(f"\n[GETTER] Attempting to call my_method.remote(5)...")
    sys.stdout.flush()
    try:
        result = ray.get(actor.my_method.remote(5))
        print(f"[GETTER] Method call SUCCEEDED! Result: {result}")
    except TypeError as e:
        print(f"[GETTER] Method call FAILED with TypeError: {e}")

    # Signal done
    done_event.set()
    ray.shutdown()

def main():
    # Stop any existing Ray cluster
    print("Stopping any existing Ray cluster...")
    subprocess.run(["ray", "stop", "--force"], capture_output=True)

    # Create synchronization events
    ready_event = multiprocessing.Event()
    done_event = multiprocessing.Event()

    # Start creator process
    creator = multiprocessing.Process(target=creator_process, args=(ready_event, done_event))
    creator.start()

    # Start getter process (will connect to existing cluster)
    getter = multiprocessing.Process(target=getter_process, args=(ready_event, done_event))
    getter.start()

    # Wait for both to complete
    getter.join(timeout=60)
    creator.join(timeout=10)

    # Clean up any hung processes
    if creator.is_alive():
        creator.terminate()
        creator.join(timeout=5)
    if getter.is_alive():
        getter.terminate()
        getter.join(timeout=5)

    # Cleanup Ray
    print("\nCleaning up...")
    subprocess.run(["ray", "stop", "--force"], capture_output=True)
    print("Done.")

if __name__ == "__main__":
    main()
```

<details>

<summary> output from master </summary>

```bash
❯ python repro_tracing_issue.py
Stopping any existing Ray cluster...
[GETTER] Waiting for creator to set up actor...
2025-12-24 07:05:02,215 INFO worker.py:1991 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
[CREATOR] Tracing enabled: True
[CREATOR] Signatures in handle from creation:
  __init__: ['_ray_trace_ctx']
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: ['_ray_trace_ctx']
  increment: ['amount', '_ray_trace_ctx']
  my_method: ['x', '_ray_trace_ctx']
[CREATOR] my_method has _ray_trace_ctx: True
[CREATOR] Test call result: 10
[CREATOR] Actor created, signaling getter...
2025-12-24 07:05:02,953 INFO worker.py:1811 -- Connecting to existing Ray cluster at address: 172.31.7.228:38871...
2025-12-24 07:05:02,984 INFO worker.py:1991 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(

[GETTER] Tracing enabled: True
[GETTER] Signatures in handle from get_actor():
  __init__: []
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: []
  increment: ['amount']
  my_method: ['x']
[GETTER] my_method has _ray_trace_ctx: False

[GETTER] Attempting to call my_method.remote(5)...
[GETTER] Method call FAILED with TypeError: got an unexpected keyword argument '_ray_trace_ctx'
[CREATOR] Getter finished, shutting down...

Cleaning up...
Done.
```

</details>

<details>

<summary>output from this PR</summary>

```bash
❯ python repro_tracing_issue.py
Stopping any existing Ray cluster...
[GETTER] Waiting for creator to set up actor...
2025-12-24 07:04:03,758 INFO worker.py:1991 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
[CREATOR] Tracing enabled: True
[CREATOR] Signatures in handle from creation:
  __init__: ['_ray_trace_ctx']
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: ['_ray_trace_ctx']
  increment: ['amount', '_ray_trace_ctx']
  my_method: ['x', '_ray_trace_ctx']
[CREATOR] my_method has _ray_trace_ctx: True
[CREATOR] Test call result: 10
[CREATOR] Actor created, signaling getter...
2025-12-24 07:04:04,476 INFO worker.py:1811 -- Connecting to existing Ray cluster at address: 172.31.7.228:37231...
2025-12-24 07:04:04,504 INFO worker.py:1991 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265
/home/ubuntu/ray/python/ray/_private/worker.py:2039: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(

[GETTER] Tracing enabled: True
[GETTER] Signatures in handle from get_actor():
  __init__: ['_ray_trace_ctx']
  __ray_call__: ['fn', 'args', '_ray_trace_ctx', 'kwargs']
  __ray_ready__: ['_ray_trace_ctx']
  __ray_terminate__: ['_ray_trace_ctx']
  check_alive: ['_ray_trace_ctx']
  increment: ['amount', '_ray_trace_ctx']
  my_method: ['x', '_ray_trace_ctx']
[GETTER] my_method has _ray_trace_ctx: True

[GETTER] Attempting to call my_method.remote(5)...
[GETTER] Method call SUCCEEDED! Result: 10
[CREATOR] Getter finished, shutting down...

Cleaning up...
Done.
```

</details>

**Root Cause:**

`_inject_tracing_into_class` sets `__signature__` (including
`_ray_trace_ctx`) on the method object during actor creation. However:

1. When the actor class is serialized (cloudpickle) and loaded in
another process, `__signature__` is **not preserved** on module-level
functions. See repro script at the end of PR description as proof
2. `_ActorClassMethodMetadata.create()` uses `inspect.unwrap()` which
follows the `__wrapped__` chain to the **deeply unwrapped original
method**
3. The original method's `__signature__` was lost during serialization →
signatures extracted **without** `_ray_trace_ctx`
4. When calling the method, `_tracing_actor_method_invocation` adds
`_ray_trace_ctx` to kwargs → **signature validation fails**

**Fix:**

1. In `_inject_tracing_into_class`: Set `__signature__` on the **deeply
unwrapped** method (via `inspect.unwrap`) rather than the immediate
method. This ensures `_ActorClassMethodMetadata.create()` finds it after
unwrapping.

2. In `load_actor_class`: Call `_inject_tracing_into_class` after
loading to re-inject the lost `__signature__` attributes.

**Testing:**
- Added reproduction script demonstrating cross-process actor method
calls with tracing
- All existing tracing tests pass
- Add a new test for serve with tracing

`repro_cloudpickle_signature.py`
```python
import inspect
import cloudpickle
import pickle
import multiprocessing

def check_signature_in_subprocess(pickled_func_bytes):
    func = pickle.loads(pickled_func_bytes)

    print(f"[SUBPROCESS] Unpickled function: {func}")
    print(f"[SUBPROCESS] Module: {func.__module__}")

    sig = getattr(func, '__signature__', None)
    if sig is not None:
        params = list(sig.parameters.keys())
        print(f"[SUBPROCESS] __signature__: {sig}")
        if '_ray_trace_ctx' in params:
            print(f"[SUBPROCESS] __signature__ WAS preserved")
            return True
        else:
            print(f"[SUBPROCESS] __signature__ NOT preserved (missing _ray_trace_ctx)")
            return False
    else:
        print(f"[SUBPROCESS] __signature__ NOT preserved (attribute missing)")
        return False

def main():
    from repro_actor_module import MyActor
    func = MyActor.my_method

    print(f"\n[MAIN] Function: {func}")
    print(f"[MAIN] Module: {func.__module__}")
    print(f"[MAIN] __signature__ before: {getattr(func, '__signature__', 'NOT SET')}")

    # Set a custom __signature__ with _ray_trace_ctx
    custom_sig = inspect.signature(func)
    new_params = list(custom_sig.parameters.values()) + [
        inspect.Parameter("_ray_trace_ctx", inspect.Parameter.KEYWORD_ONLY, default=None)
    ]
    func.__signature__ = custom_sig.replace(parameters=new_params)

    print(f"[MAIN] __signature__ after: {func.__signature__}")
    print(f"[MAIN] Parameters: {list(func.__signature__.parameters.keys())}")

    # Pickle
    print(f"\n[MAIN] Pickling with cloudpickle...")
    pickled = cloudpickle.dumps(func)

    # Test 1: Same process
    print(f"\n{'='*70}")
    print("TEST 1: Unpickle in SAME process")
    print(f"{'='*70}")
    same_func = pickle.loads(pickled)
    same_sig = getattr(same_func, '__signature__', None)
    if same_sig and '_ray_trace_ctx' in list(same_sig.parameters.keys()):
        print(f"Same process: __signature__ preserved")
    else:
        print(f"Same process: __signature__ NOT preserved")

    # Test 2: Different process
    print(f"\n{'='*70}")
    print("TEST 2: Unpickle in DIFFERENT process")
    print(f"{'='*70}")

    ctx = multiprocessing.get_context('spawn')
    with ctx.Pool(1) as pool:
        result = pool.apply(check_signature_in_subprocess, (pickled,))

    if result:
        print("__signature__ IS preserved (unexpected)")
    else:
        print("__signature__ is NOT preserved for functions from imported modules!")

if __name__ == "__main__":
    main()

```

---------

Signed-off-by: abrar <[email protected]>
Signed-off-by: peterxcli <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Issues that should be addressed in Ray Core go add ONLY when ready to merge, run all tests serve Ray Serve Related Issue

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[serve] Serve is unable to start when tracing is enabled

3 participants