diff --git a/.travis.yml b/.travis.yml index 4d4d237..fe2a417 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,21 +1,20 @@ sudo: false language: python -matrix: - include: - - python: 3.5 - - python: 3.6 - - python: 3.7 - dist: xenial - sudo: true +python: + - "3.5" + - "3.6" + - "3.7" + - "3.8" services: - docker before_install: - docker run --name kafka -d -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=127.0.0.1 --env ADVERTISED_PORT=9092 spotify/kafka install: - - pip install flake8 mock - - pip install pytest==3.5.1 - - pip install pytest-cov==2.5.1 - - pip install python-coveralls==2.9.1 + - pip install pytest==4.6.11 + - pip install pytest-cov==2.10.0 + - pip install coveralls==2.0.0 + - pip install mock + - pip install flake8 - pip install sphinx sphinx_rtd_theme - pip install . script: diff --git a/README.rst b/README.rst index 5cc1da1..729c416 100644 --- a/README.rst +++ b/README.rst @@ -13,7 +13,7 @@ KQ: Kafka-based Job Queue for Python :target: https://badge.fury.io/py/kq :alt: Package Version -.. image:: https://img.shields.io/badge/python-3.5%2C%203.6%2C%203.7-blue.svg +.. image:: https://img.shields.io/badge/python-3.5%2C%203.6%2C%203.7%2C%203.8-blue.svg :target: https://github.com/joowani/kq :alt: Python Versions diff --git a/example/consumer.py b/example/consumer.py new file mode 100644 index 0000000..6290275 --- /dev/null +++ b/example/consumer.py @@ -0,0 +1,6 @@ +from kafka import KafkaConsumer +consumer = KafkaConsumer('my_topic') + +print('Starting consumer...') +for msg in consumer: + print(msg) diff --git a/example/producer.py b/example/producer.py new file mode 100644 index 0000000..874b75d --- /dev/null +++ b/example/producer.py @@ -0,0 +1,6 @@ +from kafka import KafkaProducer +producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092') + +for _ in range(10000): + producer.send('my_topic', b'message') + # producer.flush() diff --git a/example/queue.py b/example/queue.py new file mode 100644 index 0000000..6278fe6 --- /dev/null +++ b/example/queue.py @@ -0,0 +1,16 @@ +from kafka import KafkaProducer +from kq import Queue + + +def add(a, b): + return a + b + + +# Set up a Kafka producer. +producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092') + +# Set up a queue. +queue = Queue(topic='topic', producer=producer) + +# Enqueue a function call. +job = queue.enqueue(add, 1, 2) diff --git a/example/worker-cli.py b/example/worker.py similarity index 98% rename from example/worker-cli.py rename to example/worker.py index 078db65..1356c6f 100644 --- a/example/worker-cli.py +++ b/example/worker.py @@ -78,7 +78,7 @@ def deserializer(serialized): consumer = KafkaConsumer( bootstrap_servers='127.0.0.1:9092', group_id='group', - enable_auto_commit=False, + enable_auto_commit=True, auto_offset_reset='latest' ) worker = Worker( diff --git a/kq/queue.py b/kq/queue.py index 33bcca1..bd16292 100644 --- a/kq/queue.py +++ b/kq/queue.py @@ -272,7 +272,7 @@ class EnqueueSpec(object): '_logger', '_timeout', '_key', - '_part', + '_partition', 'delay' ] @@ -294,7 +294,7 @@ def __init__(self, self._logger = logger self._timeout = timeout self._key = key - self._part = partition + self._partition = partition def enqueue(self, obj, *args, **kwargs): """Enqueue a function call or :doc:`job` instance. @@ -318,7 +318,7 @@ def enqueue(self, obj, *args, **kwargs): kwargs = {} if obj.kwargs is None else obj.kwargs timeout = self._timeout if obj.timeout is None else obj.timeout key = self._key if obj.key is None else obj.key - partition = self._part if obj.partition is None else obj.partition + part = self._partition if obj.partition is None else obj.partition assert is_str(job_id), 'Job.id must be a str' assert callable(func), 'Job.func must be a callable' @@ -326,7 +326,7 @@ def enqueue(self, obj, *args, **kwargs): assert is_dict(kwargs), 'Job.kwargs must be a dict' assert is_number(timeout), 'Job.timeout must be an int or float' assert is_none_or_bytes(key), 'Job.key must be a bytes' - assert is_none_or_int(partition), 'Job.partition must be an int' + assert is_none_or_int(part), 'Job.partition must be an int' else: assert callable(obj), 'first argument must be a callable' job_id = uuid.uuid4().hex @@ -335,7 +335,7 @@ def enqueue(self, obj, *args, **kwargs): kwargs = kwargs timeout = self._timeout key = self._key - partition = self._part + part = self._partition job = Job( id=job_id, @@ -346,14 +346,15 @@ def enqueue(self, obj, *args, **kwargs): kwargs=kwargs, timeout=timeout, key=key, - partition=partition + partition=part ) self._logger.info('Enqueueing {} ...'.format(job)) self._producer.send( self._topic, value=self._serializer(job), key=self._serializer(key) if key else None, - partition=partition, + partition=part, timestamp_ms=timestamp ) + self._producer.flush() return job diff --git a/kq/version.py b/kq/version.py index afced14..3f39079 100644 --- a/kq/version.py +++ b/kq/version.py @@ -1 +1 @@ -__version__ = '2.0.0' +__version__ = '2.0.1' diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..c7b487d --- /dev/null +++ b/pytest.ini @@ -0,0 +1,4 @@ +[pytest] +python_files = tests.py test_*.py *_tests.py +addopts = -s -vv -p no:warnings +norecursedirs = venv htmlcov build dist .idea .git kq.egg-info diff --git a/setup.py b/setup.py index 8f02379..4afe4d4 100644 --- a/setup.py +++ b/setup.py @@ -19,10 +19,10 @@ include_package_data=True, license='MIT', install_requires=[ - 'dill>=0.2.5', - 'kafka-python>=1.3.1', + 'dill>=0.3.2', + 'kafka-python>=2.0.0', ], - tests_require=['pytest', 'mock', 'flake8', 'tinydb'], + tests_require=['pytest', 'mock', 'flake8'], classifiers=[ 'Intended Audience :: Developers', 'Intended Audience :: End Users/Desktop', diff --git a/tests/conftest.py b/tests/conftest.py index 2e966ef..2e512f7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -118,7 +118,7 @@ def lines(self): time.sleep(0.5) with open(log_file, 'r') as fp: lines = fp.read().splitlines() - return [l for l in lines if l.startswith('[')] + return [line for line in lines if line.startswith('[')] @property def last_line(self):