Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC: Early Exit 2 - sdf.collect(...).to_pandas() #729

Closed
wants to merge 7 commits into from

Conversation

gwaramadze
Copy link
Contributor

@gwaramadze gwaramadze commented Jan 29, 2025

Introduces an ability to collect a small batch of data and inspect interactively in a Python interpreter or a tool like Jupyter Notebook.

  1. Like POC 1 makes use of ExitManager, an internal class that facilitates stopping the Application once a certain number of messages has been processed or a timeout in seconds is reached.
  2. Adds StreamingDataFrame.collect method:
    • Accepts number and timeout parameters and configures ExitManager.
    • Ensures that a single update operation is appended to the sdf. This operation will collect processed messages into a list. This step will not happen on consecutive sdf.collect calls.
    • Triggers Application.run and Application.reset.
    • The collect name is temporary. I don't like the clash with the windowed aggregation function named collect.
  3. Adds to_pandas and to_polars methods, inspired by to_topic. These methods will return either Pandas or Polars dataframe, not our StreamingDataFrame.
  4. Note: Application reset will not reset the consumer group, this means that consecutive runs will consume new messages (according to committed offset) instead of reprocessing the stream.
  5. Note: There is no need for the user to do app.run()

Example:

from uuid import uuid4
from quixstreams import Application

app = Application(
    broker_address="localhost:19092",
    auto_offset_reset="earliest",
    consumer_group=str(uuid4()),
    use_changelog_topics=False,
)

topic = app.topic("1000000-numbers-100-keys")
sdf = app.dataframe(topic=topic)
sdf.collect(number=2)

# Inspect
sdf.to_pandas()
sdf.to_polars()

# Collect new data
sdf.collect(timeout=3)

# Inspect new data
sdf.to_pandas()
sdf.to_polars()

This reverts commit c68503e.
ExistManager is responsible for stopping the application once
a certain number of messages arrives or a timeout is reached.
This reverts commit f54f601.
@gwaramadze gwaramadze force-pushed the feature/early-exit-poc-2 branch from 78a8fd7 to 91aa9a3 Compare January 29, 2025 15:58
@daniil-quix
Copy link
Collaborator

I like the to_pandas() idea — it looks neat and returns a usable DataFrame (there is no need to initialize it upfront).
We could also pack the .collect() inside to_pandas() to simplify the API.

@daniil-quix daniil-quix closed this Feb 4, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants