Async mode in PPO/GRPO #580

Open
ashish230897 opened this issue Feb 25, 2025 · 2 comments · May be fixed by #586

Comments

@ashish230897

Hello,

This is not about a bug in the code itself, but about this README: https://github.com/allenai/open-instruct/blob/main/docs/algorithms/ppo.md
In the last code block there, inside the if async_mode branch:

I think
param_and_query_Q.put((agent.param, queries))
should be replaced by
param_and_query_Q.put((agent.param, next_queries))

Please let me know if that is not the case. Thanks!

@ashish230897
Author

Also, the else condition should have:
queries = next_queries

@vwxyzjn
Collaborator

vwxyzjn commented Feb 27, 2025

Thanks for looking into it closely! You are right. The correct code should be:

import queue
import threading
import time

class Agent():
    def __init__(self):
        self.param = 1

    def learn(self, data):
        self.param += 1

def query_generator_fn():
    for i in range(1, 100):
        yield i


ITER = 7
batch_size = 32
agent = Agent()
data_Q = queue.Queue(maxsize=1)
param_and_query_Q = queue.Queue(maxsize=1)
def actor():
    for i in range(1, ITER + 1):
        params, query = param_and_query_Q.get()
        data = params
        print(f"[actor] generating data π_{params} -> p_{query} D_π_{data}")
        time.sleep(1) # simulate data generation
        data_Q.put((query, data))

actor_thread = threading.Thread(target=actor)
actor_thread.start()

# initial param put
generator = query_generator_fn()
next_queries = next(generator)
param_and_query_Q.put((agent.param, next_queries))

# cleanba style stuff
async_mode = True
start_time = time.time()
for g in range(1, ITER + 1):
    queries = next_queries
    if async_mode:
        if g != 1:
            next_queries = next(generator)
        param_and_query_Q.put((agent.param, next_queries))
    else:
        if g != 1:
            next_queries = next(generator)
            param_and_query_Q.put((agent.param, next_queries)) # note the indent here is different
            queries = next_queries
    _, data = data_Q.get()
    old_param = agent.param
    agent.learn(data)
    time.sleep(1) # simulate training
    print(f"--[leaner] get π_{old_param} ->  p_{queries} D_π_{data} -> π_{agent.param}, time: {time.time() - start_time}")
actor_thread.join()

Async mode:

Existing incorrect code (D_π_2 is generated with p_1, but the learner then treats it as p_2 D_π_2):

➜  open-instruct git:(main) ✗ python x13.py
[actor] generating data π_1 -> p_1 D_π_1
[actor] generating data π_1 -> p_1 D_π_1
--[leaner] get π_1 ->  p_1 D_π_1 -> π_2, time: 2.0003821849823
❌ [actor] generating data π_2 -> p_1 D_π_2
--[leaner] get π_2 ->  p_1 D_π_1 -> π_3, time: 3.001086950302124
[actor] generating data π_3 -> p_2 D_π_3
❌ --[leaner] get π_3 ->  p_2 D_π_2 -> π_4, time: 4.001964569091797
[actor] generating data π_4 -> p_3 D_π_4
--[leaner] get π_4 ->  p_3 D_π_3 -> π_5, time: 5.002766847610474
[actor] generating data π_5 -> p_4 D_π_5

Correct (D_π_2 is generated with p_2, and p_2 D_π_2 is then used to update the policy):

[actor] generating data π_1 -> p_1 D_π_1
[actor] generating data π_1 -> p_1 D_π_1
--[leaner] get π_1 ->  p_1 D_π_1 -> π_2, time: 2.000866651535034
✅ [actor] generating data π_2 -> p_2 D_π_2
--[leaner] get π_2 ->  p_1 D_π_1 -> π_3, time: 3.0017740726470947
[actor] generating data π_3 -> p_3 D_π_3
✅--[leaner] get π_3 ->  p_2 D_π_2 -> π_4, time: 4.002020359039307
[actor] generating data π_4 -> p_4 D_π_4
--[leaner] get π_4 ->  p_3 D_π_3 -> π_5, time: 5.002366304397583
[actor] generating data π_5 -> p_5 D_π_5

Sync mode:

Existing incorrect code (p_2 D_π_2 is the generated data, but we used p_1 D_π_2 to update the policy)

[actor] generating data π_1 -> p_1 D_π_1
--[leaner] get π_1 ->  p_1 D_π_1 -> π_2, time: 2.0009827613830566
[actor] generating data π_2 -> p_2 D_π_2
❌ --[leaner] get π_2 ->  p_1 D_π_2 -> π_3, time: 4.002542018890381
[actor] generating data π_3 -> p_3 D_π_3
--[leaner] get π_3 ->  p_2 D_π_3 -> π_4, time: 6.004257678985596
[actor] generating data π_4 -> p_4 D_π_4
--[leaner] get π_4 ->  p_3 D_π_4 -> π_5, time: 8.00551176071167
[actor] generating data π_5 -> p_5 D_π_5
--[leaner] get π_5 ->  p_4 D_π_5 -> π_6, time: 10.007193088531494
[actor] generating data π_6 -> p_6 D_π_6

Correct (D_π_2 is generated with p_2, and p_2 D_π_2 is then used to update the policy):

[actor] generating data π_1 -> p_1 D_π_1
--[leaner] get π_1 ->  p_1 D_π_1 -> π_2, time: 2.0003509521484375
✅ [actor] generating data π_2 -> p_2 D_π_2
✅ --[leaner] get π_2 ->  p_2 D_π_2 -> π_3, time: 4.001307487487793
[actor] generating data π_3 -> p_3 D_π_3
--[leaner] get π_3 ->  p_3 D_π_3 -> π_4, time: 6.002503395080566
[actor] generating data π_4 -> p_4 D_π_4
--[leaner] get π_4 ->  p_4 D_π_4 -> π_5, time: 8.003612279891968
[actor] generating data π_5 -> p_5 D_π_5

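The pairing can also be checked without reading logs. The actor already puts the query it used on data_Q, but the learner discards it; comparing it against the learner's own queries exposes any mismatch. Below is a minimal sketch of that check, a stripped-down variant of the script above with the sleeps and the Agent class removed. The run function and its fixed flag are hypothetical names introduced for illustration, and the fixed=False path is reconstructed from the issue description rather than copied from the old README.

```python
import queue
import threading


def run(async_mode, fixed=True, iters=7):
    """Return one (learner_query, actor_query) pair per training step."""
    param = 1
    data_q = queue.Queue(maxsize=1)
    param_and_query_q = queue.Queue(maxsize=1)

    def actor():
        for _ in range(iters):
            params, query = param_and_query_q.get()
            # Tag the generated data with the query it was generated from.
            data_q.put((query, params))

    actor_thread = threading.Thread(target=actor)
    actor_thread.start()

    generator = iter(range(1, 100))
    next_queries = next(generator)
    param_and_query_q.put((param, next_queries))  # initial param put

    pairs = []
    for g in range(1, iters + 1):
        queries = next_queries
        if async_mode:
            if g != 1:
                next_queries = next(generator)
            # Assumed old behaviour: send `queries` instead of `next_queries`.
            param_and_query_q.put((param, next_queries if fixed else queries))
        else:
            if g != 1:
                next_queries = next(generator)
                param_and_query_q.put((param, next_queries))
                if fixed:
                    # Assumed old behaviour: this reassignment was missing.
                    queries = next_queries
        actor_query, _ = data_q.get()
        param += 1  # stands in for agent.learn(data)
        pairs.append((queries, actor_query))
    actor_thread.join()
    return pairs


if __name__ == "__main__":
    for async_mode in (True, False):
        for fixed in (False, True):
            pairs = run(async_mode, fixed)
            bad = [g for g, (lq, aq) in enumerate(pairs, start=1) if lq != aq]
            print(f"async={async_mode} fixed={fixed} mismatched steps: {bad}")
```

Because both queues are FIFO with a single producer and a single consumer, the pairing does not depend on thread timing, so the check is deterministic even though the order of printed lines in the original script is not.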
@vwxyzjn vwxyzjn linked a pull request Feb 27, 2025 that will close this issue