Skip to content

Commit

Permalink
Merge branch 'master' into RDO-2-all-errors-to-file-sd-ks
Browse files Browse the repository at this point in the history
  • Loading branch information
John-Memphis committed Jan 15, 2024
2 parents cb00c9f + 8a3c3a4 commit 9197b9c
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 27 deletions.
4 changes: 2 additions & 2 deletions Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ def gitBranch = env.BRANCH_NAME
def gitURL = "[email protected]:Memphisdev/memphis.py.git"
def repoUrlPrefix = "memphisos"

node ("small-ec2-fleet") {
node ("memphis-jenkins-small-fleet-agent") {
git credentialsId: 'main-github', url: gitURL, branch: gitBranch

if (env.BRANCH_NAME ==~ /(master)/) {
Expand Down Expand Up @@ -33,7 +33,7 @@ node ("small-ec2-fleet") {
python3 setup.py sdist
"""
withCredentials([usernamePassword(credentialsId: 'python_sdk', usernameVariable: 'USR', passwordVariable: 'PSW')]) {
sh '/home/ec2-user/.local/bin/twine upload -u $USR -p $PSW dist/*'
sh '~/.local/bin/twine upload -u $USR -p $PSW dist/*'
}
}

Expand Down
42 changes: 35 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<a href="![Github (4)](https://github.com/memphisdev/memphis-terraform/assets/107035359/a5fe5d0f-22e1-4445-957d-5ce4464e61b1)">![Github (4)](https://github.com/memphisdev/memphis-terraform/assets/107035359/a5fe5d0f-22e1-4445-957d-5ce4464e61b1)</a>
<a href="![Github (4)](https://github.com/memphisdev/memphis-terraform/assets/107035359/a5fe5d0f-22e1-4445-957d-5ce4464e61b1)">[![Github (4)](https://github.com/memphisdev/memphis-terraform/assets/107035359/a5fe5d0f-22e1-4445-957d-5ce4464e61b1)](https://memphis.dev)</a>
<p align="center">
<a href="https://memphis.dev/discord"><img src="https://img.shields.io/discord/963333392844328961?color=6557ff&label=discord" alt="Discord"></a>
<a href="https://github.com/memphisdev/memphis/issues?q=is%3Aissue+is%3Aclosed"><img src="https://img.shields.io/github/issues-closed/memphisdev/memphis?color=6557ff"></a>
Expand Down Expand Up @@ -803,7 +803,7 @@ consumer = await memphis.consumer(
consumer_group="<group-name>", # defaults to the consumer name
pull_interval_ms=1000, # defaults to 1000
batch_size=10, # defaults to 10
batch_max_time_to_wait_ms=5000, # defaults to 5000
batch_max_time_to_wait_ms=100, # defaults to 100
max_ack_time_ms=30000, # defaults to 30000
max_msg_deliveries=2, # defaults to 2
start_consume_from_sequence=1, # start consuming from a specific sequence. defaults to 1
Expand Down Expand Up @@ -866,7 +866,7 @@ Here is an example of a consumer that will try to poll 100 messages every 10 sec
consumer_name = "new_consumer",
pull_interval_ms = 10000,
batch_size = 100,
batch_max_time_to_wait_ms = 15000
batch_max_time_to_wait_ms = 100
)
```

Expand All @@ -882,7 +882,7 @@ The max_msg_deliveries parameter allows the user how many messages the consumer
consumer_name = "new_consumer",
pull_interval_ms = 10000,
batch_size = 100,
batch_max_time_to_wait_ms = 15000,
batch_max_time_to_wait_ms = 100,
max_msg_deliveries = 2
)
```
Expand Down Expand Up @@ -950,7 +950,7 @@ msgs = await memphis.fetch_messages(
consumer_name="<consumer-name>",
consumer_group="<group-name>", # defaults to the consumer name
batch_size=10, # defaults to 10
batch_max_time_to_wait_ms=5000, # defaults to 5000
batch_max_time_to_wait_ms=100, # defaults to 100
max_ack_time_ms=30000, # defaults to 30000
max_msg_deliveries=2, # defaults to 2
start_consume_from_sequence=1, # start consuming from a specific sequence. defaults to 1
Expand All @@ -961,12 +961,13 @@ msgs = await memphis.fetch_messages(
```

### Fetch a single batch of messages after creating a consumer

```python
msgs = await consumer.fetch(batch_size=10) # defaults to 10
```

### Fetch a single batch of messages after creating a consumer
`prefetch = true` will prefetch next batch of messages and save it in memory for future fetch() request<br>

```python
msgs = await consumer.fetch(batch_size=10, prefetch=True) # defaults to False
```
Expand All @@ -979,6 +980,23 @@ Acknowledge a message indicates the Memphis server to not re-send the same messa
await message.ack()
```

### Nacking a Message

Mark the message as not acknowledged - the broker will resend the message immediately to the same consumers group, instead of waiting to the max ack time configured.

```python
await message.nack();
```

### Sending a message to the dead-letter

Sending the message to the dead-letter station (DLS) - the broker won't resend the message again to the same consumers group and will place the message inside the dead-letter station (DLS) with the given reason.
The message will still be available to other consumer groups

```python
await message.dead_letter("reason");
```

### Delay the message after a given duration

Delay the message and tell Memphis server to re-send the same message again to the same consumer group. The message will be redelivered only in case `consumer.max_msg_deliveries` is not reached yet.
Expand All @@ -987,20 +1005,30 @@ Delay the message and tell Memphis server to re-send the same message again to t
await message.delay(delay_in_seconds)
```

### Get headers
### Get headers

Get headers per message

```python
headers = message.get_headers()
```

### Get message sequence number

Get message sequence number

```python
sequence_number = msg.get_sequence_number()
```

### Get message time sent

Get message time sent

```python
time_sent = msg.get_timesent()
```

### Destroying a Consumer

```python
Expand Down
10 changes: 5 additions & 5 deletions memphis/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(
self.consumer_group = consumer_group.lower()
self.pull_interval_ms = pull_interval_ms
self.batch_size = batch_size
self.batch_max_time_to_wait_ms = batch_max_time_to_wait_ms if batch_max_time_to_wait_ms >= 1000 else 1000
self.batch_max_time_to_wait_ms = batch_max_time_to_wait_ms if batch_max_time_to_wait_ms >= 100 else 100
self.max_ack_time_ms = max_ack_time_ms
self.max_msg_deliveries = max_msg_deliveries
self.ping_consumer_interval_ms = 30000
Expand All @@ -50,13 +50,13 @@ def __init__(
self.dls_messages = []
self.dls_current_index = 0
self.dls_callback_func = None
self.t_dls = asyncio.create_task(self.__consume_dls())
self.t_consume = None
self.inner_station_name = get_internal_name(self.station_name)
self.subscriptions = subscriptions
self.partition_generator = partition_generator
self.cached_messages = []
self.loading_thread = None
self.t_dls = asyncio.create_task(self.__consume_dls())


def set_context(self, context):
Expand Down Expand Up @@ -125,7 +125,7 @@ async def __consume(self, callback, partition_key: str = None, consumer_partitio

for msg in msgs:
memphis_messages.append(
Message(msg, self.connection, self.consumer_group, self.internal_station_name)
Message(msg, self.connection, self.consumer_group, self.internal_station_name, partition=partition_number)
)
await callback(memphis_messages, None, self.context)
await asyncio.sleep(self.pull_interval_ms / 1000)
Expand All @@ -146,7 +146,7 @@ async def __consume_dls(self):
subject = get_internal_name(self.station_name)
consumer_group = get_internal_name(self.consumer_group)
try:
subscription_name = "$memphis_dls_" + subject + "_" + consumer_group
subscription_name = "$memphis_dls_" + subject + "." + consumer_group
self.consumer_dls = await self.connection.broker_manager.subscribe(
subscription_name, subscription_name
)
Expand Down Expand Up @@ -254,7 +254,7 @@ async def main(host, username, password, station):
msgs = await self.subscriptions[partition_number].fetch(batch_size)
for msg in msgs:
messages.append(
Message(msg, self.connection, self.consumer_group, self.internal_station_name))
Message(msg,self.connection,self.consumer_group,self.internal_station_name,partition=partition_number))
if prefetch:
number_of_messages_to_prefetch = batch_size * 2
self.load_messages_to_cache(number_of_messages_to_prefetch, partition_number)
Expand Down
18 changes: 10 additions & 8 deletions memphis/memphis.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,9 +465,10 @@ async def _single_station_producer(
"station_name": station_name,
"connection_id": self.connection_id,
"producer_type": "application",
"req_version": 3,
"req_version": 4,
"username": self.username,
"app_id": app_id,
"sdk_lang": "python"
}
create_producer_req_bytes = json.dumps(create_producer_req, indent=2).encode(
"utf-8"
Expand Down Expand Up @@ -654,7 +655,7 @@ async def consumer(
consumer_group: str = "",
pull_interval_ms: int = 1000,
batch_size: int = 10,
batch_max_time_to_wait_ms: int = 5000,
batch_max_time_to_wait_ms: int = 100,
max_ack_time_ms: int = 30000,
max_msg_deliveries: int = 2,
generate_random_suffix: bool = False,
Expand All @@ -669,7 +670,7 @@ async def consumer(
consumer_group (str, optional): consumer group name. Defaults to the consumer name.
pull_interval_ms (int, optional): interval in milliseconds between pulls. Defaults to 1000.
batch_size (int, optional): pull batch size. Defaults to 10.
batch_max_time_to_wait_ms (int, optional): max time in milliseconds to wait between pulls. Defaults to 5000. The lowest value is 1000(1 second), and if the value is lower than 1000, it will be set to 1000.
batch_max_time_to_wait_ms (int, optional): max time in milliseconds to wait between pulls. Defaults to 100. The lowest value is 100, and if the value is lower than 100, it will be set to 100.
max_ack_time_ms (int, optional): max time for ack a message in milliseconds, in case a message not acked in this time period the Memphis broker will resend it. Defaults to 30000.
max_msg_deliveries (int, optional): max number of message deliveries, by default is 2.
generate_random_suffix (bool): Deprecated: will be stopped to be supported after November 1'st, 2023. false by default, if true concatenate a random suffix to consumer's name
Expand Down Expand Up @@ -707,9 +708,10 @@ async def consumer(
"max_msg_deliveries": max_msg_deliveries,
"start_consume_from_sequence": start_consume_from_sequence,
"last_messages": last_messages,
"req_version": 3,
"req_version": 4,
"username": self.username,
"app_id": app_id,
"sdk_lang":"python"
}

create_consumer_req_bytes = json.dumps(create_consumer_req, indent=2).encode(
Expand Down Expand Up @@ -767,7 +769,7 @@ async def consumer(
cg,
pull_interval_ms,
batch_size,
batch_max_time_to_wait_ms if batch_max_time_to_wait_ms >= 1000 else 1000,
batch_max_time_to_wait_ms if batch_max_time_to_wait_ms >= 100 else 100,
max_ack_time_ms,
max_msg_deliveries,
start_consume_from_sequence=start_consume_from_sequence,
Expand Down Expand Up @@ -894,7 +896,7 @@ async def fetch_messages(
consumer_name: str,
consumer_group: str = "",
batch_size: int = 10,
batch_max_time_to_wait_ms: int = 5000,
batch_max_time_to_wait_ms: int = 100,
max_ack_time_ms: int = 30000,
max_msg_deliveries: int = 2,
generate_random_suffix: bool = False,
Expand All @@ -910,7 +912,7 @@ async def fetch_messages(
consumer_name (str): name for the consumer.
consumer_group (str, optional): consumer group name. Defaults to the consumer name.
batch_size (int, optional): pull batch size. Defaults to 10.
batch_max_time_to_wait_ms (int, optional): max time in milliseconds to wait between pulls. Defaults to 5000. The lowest value is 1000(1 second), and if the value is lower than 1000, it will be set to 1000.
batch_max_time_to_wait_ms (int, optional): max time in milliseconds to wait between pulls. Defaults to 100. The lowest value is 100, and if the value is lower than 100, it will be set to 100.
max_ack_time_ms (int, optional): max time for ack a message in milliseconds, in case a message not acked in this time period the Memphis broker will resend it. Defaults to 30000.
max_msg_deliveries (int, optional): max number of message deliveries, by default is 2.
generate_random_suffix (bool): Deprecated: will be stopped to be supported after November 1'st, 2023. false by default, if true concatenate a random suffix to consumer's name
Expand Down Expand Up @@ -940,7 +942,7 @@ async def fetch_messages(
consumer_name=consumer_name,
consumer_group=consumer_group,
batch_size=batch_size,
batch_max_time_to_wait_ms=batch_max_time_to_wait_ms if batch_max_time_to_wait_ms >= 1000 else 1000,
batch_max_time_to_wait_ms=batch_max_time_to_wait_ms if batch_max_time_to_wait_ms >= 100 else 100,
max_ack_time_ms=max_ack_time_ms,
max_msg_deliveries=max_msg_deliveries,
generate_random_suffix=generate_random_suffix,
Expand Down
43 changes: 42 additions & 1 deletion memphis/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
from memphis.station import Station

class Message:
def __init__(self, message, connection, cg_name, internal_station_name):
def __init__(self, message, connection, cg_name, internal_station_name, partition = 0):
self.message = message
self.connection = connection
self.cg_name = cg_name
self.internal_station_name = internal_station_name
self.partition = partition
self.station = Station(connection, internal_station_name)

async def ack(self):
Expand All @@ -37,6 +38,38 @@ async def ack(self):
raise MemphisConnectError(str(e)) from e
return

async def nack(self):
"""
nack - not ack for a message, meaning that the message will be redelivered again to the same consumers group without waiting to its ack wait time.
"""
if not hasattr(self.message, 'nak'):
return
await self.message.nak()

async def dead_letter(self, reason: str):
"""
dead_letter - Sending the message to the dead-letter station (DLS). the broker won't resend the message again to the same consumers group and will place the message inside the dead-letter station (DLS) with the given reason.
The message will still be available to other consumer groups
"""
try:
if not hasattr(self.message, 'term'):
return
await self.message.term()
md = self.message.metadata()
stream_seq = md.sequence.stream
request = {
"station_name": self.internal_station_name,
"error": reason,
"partition": self.partition,
"cg_name": self.cg_name,
"seq": stream_seq,
}
await self.connection.broker_manager.publish(
"$memphis_nacked_dls", json.dumps(request).encode("utf-8")
)
except Exception as e:
raise MemphisConnectError(str(e)) from e

def get_data(self):
"""Receive the message."""
try:
Expand Down Expand Up @@ -86,6 +119,14 @@ def get_sequence_number(self):
except Exception:
return

def get_timesent(self):
"""Get timestamp when the message was sent."""
try:
md = self.message.metadata()
return md.timestamp
except Exception:
return

async def delay(self, delay):
"""Delay and resend the message after delay seconds"""
if (
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
setup(
name="memphis-py",
packages=["memphis"],
version="1.2.1",
version="1.3.0",
license="Apache-2.0",
description="A powerful messaging platform for modern developers",
long_description=long_description,
Expand All @@ -17,7 +17,7 @@
author="Memphis.dev",
author_email="[email protected]",
url="https://github.com/memphisdev/memphis.py",
download_url="https://github.com/memphisdev/memphis.py/archive/refs/tags/1.2.1.tar.gz",
download_url="https://github.com/memphisdev/memphis.py/archive/refs/tags/1.3.0.tar.gz",
keywords=["message broker", "devtool", "streaming", "data"],
install_requires=["asyncio", "nats-py", "protobuf", "jsonschema", "graphql-core", "fastavro", "mmh3"],
classifiers=[
Expand Down
2 changes: 1 addition & 1 deletion version-beta.conf
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.1.17
1.1.20
2 changes: 1 addition & 1 deletion version.conf
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.2.1
1.3.0

0 comments on commit 9197b9c

Please sign in to comment.