diff --git a/README.md b/README.md
index 8f78215..5399d89 100644
--- a/README.md
+++ b/README.md
@@ -118,7 +118,7 @@ input messages must have the following JSON format:
 }
 ```
 
-You can check this implementation at [json_input_message_handler.py](src/anomalydetection/backend/entities/json_input_message_handler.py#29)
+You can check this implementation at [handlers/json.py](src/anomalydetection/backend/entities/handlers/json.py#29)
 
 ### Run
 
diff --git a/src/anomalydetection/__init__.py b/src/anomalydetection/__init__.py
index a99352d..19264f1 100644
--- a/src/anomalydetection/__init__.py
+++ b/src/anomalydetection/__init__.py
@@ -18,6 +18,6 @@
 
 import os
 
-BASE_PATH = os.path.dirname(__file__)
+BASE_PATH = os.path.dirname(__file__)  # noqa: E402
 
-__all__ = ('BASE_PATH', 'VERSION', 'BUILD', '__version__')
+__all__ = ('BASE_PATH',)
diff --git a/src/anomalydetection/anomdec.py b/src/anomalydetection/anomdec.py
index 70871d6..f14c32a 100644
--- a/src/anomalydetection/anomdec.py
+++ b/src/anomalydetection/anomdec.py
@@ -20,6 +20,7 @@
 import sys
 from time import sleep
 
+from anomalydetection.common import plugins  # noqa: F401
 from anomalydetection.backend.backend import main as backend_main
 from anomalydetection.common.config import Config
 from anomalydetection.common.logging import LoggingMixin
diff --git a/src/anomalydetection/anomdec.yml b/src/anomalydetection/anomdec.yml
index c81fc10..a025115 100644
--- a/src/anomalydetection/anomdec.yml
+++ b/src/anomalydetection/anomdec.yml
@@ -24,11 +24,12 @@ streams:
     source:
       type: kafka
       params:
-        brokers: localhost:9092
-        in: test1
+        broker_servers: localhost:9092
+        input_topic: test1
 #   aggregation:
-#     function: avg
-#     window_millis: 30000
+#     agg_function: avg
+#     agg_window_millis: 30000
+    handler: json
     engine:
       type: cad
       params:
@@ -51,8 +52,8 @@ streams:
         stream:
           type: kafka
           params:
-            brokers: localhost:9092
-            out: test2
+            broker_servers: localhost:9092
+            output_topic: test2
     warmup:
       - name: sqlite
         repository:
@@ -64,12 +65,10 @@ streams:
     source:
       type: pubsub
       params:
-        project: testing
+        project_id: testing
         auth_file: /dev/null
-        in: test10
-#   aggregation:
-#     function: avg
-#     window_millis: 30000
+        subscription: test10
+    handler: json
     engine:
       type: robust
       params:
@@ -87,8 +86,8 @@ streams:
         stream:
           type: pubsub
           params:
-            project: testing
-            out: test20
+            project_id: testing
+            output_topic: test20
     warmup:
       - name: sqlite
         repository:
@@ -100,8 +99,9 @@ streams:
     source:
      type: kafka
       params:
-        brokers: localhost:9092
-        in: test3
+        broker_servers: localhost:9092
+        input_topic: test3
+    handler: json
     engine:
       type: ema
       params:
@@ -119,8 +119,8 @@ streams:
         stream:
           type: kafka
           params:
-            brokers: localhost:9092
-            out: test4
+            broker_servers: localhost:9092
+            output_topic: test4
     warmup:
       - name: sqlite
         repository:
diff --git a/src/anomalydetection/backend/backend.py b/src/anomalydetection/backend/backend.py
index 6af4b4c..1255d65 100644
--- a/src/anomalydetection/backend/backend.py
+++ b/src/anomalydetection/backend/backend.py
@@ -16,20 +16,16 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see .
-from anomalydetection.backend.sink.websocket import \ - WebSocketSink +from anomalydetection.backend.interactor.stream_engine import StreamEngineInteractor +from anomalydetection.backend.sink.websocket import WebSocketSink from anomalydetection.common.concurrency import Concurrency from anomalydetection.common.config import Config -from anomalydetection.backend.entities.json_input_message_handler import \ - InputJsonMessageHandler -from anomalydetection.backend.interactor.stream_engine import \ - StreamEngineInteractor def main(config: Config): # Creates stream based on config env vars and a RobustDetector - def run_live_anomaly_detection(stream, engine_builder, + def run_live_anomaly_detection(stream, handler, engine_builder, sinks, warmup, name): # Add dashboard websocket as extra sink @@ -44,7 +40,7 @@ def run_live_anomaly_detection(stream, engine_builder, interactor = StreamEngineInteractor( stream, engine_builder, - InputJsonMessageHandler(), + handler, sinks=sinks + extra_sink, warm_up=warmup[0] if warmup else None) interactor.run() diff --git a/src/anomalydetection/backend/devel_mode.py b/src/anomalydetection/backend/devel_mode.py index 4af8c8e..8a46209 100644 --- a/src/anomalydetection/backend/devel_mode.py +++ b/src/anomalydetection/backend/devel_mode.py @@ -62,16 +62,16 @@ def build_producers(self): pass producers.append( - StreamBuilderFactory.get_pubsub_producer() + StreamBuilderFactory.get_producer_pubsub() .set_project_id(project) .set_output_topic("test10").build()) producers.append( - StreamBuilderFactory.get_kafka_producer() + StreamBuilderFactory.get_producer_kafka() .set_broker_servers("localhost:9092") .set_output_topic("test1").build()) producers.append( - StreamBuilderFactory.get_kafka_producer() + StreamBuilderFactory.get_producer_kafka() .set_broker_servers("localhost:9092") .set_output_topic("test3").build()) diff --git a/src/anomalydetection/backend/engine/builder.py b/src/anomalydetection/backend/engine/builder.py index b470f0a..50c739d 100644 --- a/src/anomalydetection/backend/engine/builder.py +++ b/src/anomalydetection/backend/engine/builder.py @@ -15,7 +15,7 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . - +import importlib import sys from collections import OrderedDict @@ -25,7 +25,7 @@ from anomalydetection.backend.engine.robust_z_engine import RobustDetector -class BaseBuilder(object): +class BaseEngineBuilder(object): """ BaseBuilder, implement this to create Engine Builders. 
""" @@ -51,7 +51,7 @@ def raise_exception(*args, **kwargs): self.__class__.__name__, func_name)) -class CADDetectorBuilder(BaseBuilder): +class CADDetectorBuilder(BaseEngineBuilder): type = "cad" @@ -103,7 +103,7 @@ def build(self) -> CADDetector: return CADDetector(**vars(self).copy()) -class RobustDetectorBuilder(BaseBuilder): +class RobustDetectorBuilder(BaseEngineBuilder): type = "robust" @@ -123,7 +123,7 @@ def build(self) -> RobustDetector: return RobustDetector(**vars(self).copy()) -class EMADetectorBuilder(BaseBuilder): +class EMADetectorBuilder(BaseEngineBuilder): type = "ema" @@ -145,25 +145,23 @@ def build(self) -> EMADetector: class EngineBuilderFactory(object): - engines = OrderedDict( - [ - ("robust", { - "key": "robust", - "name": "RobustDetector" - }), - ("cad", { - "key": "cad", - "name": "CADDetector" - }), - ("ema", { - "key": "ema", - "name": "EMADetector" - }), - ] - ) + engines = OrderedDict() + + @classmethod + def register_engine(cls, key, class_name): + cls.engines[key] = {"key": key, "name": class_name} + + @staticmethod + def get_plugin(name) -> BaseEngineBuilder: + module_name = "anomalydetection.backend.engine.{}_builder".format(name) + objects = vars(importlib.import_module(module_name))["_objects"] + for obj in objects: + if issubclass(obj, BaseEngineBuilder): + return obj() + raise NotImplementedError() @staticmethod - def get(name) -> BaseBuilder: + def get(name) -> BaseEngineBuilder: def raise_exception(): raise NotImplementedError() func_name = "get_{}".format(name) @@ -171,9 +169,12 @@ def raise_exception(): try: return func() except NotImplementedError as ex: - raise NotImplementedError( - "Calling undefined function: {}.{}()".format( - "EngineBuilderFactory", func_name)) + try: + return EngineBuilderFactory.get_plugin(name) + except NotImplementedError as ex: + raise NotImplementedError( + "Calling undefined function: {}.{}()".format( + "EngineBuilderFactory", func_name)) @staticmethod def get_robust() -> RobustDetectorBuilder: @@ -186,3 +187,8 @@ def get_cad() -> CADDetectorBuilder: @staticmethod def get_ema() -> EMADetectorBuilder: return EMADetectorBuilder() + + +EngineBuilderFactory.register_engine("robust", "RobustDetector") +EngineBuilderFactory.register_engine("cad", "CADDetector") +EngineBuilderFactory.register_engine("ema", "EMADetector") diff --git a/src/anomalydetection/backend/entities/handlers/__init__.py b/src/anomalydetection/backend/entities/handlers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/anomalydetection/backend/entities/handlers/factory.py b/src/anomalydetection/backend/entities/handlers/factory.py new file mode 100644 index 0000000..841964d --- /dev/null +++ b/src/anomalydetection/backend/entities/handlers/factory.py @@ -0,0 +1,54 @@ +# -*- coding:utf-8 -*- # +# +# Anomaly Detection Framework +# Copyright (C) 2018 Bluekiri BigData Team +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . 
+
+import importlib
+
+from anomalydetection.backend.entities import BaseMessageHandler
+from anomalydetection.backend.entities.handlers.json import InputJsonMessageHandler
+
+
+class MessageHandlerFactory(object):
+
+    @staticmethod
+    def get_plugin(name) -> BaseMessageHandler:
+        module_name = "anomalydetection.backend.entities.handlers.{}".format(name)
+        objects = vars(importlib.import_module(module_name))["_objects"]
+        for obj in objects:
+            if issubclass(obj, BaseMessageHandler):
+                return obj()
+        raise NotImplementedError()
+
+    @staticmethod
+    def get(name) -> BaseMessageHandler:
+        def raise_exception():
+            raise NotImplementedError()
+        func_name = "get_{}".format(name)
+        func = getattr(MessageHandlerFactory, func_name, raise_exception)
+        try:
+            return func()
+        except NotImplementedError as ex:
+            try:
+                return MessageHandlerFactory.get_plugin(name)
+            except NotImplementedError as ex:
+                raise NotImplementedError(
+                    "Calling undefined function: {}.{}()".format(
+                        "MessageHandlerFactory", func_name))
+
+    @staticmethod
+    def get_json():
+        return InputJsonMessageHandler()
diff --git a/src/anomalydetection/backend/entities/json_input_message_handler.py b/src/anomalydetection/backend/entities/handlers/json.py
similarity index 100%
rename from src/anomalydetection/backend/entities/json_input_message_handler.py
rename to src/anomalydetection/backend/entities/handlers/json.py
diff --git a/src/anomalydetection/backend/interactor/__init__.py b/src/anomalydetection/backend/interactor/__init__.py
index a80e470..a9fd204 100644
--- a/src/anomalydetection/backend/interactor/__init__.py
+++ b/src/anomalydetection/backend/interactor/__init__.py
@@ -18,7 +18,7 @@
 
 from threading import Lock
 
-from anomalydetection.backend.engine.builder import BaseBuilder
+from anomalydetection.backend.engine.builder import BaseEngineBuilder
 from anomalydetection.backend.entities import BaseMessageHandler
 
 
@@ -31,7 +31,7 @@ class BaseEngineInteractor(object):
     """
 
     def __init__(self,
-                 engine_builder: BaseBuilder,
+                 engine_builder: BaseEngineBuilder,
                  message_handler: BaseMessageHandler) -> None:
         """
         BaseEngineInteractor constructor
diff --git a/src/anomalydetection/backend/interactor/batch_engine.py b/src/anomalydetection/backend/interactor/batch_engine.py
index d1ff9c2..3dea21d 100644
--- a/src/anomalydetection/backend/interactor/batch_engine.py
+++ b/src/anomalydetection/backend/interactor/batch_engine.py
@@ -18,7 +18,7 @@
 
 from rx.core import Observable
 
-from anomalydetection.backend.engine.builder import BaseBuilder
+from anomalydetection.backend.engine.builder import BaseEngineBuilder
 from anomalydetection.backend.entities import BaseMessageHandler
 from anomalydetection.backend.entities.input_message import InputMessage
 from anomalydetection.backend.entities.output_message import OutputMessage
@@ -37,7 +37,7 @@ class BatchEngineInteractor(BaseEngineInteractor, LoggingMixin):
 
     def __init__(self,
                  batch: BaseObservable,
-                 engine_builder: BaseBuilder,
+                 engine_builder: BaseEngineBuilder,
                  message_handler: BaseMessageHandler) -> None:
         """
         BatchEngineInteractor constructor
diff --git a/src/anomalydetection/backend/interactor/stream_engine.py b/src/anomalydetection/backend/interactor/stream_engine.py index b68fa85..c7492b1 100644 --- a/src/anomalydetection/backend/interactor/stream_engine.py +++ b/src/anomalydetection/backend/interactor/stream_engine.py @@ -18,7 +18,7 @@ from typing import List -from anomalydetection.backend.engine.builder import BaseBuilder +from anomalydetection.backend.engine.builder import BaseEngineBuilder from 
anomalydetection.backend.entities import BaseMessageHandler from anomalydetection.backend.entities.input_message import InputMessage from anomalydetection.backend.entities.output_message import OutputMessage @@ -35,7 +35,7 @@ class StreamEngineInteractor(BaseEngineInteractor, LoggingMixin): def __init__(self, stream: BaseConsumerBuilder, - engine_builder: BaseBuilder, + engine_builder: BaseEngineBuilder, message_handler: BaseMessageHandler, sinks: List[BaseSink] = list(), warm_up: BaseObservable = None) -> None: diff --git a/src/anomalydetection/backend/repository/builder.py b/src/anomalydetection/backend/repository/builder.py index 8f14ef7..f98b4a7 100644 --- a/src/anomalydetection/backend/repository/builder.py +++ b/src/anomalydetection/backend/repository/builder.py @@ -15,12 +15,13 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +import importlib from anomalydetection.backend.repository import BaseRepository from anomalydetection.backend.repository.sqlite import SQLiteRepository -class BaseBuilder(object): +class BaseRepositoryBuilder(object): """ BaseBuilder, implement this to create Repository Builders. """ @@ -33,8 +34,20 @@ def build(self) -> BaseRepository: """ raise NotImplementedError("To implement in child classes.") + def set(self, name: str, value: str): + def raise_exception(*args, **kwargs): + raise NotImplementedError() + func_name = "set_{}".format(name) + func = getattr(self, func_name, raise_exception) + try: + return func(value) + except NotImplementedError as ex: + raise NotImplementedError( + "Calling undefined function: {}.{}()".format( + self.__class__.__name__, func_name)) -class SQLiteBuilder(BaseBuilder): + +class SQLiteBuilder(BaseRepositoryBuilder): def __init__(self, database: str = None) -> None: @@ -51,6 +64,31 @@ def build(self) -> BaseRepository: class RepositoryBuilderFactory(object): + @staticmethod + def get_plugin(name) -> BaseRepositoryBuilder: + module_name = "anomalydetection.backend.repository.{}_builder".format(name) + objects = vars(importlib.import_module(module_name))["_objects"] + for obj in objects: + if issubclass(obj, BaseRepositoryBuilder): + return obj() + raise NotImplementedError() + + @staticmethod + def get(name) -> BaseRepositoryBuilder: + def raise_exception(): + raise NotImplementedError() + func_name = "get_{}".format(name) + func = getattr(RepositoryBuilderFactory, func_name, raise_exception) + try: + return func() + except NotImplementedError as ex: + try: + return RepositoryBuilderFactory.get_plugin(name) + except NotImplementedError as ex: + raise NotImplementedError( + "Calling undefined function: {}.{}()".format( + "RepositoryBuilderFactory", func_name)) + @staticmethod def get_sqlite() -> SQLiteBuilder: return SQLiteBuilder() diff --git a/src/anomalydetection/backend/stream/builder.py b/src/anomalydetection/backend/stream/builder.py index 88bf93c..f09a6c9 100644 --- a/src/anomalydetection/backend/stream/builder.py +++ b/src/anomalydetection/backend/stream/builder.py @@ -15,7 +15,7 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . 
- +import importlib import uuid from anomalydetection.backend.stream import BaseStreamConsumer @@ -34,12 +34,36 @@ class BaseConsumerBuilder(object): def build(self) -> BaseStreamConsumer: raise NotImplementedError("To implement in child classes.") + def set(self, name: str, value: str): + def raise_exception(*args, **kwargs): + raise NotImplementedError() + func_name = "set_{}".format(name) + func = getattr(self, func_name, raise_exception) + try: + return func(value) + except NotImplementedError as ex: + raise NotImplementedError( + "Calling undefined function: {}.{}()".format( + self.__class__.__name__, func_name)) + class BaseProducerBuilder(object): def build(self) -> BaseStreamProducer: raise NotImplementedError("To implement in child classes.") + def set(self, name: str, value: str): + def raise_exception(*args, **kwargs): + raise NotImplementedError() + func_name = "set_{}".format(name) + func = getattr(self, func_name, raise_exception) + try: + return func(value) + except NotImplementedError as ex: + raise NotImplementedError( + "Calling undefined function: {}.{}()".format( + self.__class__.__name__, func_name)) + class KafkaStreamConsumerBuilder(BaseConsumerBuilder): @@ -69,6 +93,8 @@ def set_group_id(self, group_id): return self def set_agg_function(self, agg_function: AggregationFunction): + if isinstance(agg_function, str): + agg_function = AggregationFunction(agg_function) self.agg_function = agg_function return self @@ -143,6 +169,8 @@ def set_auth_file(self, auth_file: str): return self def set_agg_function(self, agg_function: AggregationFunction): + if isinstance(agg_function, str): + agg_function = AggregationFunction(agg_function) self.agg_function = agg_function return self @@ -198,17 +226,67 @@ def build(self) -> BaseStreamProducer: class StreamBuilderFactory(object): @staticmethod - def get_kafka_consumer(): + def get_consumer_plugin(name) -> BaseConsumerBuilder: + module_name = "anomalydetection.backend.stream.{}_builder".format(name) + objects = vars(importlib.import_module(module_name))["_objects"] + for obj in objects: + if issubclass(obj, BaseConsumerBuilder): + return obj() + raise NotImplementedError() + + @staticmethod + def get_consumer(name) -> BaseConsumerBuilder: + def raise_exception(): + raise NotImplementedError() + func_name = "get_consumer_{}".format(name) + func = getattr(StreamBuilderFactory, func_name, raise_exception) + try: + return func() + except NotImplementedError as ex: + try: + return StreamBuilderFactory.get_consumer_plugin(name) + except NotImplementedError as ex: + raise NotImplementedError( + "Calling undefined function: {}.{}()".format( + "EngineBuilderFactory", func_name)) + + @staticmethod + def get_producer_plugin(name) -> BaseProducerBuilder: + module_name = "anomalydetection.backend.stream.{}_builder".format(name) + objects = vars(importlib.import_module(module_name))["_objects"] + for obj in objects: + if issubclass(obj, BaseProducerBuilder): + return obj() + raise NotImplementedError() + + @staticmethod + def get_producer(name) -> BaseProducerBuilder: + def raise_exception(): + raise NotImplementedError() + func_name = "get_producer_{}".format(name) + func = getattr(StreamBuilderFactory, func_name, raise_exception) + try: + return func() + except NotImplementedError as ex: + try: + return StreamBuilderFactory.get_producer_plugin(name) + except NotImplementedError as ex: + raise NotImplementedError( + "Calling undefined function: {}.{}()".format( + "StreamBuilderFactory", func_name)) + + @staticmethod + def get_consumer_kafka(): 
return KafkaStreamConsumerBuilder() @staticmethod - def get_kafka_producer(): + def get_producer_kafka(): return KafkaStreamProducerBuilder() @staticmethod - def get_pubsub_consumer(): + def get_consumer_pubsub(): return PubSubStreamConsumerBuilder() @staticmethod - def get_pubsub_producer(): + def get_producer_pubsub(): return PubSubStreamProducerBuilder() diff --git a/src/anomalydetection/common/config.py b/src/anomalydetection/common/config.py index bd00b26..58caf5d 100644 --- a/src/anomalydetection/common/config.py +++ b/src/anomalydetection/common/config.py @@ -18,6 +18,7 @@ import os +from anomalydetection.backend.entities.handlers.factory import MessageHandlerFactory from anomalydetection.common.logging import LoggingMixin import yaml @@ -27,7 +28,6 @@ from anomalydetection.backend.sink import BaseSink from anomalydetection.backend.sink.repository import RepositorySink from anomalydetection.backend.sink.stream import StreamSink -from anomalydetection.backend.stream import AggregationFunction from anomalydetection.backend.stream.builder import StreamBuilderFactory @@ -71,27 +71,15 @@ def get_streams(self): return streams def _get_stream(self, item): - builder = None source = item["source"] - if source["type"] == "kafka": - builder = StreamBuilderFactory.get_kafka_consumer() - builder.set_broker_servers(source["params"]["brokers"]) - builder.set_input_topic(source["params"]["in"]) - if "group_id" in source["params"]: - builder.set_group_id(source["params"]["group_id"]) - - if source["type"] == "pubsub": - builder = StreamBuilderFactory.get_pubsub_consumer() - builder.set_project_id(source["params"]["project"]) - builder.set_subscription(source["params"]["in"]) - if "auth_file" in source["params"]: - builder.set_auth_file(source["params"]["auth_file"]) - + builder = StreamBuilderFactory.get_consumer(source["type"]) + params = source["params"] if "params" in source else {} + for param in params: + builder.set(param, params[param]) if "aggregation" in item: - agg = item["aggregation"] - builder.set_agg_function(AggregationFunction(agg["function"])) - builder.set_agg_window_millis(agg["window_millis"]) - + agg_params = item["aggregation"] + for agg_param in agg_params: + builder.set(agg_param, agg_params[agg_param]) return builder def get_engines(self): @@ -100,44 +88,26 @@ def get_engines(self): engines.append(self._get_engine(item["engine"])) return engines - def _get_engine(self, engine): - builder = None - if engine["type"] == "cad": - builder = EngineBuilderFactory.get_cad() - if "min_value" in engine["params"]: - builder.set_min_value(engine["params"]["min_value"]) - if "max_value" in engine["params"]: - builder.set_max_value(engine["params"]["max_value"]) - if "rest_period" in engine["params"]: - builder.set_rest_period(engine["params"]["rest_period"]) - if "num_norm_value_bits" in engine["params"]: - builder.set_num_norm_value_bits( - engine["params"]["num_norm_value_bits"]) - if "max_active_neurons_num" in engine["params"]: - builder.set_max_active_neurons_num( - engine["params"]["max_active_neurons_num"]) - if "max_left_semi_contexts_length" in engine["params"]: - builder.set_max_left_semi_contexts_length( - engine["params"]["max_left_semi_contexts_length"]) - - if engine["type"] == "robust": - builder = EngineBuilderFactory.get_robust() - if "window" in engine["params"]: - builder.set_window(engine["params"]["window"]) - - if engine["type"] == "ema": - builder = EngineBuilderFactory.get_ema() - if "window" in engine["params"]: - builder.set_window(engine["params"]["window"]) - 
- if "threshold" in engine["params"]: - builder.set_threshold(engine["params"]["threshold"]) + def get_handlers(self): + handlers = [] + for item in self.config["streams"]: + try: + handlers.append(MessageHandlerFactory.get(item["handler"])) + except KeyError as key_error: + handlers.append(MessageHandlerFactory.get_json()) + return handlers + def _get_engine(self, engine): + builder = EngineBuilderFactory.get(engine["type"]) + params = engine["params"] if "params" in engine else {} + for param in params: + builder.set(param, params[param]) return builder def get(self): if not self.built: self.built = list(zip(self.get_streams(), + self.get_handlers(), self.get_engines(), self.get_sinks(), self.get_warmup())) @@ -186,11 +156,10 @@ def get_warmup(self): return warmups def _get_repository(self, repository): - builder = None - if repository["type"] == "sqlite": - builder = RepositoryBuilderFactory.get_sqlite() - if "database" in repository["params"]: - builder.set_database(repository["params"]["database"]) + builder = RepositoryBuilderFactory.get(repository["type"]) + params = repository["params"] if "params" in repository else {} + for param in params: + builder.set(param, params[param]) return builder def _get_sink(self, sink) -> BaseSink: @@ -200,15 +169,8 @@ def _get_sink(self, sink) -> BaseSink: return RepositorySink(builder.build()) if sink["type"] == "stream": stream = sink["stream"] - if stream["type"] == "kafka": - builder = StreamBuilderFactory.get_kafka_producer() - builder.set_broker_servers(stream["params"]["brokers"]) - builder.set_output_topic(stream["params"]["out"]) - return StreamSink(builder.build()) - if stream["type"] == "pubsub": - builder = StreamBuilderFactory.get_pubsub_producer() - builder.set_project_id(stream["params"]["project"]) - builder.set_output_topic(stream["params"]["out"]) - if "auth_file" in stream["params"]: - builder.set_auth_file(stream["params"]["auth_file"]) - return StreamSink(builder.build()) + builder = StreamBuilderFactory.get_producer(stream["type"]) + params = stream["params"] if "params" in stream else {} + for param in params: + builder.set(param, params[param]) + return StreamSink(builder.build()) diff --git a/src/anomalydetection/common/plugins.py b/src/anomalydetection/common/plugins.py index ee31e73..aafd966 100644 --- a/src/anomalydetection/common/plugins.py +++ b/src/anomalydetection/common/plugins.py @@ -16,4 +16,142 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . 
-# TODO \ No newline at end of file +import inspect +import logging +import os +import re +import sys +from importlib.machinery import SourceFileLoader +from types import ModuleType + +from anomalydetection.backend.engine.builder import EngineBuilderFactory + + +class Plugin(object): + + name = None + stream_consumer_builders = [] # Implements BaseConsumerBuilder + stream_consumers = [] # Implements BaseStreamConsumer + stream_producer_builders = [] # Implements BaseProducerBuilder + stream_producers = [] # Implements BaseStreamProducer + engine_builders = [] # Implements BaseEngineBuilder + engines = [] # Implements BaseEngine + repository_builders = [] # Implements BaseRepositoryBuilder + repositories = [] # Implements BaseRepository + message_handlers = [] # Implements BaseMessageHandler + + @staticmethod + def validate(): + return True + + +plugins = [] + +# Plugins folder +plugins_folder = os.getenv("ANOMDEC_HOME", os.environ["HOME"] + "/anomdec") +plugins_folder += "/plugins" + +norm_pattern = re.compile(r'[/|.]') + +for root, dirs, files in os.walk(plugins_folder, followlinks=True): + for f in files: + try: + filepath = os.path.join(root, f) + if not os.path.isfile(filepath): + continue + mod_name, file_ext = os.path.splitext( + os.path.split(filepath)[-1]) + if file_ext != '.py': + continue + + logging.debug("Importing plugin modules, {}".format(filepath)) + namespace = '_'.join([re.sub(norm_pattern, '__', root), mod_name]) + + m = SourceFileLoader(namespace, filepath).load_module() + for obj in list(m.__dict__.values()): + if (inspect.isclass(obj) and issubclass(obj, Plugin) + and obj is not Plugin): + obj.validate() + if obj not in plugins: + plugins.append(obj) + + except Exception as e: + logging.exception(e) + logging.error('Failed to import plugin %s', filepath) + + +def make_module(name, objects): + logging.debug('Creating module %s', name) + name = name.lower() + module = ModuleType(name) + module._name = name.split('.')[-1] + module._objects = objects + module.__dict__.update((o.__name__, o) for o in objects) + return module + + +stream_builders = [] # Implements BaseConsumerBuilder/BaseProducerBuilder +streams = [] # Implements BaseStreamConsumer/BaseStreamProducer +engine_builders = [] # Implements BaseEngineBuilder +engines = [] # Implements BaseEngine +repository_builders = [] # Implements BaseRepositoryBuilder +repositories = [] # Implements BaseRepository +message_handlers = [] # Implements BaseMessageHandler + +for p in plugins: + + # Stream + stream_builders.append( + make_module("anomalydetection.backend.stream.{}_builder".format(p.name), + p.stream_consumer_builders + p.stream_producer_builders)) + + streams.append( + make_module("anomalydetection.backend.stream.{}".format(p.name), + p.stream_consumer_builders + p.stream_producers)) + + # Engine + engine_builders.append( + make_module("anomalydetection.backend.engine.{}_builder".format(p.name), + p.engine_builders)) + + engines.append( + make_module("anomalydetection.backend.engine.{}".format(p.name), + p.engines)) + + # Repository + repository_builders.append( + make_module("anomalydetection.backend.repository.{}_builder".format(p.name), + p.repository_builders)) + + repositories.append( + make_module("anomalydetection.backend.repository.{}".format(p.name), + p.repositories)) + + # Message handlers + message_handlers.append( + make_module("anomalydetection.backend.entities.handlers.{}".format(p.name), + p.message_handlers) + ) + +for stream in streams: + sys.modules[stream.__name__] = stream + +for str_builder 
in stream_builders: + sys.modules[str_builder.__name__] = str_builder + +for repository in repositories: + sys.modules[repository.__name__] = repository + +for repo_builder in repository_builders: + sys.modules[repo_builder.__name__] = repo_builder + +for engine in engines: + sys.modules[engine.__name__] = engine + # It is necessary to view in dashboard + EngineBuilderFactory.register_engine(engine._name, engine._name) + +for eng_builder in engine_builders: + sys.modules[eng_builder.__name__] = eng_builder + +for message_handler in message_handlers: + sys.modules[message_handler.__name__] = message_handler diff --git a/src/anomalydetection/dashboard/handlers/web/signal.py b/src/anomalydetection/dashboard/handlers/web/signal.py index ad43ce6..ba755ef 100644 --- a/src/anomalydetection/dashboard/handlers/web/signal.py +++ b/src/anomalydetection/dashboard/handlers/web/signal.py @@ -19,19 +19,16 @@ import datetime import sys -from anomalydetection.backend.stream import FileObservable -from anomalydetection.common.logging import LoggingMixin +import pandas as pd from tornado.escape import json_encode - -from anomalydetection.backend.entities.json_input_message_handler import \ - InputJsonMessageHandler -from bokeh.embed import components from bokeh.plotting import figure -import pandas as pd from tornado.web import RequestHandler from tornado import web +from anomalydetection.backend.stream import FileObservable +from anomalydetection.common.logging import LoggingMixin from anomalydetection.backend.engine.builder import EngineBuilderFactory +from anomalydetection.backend.entities.handlers.json import InputJsonMessageHandler from anomalydetection.backend.entities.output_message import OutputMessageHandler from anomalydetection.backend.interactor.batch_engine import BatchEngineInteractor from anomalydetection.backend.repository.observable import ObservableRepository @@ -53,7 +50,7 @@ async def create_figure(self, data): .settings["config"] \ .get_as_dict() - repository = conf[data["name"]][3][0].repository + repository = conf[data["name"]][4][0].repository observable = ObservableRepository(repository, data["application"], from_ts, to_ts) @@ -127,7 +124,7 @@ def get(self, signal): .settings["config"] \ .get_as_dict() - repository = conf[signal][3][0].repository + repository = conf[signal][4][0].repository observable = ObservableRepository(repository, input_data["application"]) data = [x.to_plain_dict() @@ -185,7 +182,7 @@ async def get(self): data=json_encode(data)) -class SignalDetail(SecureHTMLHandler, Chart): +class SignalDetail(SecureHTMLHandler): template = "signal.html" @@ -200,7 +197,7 @@ async def get(self, signal, data=None): .get_as_dict() data = { - "engine": conf[signal][1].type, + "engine": conf[signal][2].type, "window": "30", "threshold": "0.99", "to-date": to_date.strftime("%d-%m-%Y"), @@ -212,19 +209,16 @@ async def get(self, signal, data=None): for k, v in self.request.arguments.items()} data.update(input_data) - repository = conf[data["name"]][3][0].repository + repository = conf[data["name"]][4][0].repository applications = repository.get_applications() if 'application' not in data: data['application'] = applications[0] - plot = await self.create_figure(data) - script, div = components(plot) - breadcrumb = [ ("/signals/", "Signals", ""), (self.request.uri, signal, "active") ] - self.response(script=script, div=div, engines=engines, + self.response(script=None, div=None, engines=engines, signal_name=signal, breadcrumb=breadcrumb, form_data=data, engine_key=data['engine'], 
applications=applications, diff --git a/src/anomalydetection/dashboard/helpers/engine.py b/src/anomalydetection/dashboard/helpers/engine.py index 13d9579..9408832 100644 --- a/src/anomalydetection/dashboard/helpers/engine.py +++ b/src/anomalydetection/dashboard/helpers/engine.py @@ -16,12 +16,12 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from anomalydetection.backend.engine.builder import BaseBuilder +from anomalydetection.backend.engine.builder import BaseEngineBuilder class EngineBuilderForm(object): - def __init__(self, engine_builder: BaseBuilder) -> None: + def __init__(self, engine_builder: BaseEngineBuilder) -> None: super().__init__() self.engine_builder = engine_builder diff --git a/src/test/anomdec-test.yml b/src/test/anomdec-test.yml index 696d59c..b830a67 100644 --- a/src/test/anomdec-test.yml +++ b/src/test/anomdec-test.yml @@ -6,8 +6,8 @@ streams: source: type: kafka params: - brokers: localhost:9092 - in: test1 + broker_servers: localhost:9092 + input_topic: test1 engine: type: cad params: diff --git a/src/test/integration/test_batch_engine_interactor.py b/src/test/integration/test_batch_engine_interactor.py index 48a3b9a..89aa34c 100644 --- a/src/test/integration/test_batch_engine_interactor.py +++ b/src/test/integration/test_batch_engine_interactor.py @@ -22,8 +22,7 @@ from datetime import datetime from anomalydetection.backend.engine.builder import EngineBuilderFactory -from anomalydetection.backend.entities.json_input_message_handler import \ - InputJsonMessageHandler +from anomalydetection.backend.entities.handlers.json import InputJsonMessageHandler from anomalydetection.backend.interactor.batch_engine import BatchEngineInteractor from anomalydetection.backend.stream import BaseStreamConsumer from anomalydetection.common.logging import LoggingMixin diff --git a/src/test/integration/test_plugins.py b/src/test/integration/test_plugins.py new file mode 100644 index 0000000..41f8604 --- /dev/null +++ b/src/test/integration/test_plugins.py @@ -0,0 +1,37 @@ +# -*- coding:utf-8 -*- # +# +# Anomaly Detection Framework +# Copyright (C) 2018 Bluekiri BigData Team +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . 
+ +import unittest + +from anomalydetection.common import plugins # noqa: F401 +from anomalydetection.backend.engine.builder import EngineBuilderFactory +from anomalydetection.backend.entities.handlers.factory import MessageHandlerFactory +from anomalydetection.backend.repository.builder import RepositoryBuilderFactory +from anomalydetection.backend.stream.builder import StreamBuilderFactory +from anomalydetection.common.logging import LoggingMixin + + +class TestPubSubStreamBackend(unittest.TestCase, LoggingMixin): + + @unittest.skip("FIXME") + def test_modules(self): + EngineBuilderFactory.get("plugin1").build() + RepositoryBuilderFactory.get("plugin1").build() + StreamBuilderFactory.get_consumer("plugin1").build() + StreamBuilderFactory.get_producer("plugin1").build() + MessageHandlerFactory.get("plugin1") diff --git a/src/test/integration/test_stream_engine_interactor.py b/src/test/integration/test_stream_engine_interactor.py index b279ee2..a271ec2 100644 --- a/src/test/integration/test_stream_engine_interactor.py +++ b/src/test/integration/test_stream_engine_interactor.py @@ -49,7 +49,7 @@ def build(self) -> BaseStreamConsumer: return DummyStream() -class DummyMessageHandler(BaseMessageHandler[InputMessage], LoggingMixin): +class DummyMessageHandler(BaseMessageHandler[InputMessage]): @classmethod def parse_message(cls, message: str) -> InputMessage: @@ -102,7 +102,7 @@ class TestStreamEngineInteractor(unittest.TestCase, LoggingMixin): def test_robust_stream_engine_interactor(self): interactor = StreamEngineInteractor( - DummyStream(), + DummyStreamBuilder(), EngineBuilderFactory.get_robust().set_window(30).set_threshold(.95), DummyMessageHandler(), sinks=[DummySink()], @@ -112,7 +112,7 @@ def test_robust_stream_engine_interactor(self): def test_cad_stream_engine_interactor(self): interactor = StreamEngineInteractor( - DummyStream(), + DummyStreamBuilder(), EngineBuilderFactory.get_cad(), DummyMessageHandler(), sinks=[DummySink()], diff --git a/src/test/plugins/test_plugin.py b/src/test/plugins/test_plugin.py new file mode 100644 index 0000000..dc01c5b --- /dev/null +++ b/src/test/plugins/test_plugin.py @@ -0,0 +1,125 @@ +# -*- coding:utf-8 -*- # +# +# Anomaly Detection Framework +# Copyright (C) 2018 Bluekiri BigData Team +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . 
+from typing import List, Any, Iterable, Generator + +from anomalydetection.backend.engine import BaseEngine +from anomalydetection.backend.engine.builder import BaseEngineBuilder +from anomalydetection.backend.entities import BaseMessageHandler +from anomalydetection.backend.entities.input_message import InputMessage +from anomalydetection.backend.entities.output_message import OutputMessage, AnomalyResult +from anomalydetection.backend.repository import BaseRepository +from anomalydetection.backend.repository.builder import BaseRepositoryBuilder +from anomalydetection.backend.stream import BaseStreamConsumer +from anomalydetection.backend.stream import BaseStreamProducer +from anomalydetection.backend.stream.builder import BaseConsumerBuilder +from anomalydetection.backend.stream.builder import BaseProducerBuilder +from anomalydetection.common.plugins import Plugin + + +class PluginOneRepository(BaseRepository): + + def initialize(self): + pass + + def fetch(self, application, from_ts, to_ts) -> Iterable: + pass + + def insert(self, message: OutputMessage) -> None: + pass + + def map(self, item: Any) -> OutputMessage: + pass + + def get_applications(self) -> List[str]: + pass + + +class PluginOneRepositoryBuilder(BaseRepositoryBuilder): + + def build(self) -> BaseRepository: + return PluginOneRepository("plugin1") + + +class EngineOne(BaseEngine): + + def predict(self, value: float, **kwargs) -> AnomalyResult: + return AnomalyResult(-1, 1, 0, False) + + +class EngineOneBuilder(BaseEngineBuilder): + + type = "plugin1" + + def build(self) -> BaseEngine: + return EngineOne() + + +class ConsumerOne(BaseStreamConsumer): + + def poll(self) -> Generator: + yield 1 + + +class ConsumerOneBuilder(BaseConsumerBuilder): + + def build(self) -> BaseStreamConsumer: + return ConsumerOne() + + +class ProducerOne(BaseStreamProducer): + + def push(self, message: str) -> None: + pass + + +class ProducerOneBuilder(BaseProducerBuilder): + + def build(self) -> BaseStreamProducer: + return ProducerOne() + + +class HandlerOne(BaseMessageHandler[InputMessage]): + + @classmethod + def parse_message(cls, message: Any) -> InputMessage: + pass + + @classmethod + def extract_key(cls, message: InputMessage) -> str: + pass + + @classmethod + def extract_value(cls, message: InputMessage) -> float: + pass + + @classmethod + def validate_message(cls, message: InputMessage) -> bool: + pass + + +class PluginOneRepositoryPlugin(Plugin): + name = "plugin1" + stream_consumer_builders = [ConsumerOneBuilder] + stream_consumers = [ConsumerOne] + stream_producer_builders = [ProducerOneBuilder] + stream_producers = [ProducerOne] + repositories = [PluginOneRepository] + repository_builders = [PluginOneRepositoryBuilder] + engines = [EngineOne] + engine_builders = [EngineOneBuilder] + message_handlers = [HandlerOne] diff --git a/src/test/unit/backend/engine/test_builder.py b/src/test/unit/backend/engine/test_builder.py index f3ba12f..4a06ca4 100644 --- a/src/test/unit/backend/engine/test_builder.py +++ b/src/test/unit/backend/engine/test_builder.py @@ -18,7 +18,7 @@ import unittest -from anomalydetection.backend.engine.builder import BaseBuilder +from anomalydetection.backend.engine.builder import BaseEngineBuilder from anomalydetection.backend.engine.builder import CADDetectorBuilder from anomalydetection.backend.engine.builder import EMADetectorBuilder from anomalydetection.backend.engine.builder import RobustDetectorBuilder @@ -29,7 +29,7 @@ class TestBaseBuilder(unittest.TestCase): def test_build(self): with 
self.assertRaises(NotImplementedError) as ctx: - builder = BaseBuilder() + builder = BaseEngineBuilder() builder.build() self.assertEqual(ctx.exception.args[0], diff --git a/src/test/unit/backend/entities/test_json_input_message_handler.py b/src/test/unit/backend/entities/test_json_input_message_handler.py index db8a3d4..e1b288e 100644 --- a/src/test/unit/backend/entities/test_json_input_message_handler.py +++ b/src/test/unit/backend/entities/test_json_input_message_handler.py @@ -21,7 +21,7 @@ from jsonschema import ValidationError -from anomalydetection.backend.entities.json_input_message_handler import \ +from anomalydetection.backend.entities.handlers.json import \ InputJsonMessageHandler diff --git a/src/test/unit/backend/interactor/test_base.py b/src/test/unit/backend/interactor/test_base.py index 98046ef..fc7f037 100644 --- a/src/test/unit/backend/interactor/test_base.py +++ b/src/test/unit/backend/interactor/test_base.py @@ -20,7 +20,7 @@ from typing import Any from anomalydetection.backend.engine import BaseEngine -from anomalydetection.backend.engine.builder import BaseBuilder +from anomalydetection.backend.engine.builder import BaseEngineBuilder from anomalydetection.backend.entities import BaseMessageHandler from anomalydetection.backend.entities.output_message import AnomalyResult from anomalydetection.backend.interactor import BaseEngineInteractor @@ -39,7 +39,7 @@ def predict(self, value: float, **kwargs) -> AnomalyResult: return AnomalyResult(-10, 10, 0.5, False) -class DummyEngineBuilder(BaseBuilder): +class DummyEngineBuilder(BaseEngineBuilder): def build(self) -> BaseEngine: return DummyEngine() diff --git a/src/test/unit/backend/interactor/test_batch_engine.py b/src/test/unit/backend/interactor/test_batch_engine.py index 6af760f..51bb502 100644 --- a/src/test/unit/backend/interactor/test_batch_engine.py +++ b/src/test/unit/backend/interactor/test_batch_engine.py @@ -24,7 +24,7 @@ from typing import Any from anomalydetection.backend.engine import BaseEngine -from anomalydetection.backend.engine.builder import BaseBuilder +from anomalydetection.backend.engine.builder import BaseEngineBuilder from anomalydetection.backend.entities import BaseMessageHandler from anomalydetection.backend.entities.output_message import AnomalyResult from anomalydetection.backend.interactor.batch_engine import BatchEngineInteractor @@ -52,7 +52,7 @@ def predict(self, value: float, **kwargs) -> AnomalyResult: math.pow(value, 0.5), bool(value % 2)) -class DummyEngineBuilder(BaseBuilder): +class DummyEngineBuilder(BaseEngineBuilder): def build(self) -> BaseEngine: return DummyEngine() diff --git a/src/test/unit/backend/interactor/test_stream_engine.py b/src/test/unit/backend/interactor/test_stream_engine.py index 406e8ce..0766b2e 100644 --- a/src/test/unit/backend/interactor/test_stream_engine.py +++ b/src/test/unit/backend/interactor/test_stream_engine.py @@ -24,7 +24,7 @@ from typing import Any from anomalydetection.backend.engine import BaseEngine -from anomalydetection.backend.engine.builder import BaseBuilder +from anomalydetection.backend.engine.builder import BaseEngineBuilder from anomalydetection.backend.entities import BaseMessageHandler from anomalydetection.backend.entities.output_message import AnomalyResult from anomalydetection.backend.interactor.stream_engine import StreamEngineInteractor @@ -62,7 +62,7 @@ def predict(self, value: float, **kwargs) -> AnomalyResult: math.pow(value, 0.5), bool(value % 2)) -class DummyEngineBuilder(BaseBuilder): +class 
DummyEngineBuilder(BaseEngineBuilder): def build(self) -> BaseEngine: return DummyEngine() diff --git a/src/test/unit/backend/repository/test_builder.py b/src/test/unit/backend/repository/test_builder.py index 28a1f7f..7f253df 100644 --- a/src/test/unit/backend/repository/test_builder.py +++ b/src/test/unit/backend/repository/test_builder.py @@ -18,7 +18,7 @@ import unittest -from anomalydetection.backend.repository.builder import BaseBuilder +from anomalydetection.backend.repository.builder import BaseRepositoryBuilder from anomalydetection.backend.repository.builder import SQLiteBuilder from anomalydetection.backend.repository.builder import RepositoryBuilderFactory from anomalydetection.backend.repository.sqlite import SQLiteRepository @@ -28,7 +28,7 @@ class TestBaseBuilder(unittest.TestCase): def test_build(self): with self.assertRaises(NotImplementedError) as ctx: - builder = BaseBuilder() + builder = BaseRepositoryBuilder() builder.build() self.assertEqual(str(ctx.exception), "To implement in child classes.") diff --git a/src/test/unit/common/test_config.py b/src/test/unit/common/test_config.py index a6a7051..1f92150 100644 --- a/src/test/unit/common/test_config.py +++ b/src/test/unit/common/test_config.py @@ -15,6 +15,7 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . + import os import unittest @@ -75,15 +76,14 @@ def test__get_stream_kafka(self): "source": { "type": "kafka", "params": { - "brokers": "localhost:9092", - "in": "in", - "out": "out", + "broker_servers": "localhost:9092", + "input_topic": "in", "group_id": "group_id" }, }, "aggregation": { - "function": "avg", - "window_millis": 60000 + "agg_function": "avg", + "agg_window_millis": 60000 } } ) @@ -100,10 +100,9 @@ def test__get_stream_pubsub(self): "source": { "type": "pubsub", "params": { - "project": "project-id", + "project_id": "project-id", "auth_file": "/dev/null", - "in": "in", - "out": "out", + "subscription": "in", }, } } @@ -111,6 +110,7 @@ def test__get_stream_pubsub(self): self.assertIsInstance(pubsub_stream, PubSubStreamConsumerBuilder) self.assertEqual(pubsub_stream.project_id, "project-id") self.assertEqual(pubsub_stream.subscription, "in") + self.assertEqual(pubsub_stream.auth_file, "/dev/null") def test__get_repository_sqlite(self): repository = self.config._get_repository( @@ -198,8 +198,8 @@ def test__get_sink_kafka(self): "stream": { "type": "kafka", "params": { - "brokers": "localhost:9092", - "out": "out" + "broker_servers": "localhost:9092", + "output_topic": "out" } } } @@ -215,8 +215,8 @@ def test__get_sink_pubsub(self): "stream": { "type": "pubsub", "params": { - "project": "project", - "out": "out" + "project_id": "project", + "output_topic": "out" } } } diff --git a/var/config-example.yml b/var/config-example.yml index 26feace..08ae628 100644 --- a/var/config-example.yml +++ b/var/config-example.yml @@ -7,8 +7,8 @@ streams: source: type: kafka params: - brokers: localhost:9092 - in: test1 + broker_servers: localhost:9092 + input_topic: test1 engine: type: robust params: @@ -26,8 +26,8 @@ streams: stream: type: kafka params: - brokers: localhost:9092 - out: test2 + broker_servers: localhost:9092 + output_topic: test2 warmup: - name: sqlite repository:
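
Taken together, the `Config` changes above replace the per-type `if`/`elif` wiring with a generic `builder.set(param, value)` dispatch, which is why the YAML keys now have to match the builders' `set_*` methods (`brokers` → `broker_servers`, `in` → `input_topic`/`subscription`, `function` → `agg_function`, and so on). A minimal sketch of the resulting API, using the factory names introduced in this diff and the placeholder broker, topic, and database values from the example configs:

```python
# Sketch only -- not part of the patch. Wires one stream by hand the same way
# Config._get_stream()/_get_engine()/_get_sink() now do it via the set() dispatch.
from anomalydetection.backend.engine.builder import EngineBuilderFactory
from anomalydetection.backend.entities.handlers.factory import MessageHandlerFactory
from anomalydetection.backend.interactor.stream_engine import StreamEngineInteractor
from anomalydetection.backend.repository.builder import RepositoryBuilderFactory
from anomalydetection.backend.sink.repository import RepositorySink
from anomalydetection.backend.stream.builder import StreamBuilderFactory

# source: {type: kafka, params: {broker_servers: ..., input_topic: ...}}
stream = StreamBuilderFactory.get_consumer("kafka")
stream.set("broker_servers", "localhost:9092")  # dispatched to set_broker_servers()
stream.set("input_topic", "test1")              # dispatched to set_input_topic()

# engine: {type: robust, params: {window: 30}}
engine_builder = EngineBuilderFactory.get("robust")
engine_builder.set("window", 30)                # dispatched to set_window()

# handler: json (also the fallback when the key is omitted)
handler = MessageHandlerFactory.get("json")

# sink -> repository: {type: sqlite, params: {database: ...}}
repo_builder = RepositoryBuilderFactory.get("sqlite")
repo_builder.set("database", "/tmp/anomdec.sqlite")

interactor = StreamEngineInteractor(
    stream, engine_builder, handler,
    sinks=[RepositorySink(repo_builder.build())])
interactor.run()
```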
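The loader added in `common/plugins.py` scans `$ANOMDEC_HOME/plugins` (default `~/anomdec/plugins`) for `Plugin` subclasses and republishes their classes as virtual `anomalydetection.backend.*` modules, which is what the factories' new `get_plugin(name)` fallbacks import. A minimal engine-only plugin could look like the sketch below; `myengine`, `MyEngine`, and the constant result are hypothetical placeholders (the in-tree reference is `src/test/plugins/test_plugin.py`):

```python
# ~/anomdec/plugins/my_engine.py -- hypothetical example plugin.
from anomalydetection.backend.engine import BaseEngine
from anomalydetection.backend.engine.builder import BaseEngineBuilder
from anomalydetection.backend.entities.output_message import AnomalyResult
from anomalydetection.common.plugins import Plugin


class MyEngine(BaseEngine):

    def predict(self, value: float, **kwargs) -> AnomalyResult:
        return AnomalyResult(-1, 1, 0, False)  # trivial stand-in: never anomalous


class MyEngineBuilder(BaseEngineBuilder):

    type = "myengine"

    def build(self) -> BaseEngine:
        return MyEngine()


class MyEnginePlugin(Plugin):
    name = "myengine"
    engines = [MyEngine]
    engine_builders = [MyEngineBuilder]


# Once anomalydetection.common.plugins has been imported (anomdec.py now does this at
# startup), the builder is exposed as anomalydetection.backend.engine.myengine_builder,
# so a stream can declare `engine: {type: myengine}` and
# EngineBuilderFactory.get("myengine") resolves it through get_plugin().
```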