Skip to content

Commit

Permalink
Implementing plugins.
Browse files Browse the repository at this point in the history
  • Loading branch information
Cristòfol Torrens committed Jul 10, 2018
1 parent 8e84726 commit 37a9425
Show file tree
Hide file tree
Showing 32 changed files with 619 additions and 191 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ input messages must have the following JSON format:
}
```
You can check this implementation at [json_input_message_handler.py](src/anomalydetection/backend/entities/json_input_message_handler.py#29)
You can check this implementation at [json_input_message_handler.py](src/anomalydetection/backend/entities/handlers/json.py#29)
### Run
Expand Down
4 changes: 2 additions & 2 deletions src/anomalydetection/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@

import os

BASE_PATH = os.path.dirname(__file__)
BASE_PATH = os.path.dirname(__file__) # noqa: E402

__all__ = ('BASE_PATH', 'VERSION', 'BUILD', '__version__')
__all__ = ('BASE_PATH')
1 change: 1 addition & 0 deletions src/anomalydetection/anomdec.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import sys
from time import sleep

from anomalydetection.common import plugins # noqa: F401
from anomalydetection.backend.backend import main as backend_main
from anomalydetection.common.config import Config
from anomalydetection.common.logging import LoggingMixin
Expand Down
34 changes: 17 additions & 17 deletions src/anomalydetection/anomdec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ streams:
source:
type: kafka
params:
brokers: localhost:9092
in: test1
broker_servers: localhost:9092
input_topic: test1
# aggregation:
# function: avg
# window_millis: 30000
# agg_function: avg
# agg_window_millis: 30000
handler: json
engine:
type: cad
params:
Expand All @@ -51,8 +52,8 @@ streams:
stream:
type: kafka
params:
brokers: localhost:9092
out: test2
broker_servers: localhost:9092
output_topic: test2
warmup:
- name: sqlite
repository:
Expand All @@ -64,12 +65,10 @@ streams:
source:
type: pubsub
params:
project: testing
project_id: testing
auth_file: /dev/null
in: test10
# aggregation:
# function: avg
# window_millis: 30000
subscription: test10
handler: json
engine:
type: robust
params:
Expand All @@ -87,8 +86,8 @@ streams:
stream:
type: pubsub
params:
project: testing
out: test20
project_id: testing
output_topic: test20
warmup:
- name: sqlite
repository:
Expand All @@ -100,8 +99,9 @@ streams:
source:
type: kafka
params:
brokers: localhost:9092
in: test3
broker_servers: localhost:9092
input_topic: test3
handler: json
engine:
type: ema
params:
Expand All @@ -119,8 +119,8 @@ streams:
stream:
type: kafka
params:
brokers: localhost:9092
out: test4
broker_servers: localhost:9092
output_topic: test4
warmup:
- name: sqlite
repository:
Expand Down
12 changes: 4 additions & 8 deletions src/anomalydetection/backend/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,16 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from anomalydetection.backend.sink.websocket import \
WebSocketSink
from anomalydetection.backend.interactor.stream_engine import StreamEngineInteractor
from anomalydetection.backend.sink.websocket import WebSocketSink
from anomalydetection.common.concurrency import Concurrency
from anomalydetection.common.config import Config
from anomalydetection.backend.entities.json_input_message_handler import \
InputJsonMessageHandler
from anomalydetection.backend.interactor.stream_engine import \
StreamEngineInteractor


def main(config: Config):

# Creates stream based on config env vars and a RobustDetector
def run_live_anomaly_detection(stream, engine_builder,
def run_live_anomaly_detection(stream, handler, engine_builder,
sinks, warmup, name):

# Add dashboard websocket as extra sink
Expand All @@ -44,7 +40,7 @@ def run_live_anomaly_detection(stream, engine_builder,
interactor = StreamEngineInteractor(
stream,
engine_builder,
InputJsonMessageHandler(),
handler,
sinks=sinks + extra_sink,
warm_up=warmup[0] if warmup else None)
interactor.run()
Expand Down
6 changes: 3 additions & 3 deletions src/anomalydetection/backend/devel_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,16 @@ def build_producers(self):
pass

producers.append(
StreamBuilderFactory.get_pubsub_producer()
StreamBuilderFactory.get_producer_pubsub()
.set_project_id(project)
.set_output_topic("test10").build())
producers.append(
StreamBuilderFactory.get_kafka_producer()
StreamBuilderFactory.get_producer_kafka()
.set_broker_servers("localhost:9092")
.set_output_topic("test1").build())

producers.append(
StreamBuilderFactory.get_kafka_producer()
StreamBuilderFactory.get_producer_kafka()
.set_broker_servers("localhost:9092")
.set_output_topic("test3").build())

Expand Down
56 changes: 31 additions & 25 deletions src/anomalydetection/backend/engine/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import importlib
import sys
from collections import OrderedDict

Expand All @@ -25,7 +25,7 @@
from anomalydetection.backend.engine.robust_z_engine import RobustDetector


class BaseBuilder(object):
class BaseEngineBuilder(object):
"""
BaseBuilder, implement this to create Engine Builders.
"""
Expand All @@ -51,7 +51,7 @@ def raise_exception(*args, **kwargs):
self.__class__.__name__, func_name))


class CADDetectorBuilder(BaseBuilder):
class CADDetectorBuilder(BaseEngineBuilder):

type = "cad"

Expand Down Expand Up @@ -103,7 +103,7 @@ def build(self) -> CADDetector:
return CADDetector(**vars(self).copy())


class RobustDetectorBuilder(BaseBuilder):
class RobustDetectorBuilder(BaseEngineBuilder):

type = "robust"

Expand All @@ -123,7 +123,7 @@ def build(self) -> RobustDetector:
return RobustDetector(**vars(self).copy())


class EMADetectorBuilder(BaseBuilder):
class EMADetectorBuilder(BaseEngineBuilder):

type = "ema"

Expand All @@ -145,35 +145,36 @@ def build(self) -> EMADetector:

class EngineBuilderFactory(object):

engines = OrderedDict(
[
("robust", {
"key": "robust",
"name": "RobustDetector"
}),
("cad", {
"key": "cad",
"name": "CADDetector"
}),
("ema", {
"key": "ema",
"name": "EMADetector"
}),
]
)
engines = OrderedDict()

@classmethod
def register_engine(cls, key, class_name):
cls.engines[key] = {"key": key, "name": class_name}

@staticmethod
def get_plugin(name) -> BaseEngineBuilder:
module_name = "anomalydetection.backend.engine.{}_builder".format(name)
objects = vars(importlib.import_module(module_name))["_objects"]
for obj in objects:
if issubclass(obj, BaseEngineBuilder):
return obj()
raise NotImplementedError()

@staticmethod
def get(name) -> BaseBuilder:
def get(name) -> BaseEngineBuilder:
def raise_exception():
raise NotImplementedError()
func_name = "get_{}".format(name)
func = getattr(EngineBuilderFactory, func_name, raise_exception)
try:
return func()
except NotImplementedError as ex:
raise NotImplementedError(
"Calling undefined function: {}.{}()".format(
"EngineBuilderFactory", func_name))
try:
return EngineBuilderFactory.get_plugin(name)
except NotImplementedError as ex:
raise NotImplementedError(
"Calling undefined function: {}.{}()".format(
"EngineBuilderFactory", func_name))

@staticmethod
def get_robust() -> RobustDetectorBuilder:
Expand All @@ -186,3 +187,8 @@ def get_cad() -> CADDetectorBuilder:
@staticmethod
def get_ema() -> EMADetectorBuilder:
return EMADetectorBuilder()


EngineBuilderFactory.register_engine("robust", "RobustDetector")
EngineBuilderFactory.register_engine("cad", "CADDetector")
EngineBuilderFactory.register_engine("ema", "EMADetector")
Empty file.
54 changes: 54 additions & 0 deletions src/anomalydetection/backend/entities/handlers/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# -*- coding:utf-8 -*- #
#
# Anomaly Detection Framework
# Copyright (C) 2018 Bluekiri BigData Team <[email protected]>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import importlib

from anomalydetection.backend.entities import BaseMessageHandler
from anomalydetection.backend.entities.handlers.json import InputJsonMessageHandler


class MessageHandlerFactory(object):

@staticmethod
def get_plugin(name) -> BaseMessageHandler:
module_name = "anomalydetection.backend.entities.handlers.{}".format(name)
objects = vars(importlib.import_module(module_name))["_objects"]
for obj in objects:
if issubclass(obj, BaseMessageHandler):
return obj()
raise NotImplementedError()

@staticmethod
def get(name) -> BaseMessageHandler:
def raise_exception():
raise NotImplementedError()
func_name = "get_{}".format(name)
func = getattr(MessageHandlerFactory, func_name, raise_exception)
try:
return func()
except NotImplementedError as ex:
try:
return MessageHandlerFactory.get_plugin(name)
except NotImplementedError as ex:
raise NotImplementedError(
"Calling undefined function: {}.{}()".format(
"StreamBuilderFactory", func_name))

@staticmethod
def get_json():
return InputJsonMessageHandler()
4 changes: 2 additions & 2 deletions src/anomalydetection/backend/interactor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from threading import Lock

from anomalydetection.backend.engine.builder import BaseBuilder
from anomalydetection.backend.engine.builder import BaseEngineBuilder
from anomalydetection.backend.entities import BaseMessageHandler


Expand All @@ -31,7 +31,7 @@ class BaseEngineInteractor(object):
"""

def __init__(self,
engine_builder: BaseBuilder,
engine_builder: BaseEngineBuilder,
message_handler: BaseMessageHandler) -> None:
"""
BaseEngineInteractor constructor
Expand Down
4 changes: 2 additions & 2 deletions src/anomalydetection/backend/interactor/batch_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from rx.core import Observable

from anomalydetection.backend.engine.builder import BaseBuilder
from anomalydetection.backend.engine.builder import BaseEngineBuilder
from anomalydetection.backend.entities import BaseMessageHandler
from anomalydetection.backend.entities.input_message import InputMessage
from anomalydetection.backend.entities.output_message import OutputMessage
Expand All @@ -37,7 +37,7 @@ class BatchEngineInteractor(BaseEngineInteractor, LoggingMixin):

def __init__(self,
batch: BaseObservable,
engine_builder: BaseBuilder,
engine_builder: BaseEngineBuilder,
message_handler: BaseMessageHandler) -> None:
"""
BatchEngineInteractor constructor
Expand Down
4 changes: 2 additions & 2 deletions src/anomalydetection/backend/interactor/stream_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from typing import List

from anomalydetection.backend.engine.builder import BaseBuilder
from anomalydetection.backend.engine.builder import BaseEngineBuilder
from anomalydetection.backend.entities import BaseMessageHandler
from anomalydetection.backend.entities.input_message import InputMessage
from anomalydetection.backend.entities.output_message import OutputMessage
Expand All @@ -35,7 +35,7 @@ class StreamEngineInteractor(BaseEngineInteractor, LoggingMixin):

def __init__(self,
stream: BaseConsumerBuilder,
engine_builder: BaseBuilder,
engine_builder: BaseEngineBuilder,
message_handler: BaseMessageHandler,
sinks: List[BaseSink] = list(),
warm_up: BaseObservable = None) -> None:
Expand Down
Loading

0 comments on commit 37a9425

Please sign in to comment.