diff --git a/src/karapace/karapace_all.py b/src/karapace/karapace_all.py index d2e8e1a13..714cd2073 100644 --- a/src/karapace/karapace_all.py +++ b/src/karapace/karapace_all.py @@ -6,8 +6,6 @@ from collections.abc import AsyncGenerator from contextlib import asynccontextmanager - -# from aiohttp.web_log import AccessLogger from fastapi import FastAPI, HTTPException, Request, status from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse @@ -19,21 +17,19 @@ from karapace.content_type import check_schema_headers from karapace.dependencies.config_dependency import ConfigDependencyManager from karapace.dependencies.schema_registry_dependency import SchemaRegistryDependencyManager +from karapace.dependencies.stats_dependeny import StatsDependencyManager from karapace.instrumentation.prometheus import PrometheusInstrumentation from karapace.routers.errors import KarapaceValidationError from karapace.schema_registry import KarapaceSchemaRegistry from starlette.exceptions import HTTPException as StarletteHTTPException from starlette.requests import Request as StarletteHTTPRequest +from typing import Final import logging import sys +import uvicorn # from karapace.kafka_rest_apis import KafkaRest -# from karapace.utils import DebugAccessLogger - - -# class KarapaceAll(KafkaRest, KarapaceSchemaRegistryController): -# pass def _configure_logging(*, config: Config) -> None: @@ -61,24 +57,6 @@ def _configure_logging(*, config: Config) -> None: logging.getLogger("uvicorn.error").setLevel(config.log_level) -# if config.access_logs_debug is True: -# config["access_log_class"] = DebugAccessLogger -# logging.getLogger("aiohttp.access").setLevel(logging.DEBUG) -# else: -# config["access_log_class"] = AccessLogger - -CONFIG = ConfigDependencyManager.get_config() -_configure_logging(config=CONFIG) - -# config_without_secrets = {} -# for key, value in config.items(): -# if "password" in key: -# value = "****" -# config_without_secrets[key] = value -# logging.log(logging.DEBUG, "Config %r", config_without_secrets) -logging.log(logging.INFO, "Karapace version %s", karapace_version) - - @asynccontextmanager async def lifespan(_: FastAPI) -> AsyncGenerator[None, None]: schema_registry: KarapaceSchemaRegistry | None = None @@ -88,8 +66,8 @@ async def lifespan(_: FastAPI) -> AsyncGenerator[None, None]: await schema_registry.start() await schema_registry.get_master() authorizer = AuthorizationDependencyManager.get_authorizer() - # if authorizer is not None: - # await authorizer.start(StatsDependencyManager.get_stats()) + if authorizer is not None: + await authorizer.start(StatsDependencyManager.get_stats()) yield finally: if schema_registry: @@ -98,76 +76,87 @@ async def lifespan(_: FastAPI) -> AsyncGenerator[None, None]: await authorizer.close() -app = FastAPI(lifespan=lifespan) - - -@app.exception_handler(StarletteHTTPException) -async def http_exception_handler(_: StarletteHTTPRequest, exc: StarletteHTTPException): - return JSONResponse(status_code=exc.status_code, content=exc.detail) - - -@app.exception_handler(RequestValidationError) -async def validation_exception_handler(_: StarletteHTTPRequest, exc: RequestValidationError): - error_code = HTTPStatus.UNPROCESSABLE_ENTITY.value - if isinstance(exc, KarapaceValidationError): - error_code = exc.error_code - message = exc.body - else: - message = exc.errors() - return JSONResponse( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - content={ - "error_code": error_code, - "message": message, - }, - ) - - -@app.middleware("http") -async def set_content_types(request: Request, call_next): - try: - response_content_type = check_schema_headers(request) - except HTTPException as exc: +def create_karapace_application(*, config: Config) -> FastAPI: + # TODO: this lifespan is SR related lifespan + app = FastAPI(lifespan=lifespan) + _configure_logging(config=config) + + config_without_secrets = {} + for key, value in config.dict().items(): + if "password" in key: + value = "****" + elif "keyfile" in key: + value = "****" + config_without_secrets[key] = value + logging.log(logging.DEBUG, "Config %r", config_without_secrets) + logging.log(logging.INFO, "Karapace version %s", karapace_version) + + @app.exception_handler(StarletteHTTPException) + async def http_exception_handler(_: StarletteHTTPRequest, exc: StarletteHTTPException): + return JSONResponse(status_code=exc.status_code, content=exc.detail) + + @app.exception_handler(RequestValidationError) + async def validation_exception_handler(_: StarletteHTTPRequest, exc: RequestValidationError): + error_code = HTTPStatus.UNPROCESSABLE_ENTITY.value + if isinstance(exc, KarapaceValidationError): + error_code = exc.error_code + message = exc.body + else: + message = exc.errors() return JSONResponse( - status_code=exc.status_code, - headers=exc.headers, - content=exc.detail, + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + content={ + "error_code": error_code, + "message": message, + }, ) - # Schema registry supports application/octet-stream, assumption is JSON object body. - # Force internally to use application/json in this case for compatibility. - if request.headers.get("Content-Type") == "application/octet-stream": - new_headers = request.headers.mutablecopy() - new_headers["Content-Type"] = "application/json" - request._headers = new_headers - request.scope.update(headers=request.headers.raw) - - response = await call_next(request) - response.headers["Content-Type"] = response_content_type - return response - - -if CONFIG.karapace_registry: - from karapace.routers.compatibility_router import compatibility_router - from karapace.routers.config_router import config_router - from karapace.routers.health_router import health_router - from karapace.routers.master_available_router import master_availability_router - from karapace.routers.mode_router import mode_router - from karapace.routers.root_router import root_router - from karapace.routers.schemas_router import schemas_router - from karapace.routers.subjects_router import subjects_router - - app.include_router(compatibility_router) - app.include_router(config_router) - app.include_router(health_router) - app.include_router(master_availability_router) - app.include_router(mode_router) - app.include_router(root_router) - app.include_router(schemas_router) - app.include_router(subjects_router) -if CONFIG.karapace_rest: - # add rest router. - pass + @app.middleware("http") + async def set_content_types(request: Request, call_next): + try: + response_content_type = check_schema_headers(request) + except HTTPException as exc: + return JSONResponse( + status_code=exc.status_code, + headers=exc.headers, + content=exc.detail, + ) + + # Schema registry supports application/octet-stream, assumption is JSON object body. + # Force internally to use application/json in this case for compatibility. + if request.headers.get("Content-Type") == "application/octet-stream": + new_headers = request.headers.mutablecopy() + new_headers["Content-Type"] = "application/json" + request._headers = new_headers + request.scope.update(headers=request.headers.raw) + + response = await call_next(request) + response.headers["Content-Type"] = response_content_type + return response + + if config.karapace_registry: + from karapace.routers.compatibility_router import compatibility_router + from karapace.routers.config_router import config_router + from karapace.routers.health_router import health_router + from karapace.routers.master_available_router import master_availability_router + from karapace.routers.mode_router import mode_router + from karapace.routers.root_router import root_router + from karapace.routers.schemas_router import schemas_router + from karapace.routers.subjects_router import subjects_router + + app.include_router(compatibility_router) + app.include_router(config_router) + app.include_router(health_router) + app.include_router(master_availability_router) + app.include_router(mode_router) + app.include_router(root_router) + app.include_router(schemas_router) + app.include_router(subjects_router) + if config.karapace_rest: + # add rest router. + pass + + return app def __old_main() -> int: @@ -180,5 +169,8 @@ def __old_main() -> int: return 0 -# if __name__ == "__main__": -# sys.exit(main()) +CONFIG: Final = ConfigDependencyManager.get_config() + +if __name__ == "__main__": + app = create_karapace_application(config=CONFIG) + uvicorn.run(app, host=CONFIG.host, port=CONFIG.port, log_level=CONFIG.log_level.lower()) diff --git a/tests/integration/test_karapace.py b/tests/integration/test_karapace.py index 374593157..043e3e21d 100644 --- a/tests/integration/test_karapace.py +++ b/tests/integration/test_karapace.py @@ -49,6 +49,6 @@ def test_regression_server_must_exit_on_exception( errfile = stack.enter_context((tmp_path / "karapace.err").open("w")) write_env_file(dot_env_path=env_path, config=config) - process = popen_karapace_all(host=config.host, port=port, env_path=env_path, stdout=logfile, stderr=errfile) + process = popen_karapace_all(env_path=env_path, stdout=logfile, stderr=errfile) stack.callback(stop_process, process) # make sure to stop the process if the test fails assert process.wait(timeout=10) != 0, "Process should have exited with an error, port is already is use" diff --git a/tests/integration/utils/cluster.py b/tests/integration/utils/cluster.py index 358c28d23..66df9335a 100644 --- a/tests/integration/utils/cluster.py +++ b/tests/integration/utils/cluster.py @@ -77,7 +77,7 @@ async def start_schema_registry_cluster( logfile = stack.enter_context(open(log_path, "w")) errfile = stack.enter_context(open(error_path, "w")) - process = popen_karapace_all(host=host, port=port, env_path=env_path, stdout=logfile, stderr=errfile) + process = popen_karapace_all(env_path=env_path, stdout=logfile, stderr=errfile) stack.callback(stop_process, process) all_processes.append(process) diff --git a/tests/utils.py b/tests/utils.py index 77a7cd54e..e36093dc0 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -344,23 +344,14 @@ def python_exe() -> str: return python -def fastapi_executable() -> str: - if os.environ.get("VIRTUAL_ENV"): - return os.environ["VIRTUAL_ENV"] + "/bin/fastapi" - else: - # TODO: Is this feasible, run which to find fastapi if not in virtualenv? - return os.popen("which fastapi").read() - - -def popen_karapace_all(*, host: str, port: int, env_path: Union[Path, str], stdout: IO, stderr: IO, **kwargs) -> Popen: +def popen_karapace_all(*, env_path: Union[Path, str], stdout: IO, stderr: IO, **kwargs) -> Popen: kwargs["stdout"] = stdout kwargs["stderr"] = stderr return Popen( - [fastapi_executable(), "run", "--host", host, "--port", str(port), "src/karapace/karapace_all.py"], + [python_exe(), "-m", "karapace.karapace_all"], env={"KARAPACE_DOTENV": str(env_path)}, **kwargs, ) - # return Popen([python_exe(), "-m", "karapace.karapace_all", str(config_path)], **kwargs) class StubMessage: