Skip to content

Commit

Permalink
Remove kafka lib dependency of OAuth token handling
Browse files Browse the repository at this point in the history
Since we moved away from kafka-python and aiokafka based clients, there
is no need to depend on either library's abstract token provider
mechanism. The confluent-kafka clients take a simple callable for token
fetching (pass-through in our case).
  • Loading branch information
Mátyás Kuti committed Feb 1, 2024
1 parent d28d9df commit c9e9e14
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 60 deletions.
2 changes: 1 addition & 1 deletion karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ def init_admin_client(self):
ssl_keyfile=self.config["ssl_keyfile"],
metadata_max_age_ms=self.config["metadata_max_age_ms"],
connections_max_idle_ms=self.config["connections_max_idle_ms"],
**get_kafka_client_auth_parameters_from_config(self.config, async_client=False),
**get_kafka_client_auth_parameters_from_config(self.config),
)
break
except: # pylint: disable=bare-except
Expand Down
32 changes: 4 additions & 28 deletions karapace/kafka_rest_apis/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
"""
from __future__ import annotations

from aiokafka.abc import AbstractTokenProvider as AbstractTokenProviderAsync
from http import HTTPStatus
from kafka.oauth.abstract import AbstractTokenProvider
from karapace.config import Config
from karapace.rapu import HTTPResponse, JSON_CONTENT_TYPE
from typing import NoReturn, TypedDict
Expand Down Expand Up @@ -115,46 +113,25 @@ def get_expiration_time_from_header(auth_header: str) -> datetime.datetime | Non


@dataclasses.dataclass
class SimpleOauthTokenProvider(AbstractTokenProvider):
class SimpleOauthTokenProvider:
"""A pass-through OAuth token provider to be used by synchronous Kafka clients.
The token is meant to be extracted from an HTTP Authorization header.
"""

_token: str = dataclasses.field(repr=False)

def token(self) -> str:
return self._token

def token_with_expiry(self, _config: str | None = None) -> tuple[str, int | None]:
return (self._token, get_expiration_timestamp_from_jwt(self._token))


@dataclasses.dataclass
class SimpleOauthTokenProviderAsync(AbstractTokenProviderAsync):
"""A pass-through OAuth token provider to be used by asynchronous Kafka clients.
The token is meant to be extracted from an HTTP Authorization header.
"""

_token: str = dataclasses.field(repr=False)

async def token(self) -> str:
return self._token

def token_with_expiry(self, _config: str | None = None) -> tuple[str, int | None]:
return (self._token, get_expiration_timestamp_from_jwt(self._token))


class SASLOauthParams(TypedDict):
sasl_mechanism: str
sasl_oauth_token_provider: AbstractTokenProvider | AbstractTokenProviderAsync
sasl_oauth_token_provider: SimpleOauthTokenProvider


def get_kafka_client_auth_parameters_from_config(
config: Config,
*,
async_client: bool = True,
) -> SASLPlainConfig | SASLOauthParams:
"""Create authentication parameters for a Kafka client based on the Karapace config.
Expand All @@ -163,13 +140,12 @@ def get_kafka_client_auth_parameters_from_config(
decides whether this will be a sync or async one.
:param config: Current config of Karapace
:param async_client: Flag to indicate whether the Kafka client using the returned paramaters is async
"""
if config["sasl_mechanism"] == "OAUTHBEARER":
token_provider_cls = SimpleOauthTokenProviderAsync if async_client else SimpleOauthTokenProvider
assert config["sasl_oauth_token"] is not None, "Config missing `sasl_oauth_token` with OAUTHBEARER `sasl_mechanism`"
return {
"sasl_mechanism": config["sasl_mechanism"],
"sasl_oauth_token_provider": token_provider_cls(config["sasl_oauth_token"]),
"sasl_oauth_token_provider": SimpleOauthTokenProvider(config["sasl_oauth_token"]),
}

return {
Expand Down
36 changes: 5 additions & 31 deletions tests/unit/test_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
get_expiration_time_from_header,
get_kafka_client_auth_parameters_from_config,
SimpleOauthTokenProvider,
SimpleOauthTokenProviderAsync,
)
from karapace.rapu import HTTPResponse, JSON_CONTENT_TYPE

Expand Down Expand Up @@ -102,11 +101,6 @@ def test_get_expiration_time_from_header_malformed_bearer_token_raises_unauthori
_assert_unauthorized_http_response(exc_info.value)


def test_simple_oauth_token_provider_returns_configured_token() -> None:
token_provider = SimpleOauthTokenProvider("TOKEN")
assert token_provider.token() == "TOKEN"


def test_simple_oauth_token_provider_returns_configured_token_and_expiry() -> None:
expiry_timestamp = 1697013997
token = jwt.encode({"exp": expiry_timestamp}, "secret")
Expand All @@ -115,19 +109,6 @@ def test_simple_oauth_token_provider_returns_configured_token_and_expiry() -> No
assert token_provider.token_with_expiry() == (token, expiry_timestamp)


async def test_simple_oauth_token_provider_async_returns_configured_token() -> None:
token_provider = SimpleOauthTokenProviderAsync("TOKEN")
assert await token_provider.token() == "TOKEN"


def test_simple_oauth_token_provider_async_returns_configured_token_and_expiry() -> None:
expiry_timestamp = 1697013997
token = jwt.encode({"exp": expiry_timestamp}, "secret")
token_provider = SimpleOauthTokenProviderAsync(token)

assert token_provider.token_with_expiry() == (token, expiry_timestamp)


def test_get_client_auth_parameters_from_config_sasl_plain() -> None:
config = set_config_defaults(
{"sasl_mechanism": "PLAIN", "sasl_plain_username": "username", "sasl_plain_password": "password"}
Expand All @@ -143,18 +124,11 @@ def test_get_client_auth_parameters_from_config_sasl_plain() -> None:


def test_get_client_auth_parameters_from_config_oauth() -> None:
config = set_config_defaults({"sasl_mechanism": "OAUTHBEARER", "sasl_oauth_token": "TOKEN"})

client_auth_params = get_kafka_client_auth_parameters_from_config(config, async_client=False)

assert client_auth_params["sasl_mechanism"] == "OAUTHBEARER"
assert client_auth_params["sasl_oauth_token_provider"].token() == "TOKEN"


async def test_get_client_auth_parameters_from_config_oauth_async() -> None:
config = set_config_defaults({"sasl_mechanism": "OAUTHBEARER", "sasl_oauth_token": "TOKEN"})
expiry_timestamp = 1697013997
token = jwt.encode({"exp": expiry_timestamp}, "secret")
config = set_config_defaults({"sasl_mechanism": "OAUTHBEARER", "sasl_oauth_token": token})

client_auth_params = get_kafka_client_auth_parameters_from_config(config, async_client=True)
client_auth_params = get_kafka_client_auth_parameters_from_config(config)

assert client_auth_params["sasl_mechanism"] == "OAUTHBEARER"
assert await client_auth_params["sasl_oauth_token_provider"].token() == "TOKEN"
assert client_auth_params["sasl_oauth_token_provider"].token_with_expiry() == (token, expiry_timestamp)

0 comments on commit c9e9e14

Please sign in to comment.