Skip to content

Commit

Permalink
fix: run Karapace as a module, __main__
Browse files Browse the repository at this point in the history
  • Loading branch information
jjaakola-aiven committed Nov 29, 2024
1 parent 232cd94 commit 81e14d3
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 109 deletions.
184 changes: 88 additions & 96 deletions src/karapace/karapace_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

# from aiohttp.web_log import AccessLogger
from fastapi import FastAPI, HTTPException, Request, status
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
Expand All @@ -19,21 +17,19 @@
from karapace.content_type import check_schema_headers
from karapace.dependencies.config_dependency import ConfigDependencyManager
from karapace.dependencies.schema_registry_dependency import SchemaRegistryDependencyManager
from karapace.dependencies.stats_dependeny import StatsDependencyManager
from karapace.instrumentation.prometheus import PrometheusInstrumentation
from karapace.routers.errors import KarapaceValidationError
from karapace.schema_registry import KarapaceSchemaRegistry
from starlette.exceptions import HTTPException as StarletteHTTPException
from starlette.requests import Request as StarletteHTTPRequest
from typing import Final

import logging
import sys
import uvicorn

# from karapace.kafka_rest_apis import KafkaRest
# from karapace.utils import DebugAccessLogger


# class KarapaceAll(KafkaRest, KarapaceSchemaRegistryController):
# pass


def _configure_logging(*, config: Config) -> None:
Expand Down Expand Up @@ -61,24 +57,6 @@ def _configure_logging(*, config: Config) -> None:
logging.getLogger("uvicorn.error").setLevel(config.log_level)


# if config.access_logs_debug is True:
# config["access_log_class"] = DebugAccessLogger
# logging.getLogger("aiohttp.access").setLevel(logging.DEBUG)
# else:
# config["access_log_class"] = AccessLogger

CONFIG = ConfigDependencyManager.get_config()
_configure_logging(config=CONFIG)

# config_without_secrets = {}
# for key, value in config.items():
# if "password" in key:
# value = "****"
# config_without_secrets[key] = value
# logging.log(logging.DEBUG, "Config %r", config_without_secrets)
logging.log(logging.INFO, "Karapace version %s", karapace_version)


@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncGenerator[None, None]:
schema_registry: KarapaceSchemaRegistry | None = None
Expand All @@ -88,8 +66,8 @@ async def lifespan(_: FastAPI) -> AsyncGenerator[None, None]:
await schema_registry.start()
await schema_registry.get_master()
authorizer = AuthorizationDependencyManager.get_authorizer()
# if authorizer is not None:
# await authorizer.start(StatsDependencyManager.get_stats())
if authorizer is not None:
await authorizer.start(StatsDependencyManager.get_stats())
yield
finally:
if schema_registry:
Expand All @@ -98,76 +76,87 @@ async def lifespan(_: FastAPI) -> AsyncGenerator[None, None]:
await authorizer.close()


app = FastAPI(lifespan=lifespan)


@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(_: StarletteHTTPRequest, exc: StarletteHTTPException):
return JSONResponse(status_code=exc.status_code, content=exc.detail)


@app.exception_handler(RequestValidationError)
async def validation_exception_handler(_: StarletteHTTPRequest, exc: RequestValidationError):
error_code = HTTPStatus.UNPROCESSABLE_ENTITY.value
if isinstance(exc, KarapaceValidationError):
error_code = exc.error_code
message = exc.body
else:
message = exc.errors()
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={
"error_code": error_code,
"message": message,
},
)


@app.middleware("http")
async def set_content_types(request: Request, call_next):
try:
response_content_type = check_schema_headers(request)
except HTTPException as exc:
def create_karapace_application(*, config: Config) -> FastAPI:
# TODO: this lifespan is SR related lifespan
app = FastAPI(lifespan=lifespan)
_configure_logging(config=config)

config_without_secrets = {}
for key, value in config.dict().items():
if "password" in key:
value = "****"
elif "keyfile" in key:
value = "****"
config_without_secrets[key] = value
logging.log(logging.DEBUG, "Config %r", config_without_secrets)
logging.log(logging.INFO, "Karapace version %s", karapace_version)

@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(_: StarletteHTTPRequest, exc: StarletteHTTPException):
return JSONResponse(status_code=exc.status_code, content=exc.detail)

@app.exception_handler(RequestValidationError)
async def validation_exception_handler(_: StarletteHTTPRequest, exc: RequestValidationError):
error_code = HTTPStatus.UNPROCESSABLE_ENTITY.value
if isinstance(exc, KarapaceValidationError):
error_code = exc.error_code
message = exc.body
else:
message = exc.errors()
return JSONResponse(
status_code=exc.status_code,
headers=exc.headers,
content=exc.detail,
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={
"error_code": error_code,
"message": message,
},
)

# Schema registry supports application/octet-stream, assumption is JSON object body.
# Force internally to use application/json in this case for compatibility.
if request.headers.get("Content-Type") == "application/octet-stream":
new_headers = request.headers.mutablecopy()
new_headers["Content-Type"] = "application/json"
request._headers = new_headers
request.scope.update(headers=request.headers.raw)

