
[core][experimental] Accelerated DAG should execute work on actor's main thread #46336

Open
stephanie-wang opened this issue Jun 29, 2024 · 8 comments
Assignees: rkooo567
Labels: compiled-graphs · core (Issues that should be addressed in Ray Core) · enhancement (Request for new feature and/or capability) · good-first-issue (Great starter issue for someone just starting to contribute to Ray) · P1 (Issue that should be fixed within a few weeks) · UX (The issue is not only about technical bugs)

Comments

@stephanie-wang
Contributor

Description

Currently, actor tasks in the DAG execute on a background thread on the actor, which is a gotcha when the actor has thread-local state that was initialized before the DAG was created. We should move the execution loop to the main thread and execute other system-level tasks on a background thread.
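
For illustration, a minimal sketch of the gotcha (StatefulActor and use_state are hypothetical names; the DAG calls mirror the snippets later in this thread):

import threading

import ray
from ray.dag import InputNode


@ray.remote
class StatefulActor:
    def __init__(self):
        # Thread-local state initialized on the thread that runs __init__.
        self.local = threading.local()
        self.local.value = 42

    def use_state(self, x):
        # If the compiled DAG runs this task on a background thread,
        # self.local.value is not visible here, which is the gotcha
        # described in this issue.
        return x + self.local.value


actor = StatefulActor.remote()
with InputNode() as inp:
    dag = actor.use_state.bind(inp)

compiled = dag.experimental_compile()
ref = compiled.execute(1)  # expected to fail today because of the thread mismatch
print(ray.get(ref))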

Use case

No response

@stephanie-wang stephanie-wang added enhancement Request for new feature and/or capability triage Needs triage (eg: priority, bug/not-bug, and owning component) P1 Issue that should be fixed within a few weeks UX The issue is not only about technical bugs compiled-graphs and removed triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Jun 29, 2024
@rkooo567 rkooo567 added P0 Issues that should be fixed in short order and removed P1 Issue that should be fixed within a few weeks labels Aug 12, 2024
@anyscalesam anyscalesam added the core Issues that should be addressed in Ray Core label Aug 26, 2024
@rkooo567 rkooo567 self-assigned this Sep 3, 2024
@stephanie-wang stephanie-wang added good-first-issue Great starter issue for someone just starting to contribute to Ray P1 Issue that should be fixed within a few weeks and removed P0 Issues that should be fixed in short order labels Oct 3, 2024
@stephanie-wang
Contributor Author

Default behavior: the DAG should run on the main actor thread

Additional API: if the user specifies a thread name during experimental_compile, then we run the DAG on that thread (see the sketch below)

If a thread is already being used by some other DAG, throw an error during compile.
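
A rough sketch of what the proposed API could look like. The keyword argument name below is hypothetical and does not exist in Ray today; it only illustrates the proposal above.

# Hypothetical API sketch -- `_execution_thread_name` is not a real Ray
# parameter; it illustrates the proposal above and will not run today.
with InputNode() as inp:
    dag = actor.use_state.bind(inp)

# Default: the compiled DAG's execution loop runs on the actor's main thread.
compiled = dag.experimental_compile()

# Proposed: run the execution loop on a user-named thread instead.
compiled = dag.experimental_compile(_execution_thread_name="dag-loop")

# If that thread is already used by another DAG, compilation raises an error.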

@xslingcn

I will be working on this.

@kevin85421
Member

system-level tasks => ex: close channel, resize channel

@kevin85421
Member

kevin85421 commented Jan 21, 2025

@stephanie-wang I took a look at the issue. The problem is that any normal ray.get call to the actors belonging to the graph will not work because the execution loop blocks the main thread indefinitely.

It does not seem worth it to support accessing thread-local state while disabling users from submitting normal tasks.

def test_simulate_pipeline_parallelism(ray_start_regular, single_fetch):
    ...
    worker_0 = Worker.remote(0)
    worker_1 = Worker.remote(1)

    # Worker 0: FFFBBB
    # Worker 1: BBB
    with InputNode() as inp:
        w0_input = worker_0.read_input.bind(inp)
        ...
        output_dag = MultiOutputNode([d03, d04, d05])

    output_dag = output_dag.experimental_compile()
    res = output_dag.execute([0, 1, 2])
    ...
    assert ray.get(res) == [0, 1, 2]

    assert ray.get(worker_0.get_logs.remote()) == [ # <-- block here
        "FWD rank-0, batch-0",
        "FWD rank-0, batch-1",
        "FWD rank-0, batch-2",
        "BWD rank-0, batch-0",
        "BWD rank-0, batch-1",
        "BWD rank-0, batch-2",
    ]

I used py-spy to check the main thread of worker_0. It is stuck at get_objects when ray.get(worker_0.get_logs.remote()) blocks. In addition, I checked the log of the core driver process. When the core driver process submits the task get_logs, 9 tasks have already been submitted, and one of them is still in-flight. The 9 tasks include 1 get_node_id, 7 do_allocate_channel, and 1 for the execution loop, which is still in-flight.
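
One way to reproduce the py-spy check (a sketch; the PidActor helper is hypothetical and not part of the test above) is to expose the actor worker's PID and then inspect its threads with `py-spy dump`:

import os

import ray


@ray.remote
class PidActor:
    def get_pid(self):
        # Return the OS process ID of this actor's worker process.
        return os.getpid()


actor = PidActor.remote()
pid = ray.get(actor.get_pid.remote())
print(f"Run in a shell: py-spy dump --pid {pid}")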

[Screenshots: py-spy dump of worker_0's main thread, and the core driver process logs]

@kevin85421
Member

Maybe it is not worth supporting access to thread-local state if it blocks normal tasks.

@kevin85421
Member

kevin85421 commented Jan 23, 2025

#50032 attempts to move the execution loop to the actor's default thread (the thread used when concurrency_group is not specified in options).

  • The execution loop can't run on the MainThread.

    • The task_execution_service_ on the actor's core worker process is responsible not only for handling control-plane logic, such as posting the CoreWorker.HandlePushTaskActor instance, but also for executing actor tasks when no concurrency group is set. Therefore, if an execution doesn't finish, no other actor tasks can be executed, even if the incoming tasks belong to different concurrency groups, because the main thread responsible for control-plane logic is busy with task execution.
    • TLDR: If the execution loop runs on the MainThread, normal tasks can't be executed, regardless of whether they belong to the same concurrency group or not.
    • Solution: In [WIP] Move execution loop to the same thread as the constructor of an actor #50032, I always create a new thread for actor tasks, allowing the main thread to focus solely on control-plane logic.
  • In the current Ray implementation, the __init__ of the actor instance always runs on the MainThread. In the following example, the constructor runs on the MainThread and compute runs on another thread.

     import threading
     import ray
     
     @ray.remote(concurrency_groups={"io": 2})
     class ThreadLocalActor:
         @ray.method(concurrency_group="io")
         def __init__(self):
             # data local to actor default executor thread
             current_thread = threading.current_thread()
             print(current_thread)
             self.local_data = threading.local()
             self.local_data.value = 42
             self.local_data.thread_id = threading.get_ident()
     
         @ray.method(concurrency_group="io")
         def compute(self, value):
             current_thread = threading.current_thread()
             print(current_thread)
             assert threading.get_ident() == self.local_data.thread_id
             return value + self.local_data.value
     
     actor = ThreadLocalActor.remote()
     ref = actor.compute.remote(1)
     print(ray.get(ref))

Based on the two conclusions above, it is difficult to achieve our goal of running the execution loop and __init__ on the same thread. One possible solution is to move the constructor's execution to a thread other than the MainThread.
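
A plain-Python illustration (no Ray) of the constraint driving this: threading.local values set on one thread are invisible on another, so the execution loop must share the constructor's thread for the thread-local state to be usable.

import threading

local = threading.local()


def constructor():
    # Set on the "constructor" thread only.
    local.value = 42


def execution_loop():
    # Running on a different thread: local.value was never set here.
    print(hasattr(local, "value"))  # False


t1 = threading.Thread(target=constructor)
t1.start()
t1.join()

t2 = threading.Thread(target=execution_loop)
t2.start()
t2.join()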

@kevin85421
Member

Commit b000620 moves the constructor to run on the default executor. I ran the following script; the output is attached below.

Both __init__ and compute run on the same thread, and local_data has the same address, but local_data.value doesn't exist in compute. It looks like Ray updates the thread context.

def test_execute_on_actor_thread(shutdown_only):
    import threading

    @ray.remote
    class ThreadLocalActor:
        def __init__(self):
            # data local to actor default executor thread
            print("init")
            current_thread = threading.current_thread()
            print(current_thread)
            self.local_data = threading.local()
            print("local data",self.local_data)
            self.local_data.value = 42
            print("local data value",self.local_data.value)
            print("value", id(self.local_data.value))

        def compute(self, value):
            print("compute")
            current_thread = threading.current_thread()
            print(current_thread)
            print("local data",self.local_data)
            if hasattr(self.local_data, "value"):
                print("self.local_data.value", self.local_data.value)
            else:
                print("self.local_data.value is not set")
            return value + self.local_data.value

    actor = ThreadLocalActor.remote()
    assert ray.get(actor.compute.remote(1)) == 43

[Screenshot: script output]

@kevin85421
Member

The destructor of MyObject is called immediately after f1 finishes, so f2 cannot access self.threading_data.value. Based on my current observation, our Python thread behaves differently from a normal Python thread. In a normal Python thread, once the thread stops, it cannot be restarted, and its thread-local data is released. However, in Ray, the underlying C++ thread is always running, but it is only registered as a Python thread while it executes Python functions. From Python's perspective, this looks as if a stopped thread is being restarted.

import ray
import threading


class MyObject:
    def __init__(self, value):
        self.value = value

    def __del__(self):
        print("MyObject.__del__", self.value)


@ray.remote(concurrency_groups={"io": 1})
class Actor:
    def __init__(self):
        pass

    @ray.method(concurrency_group="io")
    def f1(self):
        print("f1")
        current_thread = threading.current_thread()
        print(f"current_thread: {current_thread}")
        self.threading_data = threading.local()
        self.threading_data.value = MyObject("f1")
        print(f"threading_data: {self.threading_data}, value: {self.threading_data.value}")

    @ray.method(concurrency_group="io")
    def f2(self):
        print("f2")
        current_thread = threading.current_thread()
        print(f"current_thread: {current_thread}")
        print(f"threading_data: {self.threading_data}, value: {self.threading_data.value}")

a = Actor.remote()
ray.get(a.f1.remote())
ray.get(a.f2.remote())

[Screenshot: script output]
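
For comparison, a plain-Python sketch of the "normal thread" behavior described above: once the thread that set a threading.local value exits, the value is released and its destructor runs.

import threading


class MyObject:
    def __init__(self, value):
        self.value = value

    def __del__(self):
        print("MyObject.__del__", self.value)


local = threading.local()


def worker():
    local.value = MyObject("worker")
    print("set local.value on", threading.current_thread().name)


t = threading.Thread(target=worker)
t.start()
t.join()
# After the worker thread exits, its thread-local storage is cleared, so
# MyObject.__del__ has (typically) already run by this point.
print(hasattr(local, "value"))  # False on the main thread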
