Skip to content

Commit

Permalink
Bump up dependency versions and flush producer on Queue.enqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
joowani committed Aug 31, 2020
1 parent 37f0d82 commit 52e681d
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 25 deletions.
21 changes: 10 additions & 11 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
sudo: false
language: python
matrix:
include:
- python: 3.5
- python: 3.6
- python: 3.7
dist: xenial
sudo: true
python:
- "3.5"
- "3.6"
- "3.7"
- "3.8"
services:
- docker
before_install:
- docker run --name kafka -d -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=127.0.0.1 --env ADVERTISED_PORT=9092 spotify/kafka
install:
- pip install flake8 mock
- pip install pytest==3.5.1
- pip install pytest-cov==2.5.1
- pip install python-coveralls==2.9.1
- pip install pytest==4.6.11
- pip install pytest-cov==2.10.0
- pip install coveralls==2.0.0
- pip install mock
- pip install flake8
- pip install sphinx sphinx_rtd_theme
- pip install .
script:
Expand Down
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ KQ: Kafka-based Job Queue for Python
:target: https://badge.fury.io/py/kq
:alt: Package Version

.. image:: https://img.shields.io/badge/python-3.5%2C%203.6%2C%203.7-blue.svg
.. image:: https://img.shields.io/badge/python-3.5%2C%203.6%2C%203.7%2C%203.8-blue.svg
:target: https://github.com/joowani/kq
:alt: Python Versions

Expand Down
6 changes: 6 additions & 0 deletions example/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_topic')

print('Starting consumer...')
for msg in consumer:
print(msg)
6 changes: 6 additions & 0 deletions example/producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')

for _ in range(10000):
producer.send('my_topic', b'message')
# producer.flush()
16 changes: 16 additions & 0 deletions example/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from kafka import KafkaProducer
from kq import Queue


def add(a, b):
return a + b


# Set up a Kafka producer.
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')

# Set up a queue.
queue = Queue(topic='topic', producer=producer)

# Enqueue a function call.
job = queue.enqueue(add, 1, 2)
2 changes: 1 addition & 1 deletion example/worker-cli.py → example/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def deserializer(serialized):
consumer = KafkaConsumer(
bootstrap_servers='127.0.0.1:9092',
group_id='group',
enable_auto_commit=False,
enable_auto_commit=True,
auto_offset_reset='latest'
)
worker = Worker(
Expand Down
15 changes: 8 additions & 7 deletions kq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ class EnqueueSpec(object):
'_logger',
'_timeout',
'_key',
'_part',
'_partition',
'delay'
]

Expand All @@ -294,7 +294,7 @@ def __init__(self,
self._logger = logger
self._timeout = timeout
self._key = key
self._part = partition
self._partition = partition

def enqueue(self, obj, *args, **kwargs):
"""Enqueue a function call or :doc:`job` instance.
Expand All @@ -318,15 +318,15 @@ def enqueue(self, obj, *args, **kwargs):
kwargs = {} if obj.kwargs is None else obj.kwargs
timeout = self._timeout if obj.timeout is None else obj.timeout
key = self._key if obj.key is None else obj.key
partition = self._part if obj.partition is None else obj.partition
part = self._partition if obj.partition is None else obj.partition

assert is_str(job_id), 'Job.id must be a str'
assert callable(func), 'Job.func must be a callable'
assert is_iter(args), 'Job.args must be a list or tuple'
assert is_dict(kwargs), 'Job.kwargs must be a dict'
assert is_number(timeout), 'Job.timeout must be an int or float'
assert is_none_or_bytes(key), 'Job.key must be a bytes'
assert is_none_or_int(partition), 'Job.partition must be an int'
assert is_none_or_int(part), 'Job.partition must be an int'
else:
assert callable(obj), 'first argument must be a callable'
job_id = uuid.uuid4().hex
Expand All @@ -335,7 +335,7 @@ def enqueue(self, obj, *args, **kwargs):
kwargs = kwargs
timeout = self._timeout
key = self._key
partition = self._part
part = self._partition

job = Job(
id=job_id,
Expand All @@ -346,14 +346,15 @@ def enqueue(self, obj, *args, **kwargs):
kwargs=kwargs,
timeout=timeout,
key=key,
partition=partition
partition=part
)
self._logger.info('Enqueueing {} ...'.format(job))
self._producer.send(
self._topic,
value=self._serializer(job),
key=self._serializer(key) if key else None,
partition=partition,
partition=part,
timestamp_ms=timestamp
)
self._producer.flush()
return job
2 changes: 1 addition & 1 deletion kq/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '2.0.0'
__version__ = '2.0.1'
4 changes: 4 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[pytest]
python_files = tests.py test_*.py *_tests.py
addopts = -s -vv -p no:warnings
norecursedirs = venv htmlcov build dist .idea .git kq.egg-info
6 changes: 3 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
include_package_data=True,
license='MIT',
install_requires=[
'dill>=0.2.5',
'kafka-python>=1.3.1',
'dill>=0.3.2',
'kafka-python>=2.0.0',
],
tests_require=['pytest', 'mock', 'flake8', 'tinydb'],
tests_require=['pytest', 'mock', 'flake8'],
classifiers=[
'Intended Audience :: Developers',
'Intended Audience :: End Users/Desktop',
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def lines(self):
time.sleep(0.5)
with open(log_file, 'r') as fp:
lines = fp.read().splitlines()
return [l for l in lines if l.startswith('[')]
return [line for line in lines if line.startswith('[')]

@property
def last_line(self):
Expand Down

0 comments on commit 52e681d

Please sign in to comment.