Serialization Stabilization (confluentinc#1426)
* tidy up json serde (confluentinc#1340)

* tidy up json serde

* to_dict doc tweaks

* serializer text tweak

* produces -> outputs

* address review comments

* minor tweak

* tidy up protobuf serde (confluentinc#1337)

* tidy up protobuf serde

* tweak sr explanation text

* tweak SerializeError text

* changes following review of JSON Schema PR

* Fixed failing test cases by changing the function names (confluentinc#1419)

* tidy up avro serde + additional polish (confluentinc#1413)

* tidy up avro serde

* SerializingProducer & DeserializingConsumer polish + other tweaks

* Deemphasise SerializingProducer and DeserializingConsumer

* Removing use of SerializingProducer and DeserializingConsumer

* Tweak examples readme

* CI error fixes

* Additional CI error fixes

* Additional CI error fix

* Fix examples

* fix flake8 error

* A bit of README.md polish (confluentinc#1424)

* A bit of README.md polish

* additional tweaks

* updates based on review feedback

Co-authored-by: Pranav Rathi <[email protected]>
Matt Howlett and pranavrth authored Sep 14, 2022
1 parent 6f7ffd8 commit 5936d0b
Showing 26 changed files with 810 additions and 1,146 deletions.
157 changes: 30 additions & 127 deletions README.md
@@ -3,7 +3,7 @@ Confluent's Python Client for Apache Kafka<sup>TM</sup>

**confluent-kafka-python** provides a high-level Producer, Consumer and AdminClient compatible with all
[Apache Kafka<sup>TM</sup>](http://kafka.apache.org/) brokers >= v0.8, [Confluent Cloud](https://www.confluent.io/confluent-cloud/)
-and the [Confluent Platform](https://www.confluent.io/product/compare/). The client is:
+and [Confluent Platform](https://www.confluent.io/product/compare/). The client is:

- **Reliable** - It's a wrapper around [librdkafka](https://github.com/edenhill/librdkafka) (provided automatically via binary wheels) which is widely deployed in a diverse set of production scenarios. It's tested using [the same set of system tests](https://github.com/confluentinc/confluent-kafka-python/tree/master/src/confluent_kafka/kafkatest) as the Java client [and more](https://github.com/confluentinc/confluent-kafka-python/tree/master/tests). It's supported by [Confluent](https://confluent.io).

@@ -15,23 +15,25 @@ with Apache Kafka at its core. It's high priority for us that client features ke
pace with core Apache Kafka and components of the [Confluent Platform](https://www.confluent.io/product/compare/).


-See the [API documentation](http://docs.confluent.io/current/clients/confluent-kafka-python/index.html) for more info.
+## Usage

+For a step-by-step guide on using the client, see [Getting Started with Apache Kafka and Python](https://developer.confluent.io/get-started/python/).

+Additional examples can be found in the [examples](examples) directory or the [confluentinc/examples](https://github.com/confluentinc/examples/tree/master/clients/cloud/python) GitHub repo, which include demonstrations of:
+- Exactly-once data processing using the transactional API.
+- Integration with asyncio.
+- (De)serializing Protobuf, JSON, and Avro data with Confluent Schema Registry integration (see the sketch after this list).
+- [Confluent Cloud](https://www.confluent.io/confluent-cloud/) configuration.
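
A minimal sketch of that Schema Registry integration with the plain `Producer`, the pattern this commit moves the examples toward. The registry URL, `users` topic, and inline Avro schema are illustrative assumptions, not the exact contents of any example here:

```python
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Illustrative record schema; real schemas typically live alongside the app.
schema_str = """
{"type": "record", "name": "User",
 "fields": [{"name": "name", "type": "string"}]}
"""

sr_client = SchemaRegistryClient({'url': 'http://localhost:8081'})
avro_serializer = AvroSerializer(sr_client, schema_str)

p = Producer({'bootstrap.servers': 'mybroker'})

# Serializers are now invoked explicitly; the serializer registers the
# schema with Schema Registry and frames the payload.
p.produce('users',
          value=avro_serializer({'name': 'Jane'},
                                SerializationContext('users', MessageField.VALUE)))
p.flush()
```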

-Usage
-=====
+Also refer to the [API documentation](http://docs.confluent.io/current/clients/confluent-kafka-python/index.html).

-Below are some examples of typical usage. For more examples, see the [examples](examples) directory or the [confluentinc/examples](https://github.com/confluentinc/examples/tree/master/clients/cloud/python) github repo for a [Confluent Cloud](https://www.confluent.io/confluent-cloud/) example.
+Finally, the [tests](tests) are useful as a reference for example usage.


-**Producer**
+### Basic Producer Example

```python
from confluent_kafka import Producer


p = Producer({'bootstrap.servers': 'mybroker1,mybroker2'})

def delivery_report(err, msg):
@@ -46,23 +48,26 @@ for data in some_data_source:
    # Trigger any available delivery report callbacks from previous produce() calls
    p.poll(0)

-    # Asynchronously produce a message, the delivery report callback
-    # will be triggered from poll() above, or flush() below, when the message has
-    # been successfully delivered or failed permanently.
+    # Asynchronously produce a message. The delivery report callback will
+    # be triggered from the call to poll() above, or flush() below, when the
+    # message has been successfully delivered or failed permanently.
    p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)

# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()
```
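
The body of `delivery_report` is collapsed by the diff viewer above; it is, roughly, the same callback that appears in the removed Avro example further down:

```python
def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
```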

+For a discussion of the poll-based producer API, refer to the
+[Integrating Apache Kafka With Python Asyncio Web Applications](https://www.confluent.io/blog/kafka-python-asyncio-integration/)
+blog post.
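
That post wraps librdkafka's poll loop in a thread and resolves asyncio futures from the delivery callback. A condensed sketch of the idea follows; the `AIOProducer` class and its methods are illustrative, not part of this library's API:

```python
import asyncio
from threading import Thread

import confluent_kafka


class AIOProducer:
    def __init__(self, conf, loop=None):
        self._loop = loop or asyncio.get_event_loop()
        self._producer = confluent_kafka.Producer(conf)
        self._cancelled = False
        self._poll_thread = Thread(target=self._poll_loop)
        self._poll_thread.start()

    def _poll_loop(self):
        # Serve delivery callbacks until close() is called.
        while not self._cancelled:
            self._producer.poll(0.1)

    def close(self):
        self._cancelled = True
        self._poll_thread.join()

    def produce(self, topic, value):
        # Returns an awaitable that resolves in the delivery callback.
        result = self._loop.create_future()

        def ack(err, msg):
            # Runs on the poll thread; marshal the result back to the
            # event loop thread-safely.
            if err:
                self._loop.call_soon_threadsafe(
                    result.set_exception, confluent_kafka.KafkaException(err))
            else:
                self._loop.call_soon_threadsafe(result.set_result, msg)

        self._producer.produce(topic, value, on_delivery=ack)
        return result
```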


-**High-level Consumer**
+### Basic Consumer Example

```python
from confluent_kafka import Consumer


c = Consumer({
    'bootstrap.servers': 'mybroker',
    'group.id': 'mygroup',
@@ -85,101 +90,8 @@ while True:
c.close()
```
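
The middle of the consumer example is collapsed by the diff viewer; the elided subscribe/poll loop looks roughly like this (topic name illustrative):

```python
c.subscribe(['mytopic'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()
```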

-**AvroProducer**
-
-```python
-from confluent_kafka import avro
-from confluent_kafka.avro import AvroProducer
-
-
-value_schema_str = """
-{
-   "namespace": "my.test",
-   "name": "value",
-   "type": "record",
-   "fields" : [
-     {
-       "name" : "name",
-       "type" : "string"
-     }
-   ]
-}
-"""
-
-key_schema_str = """
-{
-   "namespace": "my.test",
-   "name": "key",
-   "type": "record",
-   "fields" : [
-     {
-       "name" : "name",
-       "type" : "string"
-     }
-   ]
-}
-"""
-
-value_schema = avro.loads(value_schema_str)
-key_schema = avro.loads(key_schema_str)
-value = {"name": "Value"}
-key = {"name": "Key"}
-
-
-def delivery_report(err, msg):
-    """ Called once for each message produced to indicate delivery result.
-        Triggered by poll() or flush(). """
-    if err is not None:
-        print('Message delivery failed: {}'.format(err))
-    else:
-        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
-
-
-avroProducer = AvroProducer({
-    'bootstrap.servers': 'mybroker,mybroker2',
-    'on_delivery': delivery_report,
-    'schema.registry.url': 'http://schema_registry_host:port'
-}, default_key_schema=key_schema, default_value_schema=value_schema)
-
-avroProducer.produce(topic='my_topic', value=value, key=key)
-avroProducer.flush()
-```
-
-**AvroConsumer**
-
-```python
-from confluent_kafka.avro import AvroConsumer
-from confluent_kafka.avro.serializer import SerializerError
-
-
-c = AvroConsumer({
-    'bootstrap.servers': 'mybroker,mybroker2',
-    'group.id': 'groupid',
-    'schema.registry.url': 'http://127.0.0.1:8081'})
-
-c.subscribe(['my_topic'])
-
-while True:
-    try:
-        msg = c.poll(10)
-
-    except SerializerError as e:
-        print("Message deserialization failed for {}: {}".format(msg, e))
-        break
-
-    if msg is None:
-        continue
-
-    if msg.error():
-        print("AvroConsumer error: {}".format(msg.error()))
-        continue
-
-    print(msg.value())
-
-c.close()
-```
-
-**AdminClient**
+### Basic AdminClient Example

Create topics:

@@ -205,15 +117,12 @@ for topic, f in fs.items():
```
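
Expanded, the collapsed topic-creation example above is roughly the following sketch (broker address and topic names are illustrative):

```python
from confluent_kafka.admin import AdminClient, NewTopic

a = AdminClient({'bootstrap.servers': 'mybroker'})

new_topics = [NewTopic(topic, num_partitions=3, replication_factor=1)
              for topic in ["topic1", "topic2"]]
# create_topics() is asynchronous and returns a dict of <topic, future>.
fs = a.create_topics(new_topics)

for topic, f in fs.items():
    try:
        f.result()  # The result itself is None on success.
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))
```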



-Thread Safety
--------------
+## Thread Safety

The `Producer`, `Consumer` and `AdminClient` are all thread safe.
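
For example, a single `Producer` instance can be shared by several threads without extra locking; a minimal sketch (broker address and topic are illustrative):

```python
from threading import Thread

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'mybroker'})


def worker(n):
    # Each thread produces through the same shared client.
    for i in range(10):
        p.produce('mytopic', 'worker {} message {}'.format(n, i).encode('utf-8'))
        p.poll(0)


threads = [Thread(target=worker, args=(n,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

p.flush()
```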


-Install
-=======
+## Install

**Install self-contained binary wheels**

@@ -225,17 +134,13 @@ Install
confluent-kafka using the instructions in the
"Install from source" section below.

-**Install AvroProducer and AvroConsumer**
-
-    $ pip install "confluent-kafka[avro]"

**Install from source**

For source install, see the *Install from source* section in [INSTALL.md](INSTALL.md).


-Broker Compatibility
-====================
+## Broker Compatibility

The Python client (as well as the underlying C library librdkafka) supports
all broker versions >= 0.8.
But due to the nature of the Kafka protocol in broker versions 0.8 and 0.9 it
@@ -257,8 +162,8 @@ More info here:
https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility
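
For 0.8/0.9 brokers, which cannot report their version to the client, the configuration the collapsed text refers to looks roughly like this (a sketch, assuming a 0.9.0.1 broker):

```python
from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'mybroker',
    'group.id': 'mygroup',
    # Pre-0.10 brokers do not support the ApiVersionRequest, so the
    # client must be told which protocol version to assume.
    'api.version.request': False,
    'broker.version.fallback': '0.9.0.1'
})
```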


-SSL certificates
-================
+## SSL certificates

If you're connecting to a Kafka cluster through SSL, you will need to configure
the client with `'security.protocol': 'SSL'` (or `'SASL_SSL'` if SASL
authentication is used).
@@ -277,22 +182,20 @@ Python package. To use certifi, add an `import certifi` line and configure the
client's CA location with `'ssl.ca.location': certifi.where()`.
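
Putting those two settings together, a minimal sketch (broker address and port are illustrative):

```python
import certifi

from confluent_kafka import Producer

p = Producer({
    'bootstrap.servers': 'mybroker:9093',
    'security.protocol': 'SSL',
    # Use certifi's Mozilla-derived CA bundle to verify the broker.
    'ssl.ca.location': certifi.where()
})
```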



-License
-=======
+## License

[Apache License v2.0](http://www.apache.org/licenses/LICENSE-2.0)

KAFKA is a registered trademark of The Apache Software Foundation and has been licensed for use
by confluent-kafka-python. confluent-kafka-python has no affiliation with and is not endorsed by
The Apache Software Foundation.

-Developer Notes
-===============
+## Developer Notes

Instructions on building and testing confluent-kafka-python can be found [here](DEVELOPER.md).

-Confluent Cloud
-===============
+## Confluent Cloud

For a step-by-step guide on using the Python client with Confluent Cloud see [Getting Started with Apache Kafka and Python](https://developer.confluent.io/get-started/python/) on [Confluent Developer](https://developer.confluent.io/).