response = await call_next(request)
response.headers["Content-Type"] = response_content_type
return response


if CONFIG.karapace_registry:
from karapace.routers.compatibility_router import compatibility_router
from karapace.routers.config_router import config_router
from karapace.routers.health_router import health_router
from karapace.routers.master_available_router import master_availability_router
from karapace.routers.mode_router import mode_router
from karapace.routers.root_router import root_router
from karapace.routers.schemas_router import schemas_router
from karapace.routers.subjects_router import subjects_router

app.include_router(compatibility_router)
app.include_router(config_router)
app.include_router(health_router)
app.include_router(master_availability_router)
app.include_router(mode_router)
app.include_router(root_router)
app.include_router(schemas_router)
app.include_router(subjects_router)
if CONFIG.karapace_rest:
# add rest router.
pass
@app.middleware("http")
async def set_content_types(request: Request, call_next):
try:
response_content_type = check_schema_headers(request)
except HTTPException as exc:
return JSONResponse(
status_code=exc.status_code,
headers=exc.headers,
content=exc.detail,
)

# Schema registry supports application/octet-stream, assumption is JSON object body.
# Force internally to use application/json in this case for compatibility.
if request.headers.get("Content-Type") == "application/octet-stream":
new_headers = request.headers.mutablecopy()
new_headers["Content-Type"] = "application/json"
request._headers = new_headers
request.scope.update(headers=request.headers.raw)

response = await call_next(request)
response.headers["Content-Type"] = response_content_type
return response

if config.karapace_registry:
from karapace.routers.compatibility_router import compatibility_router
from karapace.routers.config_router import config_router
from karapace.routers.health_router import health_router
from karapace.routers.master_available_router import master_availability_router
from karapace.routers.mode_router import mode_router
from karapace.routers.root_router import root_router
from karapace.routers.schemas_router import schemas_router
from karapace.routers.subjects_router import subjects_router

app.include_router(compatibility_router)
app.include_router(config_router)
app.include_router(health_router)
app.include_router(master_availability_router)
app.include_router(mode_router)
app.include_router(root_router)
app.include_router(schemas_router)
app.include_router(subjects_router)
if config.karapace_rest:
# add rest router.
pass

return app


def __old_main() -> int:
Expand All @@ -180,5 +169,8 @@ def __old_main() -> int:
return 0


# if __name__ == "__main__":
# sys.exit(main())
CONFIG: Final = ConfigDependencyManager.get_config()

if __name__ == "__main__":
app = create_karapace_application(config=CONFIG)
uvicorn.run(app, host=CONFIG.host, port=CONFIG.port, log_level=CONFIG.log_level.lower())
2 changes: 1 addition & 1 deletion tests/integration/test_karapace.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ def test_regression_server_must_exit_on_exception(
errfile = stack.enter_context((tmp_path / "karapace.err").open("w"))

write_env_file(dot_env_path=env_path, config=config)
process = popen_karapace_all(host=config.host, port=port, env_path=env_path, stdout=logfile, stderr=errfile)
process = popen_karapace_all(env_path=env_path, stdout=logfile, stderr=errfile)
stack.callback(stop_process, process) # make sure to stop the process if the test fails
assert process.wait(timeout=10) != 0, "Process should have exited with an error, port is already is use"
2 changes: 1 addition & 1 deletion tests/integration/utils/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async def start_schema_registry_cluster(

logfile = stack.enter_context(open(log_path, "w"))
errfile = stack.enter_context(open(error_path, "w"))
process = popen_karapace_all(host=host, port=port, env_path=env_path, stdout=logfile, stderr=errfile)
process = popen_karapace_all(env_path=env_path, stdout=logfile, stderr=errfile)
stack.callback(stop_process, process)
all_processes.append(process)

Expand Down
13 changes: 2 additions & 11 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,23 +344,14 @@ def python_exe() -> str:
return python


def fastapi_executable() -> str:
if os.environ.get("VIRTUAL_ENV"):
return os.environ["VIRTUAL_ENV"] + "/bin/fastapi"
else:
# TODO: Is this feasible, run which to find fastapi if not in virtualenv?
return os.popen("which fastapi").read()


def popen_karapace_all(*, host: str, port: int, env_path: Union[Path, str], stdout: IO, stderr: IO, **kwargs) -> Popen:
def popen_karapace_all(*, env_path: Union[Path, str], stdout: IO, stderr: IO, **kwargs) -> Popen:
kwargs["stdout"] = stdout
kwargs["stderr"] = stderr
return Popen(
[fastapi_executable(), "run", "--host", host, "--port", str(port), "src/karapace/karapace_all.py"],
[python_exe(), "-m", "karapace.karapace_all"],
env={"KARAPACE_DOTENV": str(env_path)},
**kwargs,
)
# return Popen([python_exe(), "-m", "karapace.karapace_all", str(config_path)], **kwargs)


class StubMessage:
Expand Down

0 comments on commit 81e14d3

Please sign in to comment.