Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
quettabit committed Jan 20, 2025
1 parent 58a0041 commit f65d174
Show file tree
Hide file tree
Showing 6 changed files with 254 additions and 77 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pip install streamstore

## Examples

`examples/streaming` directory in the [repo](https://github.com/s2-streamstore/s2-sdk-python/tree/main/examples/streaming) contain examples for streaming APIs.
`examples/` directory in the [repo](https://github.com/s2-streamstore/s2-sdk-python/tree/main/examples/) contain examples for streaming APIs.

## Get in touch

Expand Down
6 changes: 4 additions & 2 deletions examples/streaming/producer.py → examples/append_session.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import asyncio
import os
import random
from typing import AsyncIterable

from streamstore import S2
from streamstore.schemas import Record, AppendInput
from streamstore.schemas import AppendInput, Record

AUTH_TOKEN = os.getenv("S2_AUTH_TOKEN")
MY_BASIN = os.getenv("MY_BASIN")
MY_STREAM = os.getenv("MY_STREAM")


async def append_inputs():
async def append_inputs() -> AsyncIterable[AppendInput]:
num_inputs = random.randint(1, 100)
for _ in range(num_inputs):
num_records = random.randint(1, 100)
Expand Down
37 changes: 37 additions & 0 deletions examples/append_session_with_auto_batching.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import asyncio
import os
import random
from datetime import timedelta
from typing import AsyncIterable

from streamstore import S2
from streamstore.schemas import Record

AUTH_TOKEN = os.getenv("S2_AUTH_TOKEN")
MY_BASIN = os.getenv("MY_BASIN")
MY_STREAM = os.getenv("MY_STREAM")


async def records() -> AsyncIterable[Record]:
num_records = random.randint(1, 1000)
for _ in range(num_records):
body_size = random.randint(1, 1000)
if random.random() < 0.5:
await asyncio.sleep(random.random() * 2.5)
yield Record(body=os.urandom(body_size))


async def producer():
async with S2(auth_token=AUTH_TOKEN) as s2:
stream = s2[MY_BASIN][MY_STREAM]
async for output in stream.append_session_with_auto_batching(
records=records(),
max_records_per_batch=10,
max_linger_per_batch=timedelta(milliseconds=5),
):
num_appended_records = output.end_seq_num - output.start_seq_num
print(f"appended {num_appended_records} records")


if __name__ == "__main__":
asyncio.run(producer())
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import os

from streamstore import S2

AUTH_TOKEN = os.getenv("S2_AUTH_TOKEN")
Expand Down
Loading

0 comments on commit f65d174

Please sign in to comment.