Commit 8e84726

Writing docs, some fixes.

Cristòfol Torrens committed Jul 10, 2018
1 parent 47738ae commit 8e84726

Showing 18 changed files with 211 additions and 57 deletions.
1 change: 1 addition & 0 deletions build.info
@@ -0,0 +1 @@
+dev1
51 changes: 41 additions & 10 deletions doc/source/configuration.rst
@@ -1,17 +1,22 @@
.. _configuration:

Configuration
=============

-Configuration is statically defined in a YAML file. This section will explain
+Configuration is statically defined in a YAML file. The application reads
+``anomdec.yml`` from the ``$ANOMDEC_HOME`` path. The following section describes
this file structure.

streams
*******

+``Streams`` is the main section of this file. It is a named list that defines
+how a signal is processed.

+source / engine
+~~~~~~~~~~~~~~~

-Streams is the main section of this file. It's a named list that defines how is
-a signal processed. It has two required sections, the ``source`` and the ``engine``
+It has two required sections: the ``source`` and the ``engine``.
This is the minimal configuration to start processing signals, but at this point
we are not persisting the result.
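
The ``var/config-example.yml`` file that the following examples include is not
visible in this diff. As a rough sketch only, a minimal stream definition might
look like the block below; every key name (``kafka``, ``robust``,
``broker_servers``, ``input_topic``) is an illustrative assumption inferred
from the builder signatures touched by this commit, not the authoritative
schema.

.. code-block:: yaml

    streams:
      - name: my_signal              # hypothetical stream name
        source:                      # where the signal comes from
          type: kafka
          broker_servers: "localhost:9092"
          input_topic: test1
        engine:                      # how the signal is analyzed
          type: robust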

@@ -23,19 +28,32 @@ we are not persisting the result.
sink
~~~~

-To persist the result we need to add a ``sink``
-configuration section.
+To persist the result, we need to add a ``sink`` configuration section. This can
+be a list of *sinks*.

.. literalinclude:: var/config-example.yml
    :linenos:
    :language: yaml
    :lines: 1-2,5-30

+warmup
+~~~~~~
+
+The ``warmup`` section has two roles: the first is to *warm up* the engine
+before it starts making predictions; the second is to make the data accessible
+from the dashboard for visualization. We will define a ``warmup`` configuration
+section with one ``repository`` that is also used in ``sink``.

.. literalinclude:: var/config-example.yml
    :linenos:
    :language: yaml
-    :lines: 1-2,5-23
+    :lines: 1-2,5-

repository
~~~~~~~~~~

-Repository section could be found in ``sink`` and in ``warmup`` sections, it defines
-an storage backend that is supported by :any:`BaseSink` implementations,
+The ``repository`` section can be found in both the ``sink`` and ``warmup`` sections.
+It defines a storage backend that is supported by :any:`BaseSink` implementations,
:any:`RepositorySink` and :any:`ObservableRepository` respectively.
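
As a sketch of that shape only (the backend name and keys are assumptions, not
taken from the real schema), a ``repository`` shared by ``sink`` and ``warmup``
inside a stream definition could look like this:

.. code-block:: yaml

    sink:
      - type: repository
        repository:
          type: sqlite               # assumed backend name
          database: ./anomdec.db
    warmup:
      - repository:
          type: sqlite               # the same repository as the sink
          database: ./anomdec.db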


@@ -61,8 +79,11 @@ dashboard. This allows to update dashboard plots in realtime.
    :language: yaml
    :lines: 3

-Full configuration file
-***********************
+Example configuration file
+**************************

+anomdec.yml
+~~~~~~~~~~~

A full example configuration. It reflects a full message flow: reading from a
Kafka broker, processing with a robust detector that is warmed up from the
@@ -72,3 +93,13 @@ same repository that persists the output.
    :linenos:
    :language: yaml
    :lines: 1-
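
Because the full ``var/config-example.yml`` is not shown here, the following is
a hypothetical end-to-end sketch of the flow described above (Kafka source,
robust detector, one repository acting as both sink and warmup source); every
key name is an assumption:

.. code-block:: yaml

    streams:
      - name: my_signal
        source:
          type: kafka
          broker_servers: "localhost:9092"
          input_topic: test1
        engine:
          type: robust
        sink:
          - type: repository
            repository: &repo        # YAML anchor, reused below
              type: sqlite           # assumed backend name
              database: ./anomdec.db
        warmup:
          - repository: *repo        # warm up from what we persist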

+diagram
+~~~~~~~
+
+Here is a diagram that represents the full configuration file. We can see that
+the output of the engine can be *sinked* to a repository and to a streaming
+system to visualize and react to anomalies, and that it is also used to
+*warm up* the engine in case of restart or failure.

+.. image:: var/config-example.svg
59 changes: 59 additions & 0 deletions doc/source/deploy.rst
@@ -0,0 +1,59 @@
Deploy
======

.. important::
    Before continuing, please read the :ref:`configuration` section: you will
    need to create a configuration file to start the distinct components of
    the framework.

Backend
*******

Start only the backend for the given configuration. This allows you to split
the configuration into small chunks of streams that can be deployed
independently and scaled horizontally.

.. code-block:: bash

    export ANOMDEC_HOME=~/anomdec
    anomdec backend

.. code-block:: text

    - anomalydetection.anomdec.Anomdec:35 - INFO - Starting anomdec
    - anomalydetection.anomdec.Anomdec:44 - INFO - Run backend
    ...
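
For instance, in a hypothetical layout (directory names and stream keys are
assumptions, not taken from the docs), each backend instance gets its own
``ANOMDEC_HOME`` holding a chunk of the streams:

.. code-block:: yaml

    # ~/anomdec-a/anomdec.yml -- first chunk, started with:
    #   ANOMDEC_HOME=~/anomdec-a anomdec backend
    streams:
      - name: signals_a
        source: {type: kafka, broker_servers: "localhost:9092", input_topic: a}
        engine: {type: robust}

    # ~/anomdec-b/anomdec.yml holds the remaining streams and is started the
    # same way with ANOMDEC_HOME=~/anomdec-b
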
Dashboard
*********

Start only the dashboard for the given configuration. Using a single config
file, you are able to visualize all streams.

.. code-block:: bash

    export ANOMDEC_HOME=~/anomdec
    anomdec dashboard

.. code-block:: text

    - anomalydetection.anomdec.Anomdec:35 - INFO - Starting anomdec
    - anomalydetection.anomdec.Anomdec:41 - INFO - Run dashboard
    ...
Embedded
********

You can also start it embedded. This creates a subprocess inside the dashboard
instance to process the streams.

.. code-block:: bash

    export ANOMDEC_HOME=~/anomdec
    anomdec

.. code-block:: text

    - anomalydetection.anomdec.Anomdec:35 - INFO - Starting anomdec
    - anomalydetection.anomdec.Anomdec:38 - INFO - Run dashboard/backend embedded
    ...
37 changes: 10 additions & 27 deletions doc/source/getting-started.rst
@@ -15,8 +15,8 @@ The installation is quick and simple using pip.
    # Install from PyPI using pip
    pip install anomalydetection
-Dashboard/Sandbox
-*****************
+Sandbox
+*******

After installation, you can start a dashboard instance and play with
the sandbox; there is no configuration required to do this.
@@ -40,6 +40,7 @@ You will get an output similar to this one.
Ignore the Config ``WARNING`` log if you are following this *Getting started* guide.
This is because we have not configured any signal to process yet.


Devel mode
**********

@@ -65,31 +66,13 @@ You should see how messages are pushed to Kafka and PubSub backends to ``stdout``

.. code-block:: text
-2018-07-09 13:00:19,479 - anomalydetection.anomdec.Anomdec:35 - INFO - Starting anomdec
-2018-07-09 13:00:19,480 - anomalydetection.anomdec.Anomdec:53 - INFO - Creating configuration
-2018-07-09 13:00:24,496 - anomalydetection.anomdec.Anomdec:57 - INFO - Run dashboard, backend and producer
-2018-07-09 13:00:24,679 - anomalydetection.backend.devel_mode.DevelConfigWrapper:50 - ERROR - Cannot load configuration.
-[Errno 2] No such file or directory: '~/anomdec/anomdec.yml'
-2018-07-09 13:00:25,798 - kafka.consumer.subscription_state:165 - WARNING - subscription unchanged by change_subscription(['test1'])
-2018-07-09 13:00:26,909 - kafka.consumer.subscription_state:165 - WARNING - subscription unchanged by change_subscription(['test3'])
-2018-07-09 13:00:26,927 - anomalydetection.backend.interactor.stream_engine.StreamEngineInteractor:81 - INFO - Warm up completed.
-2018-07-09 13:00:26,930 - anomalydetection.backend.interactor.stream_engine.StreamEngineInteractor:81 - INFO - Warm up completed.
-2018-07-09 13:00:26,934 - anomalydetection.backend.interactor.stream_engine.StreamEngineInteractor:81 - INFO - Warm up completed.
-2018-07-09 13:00:26,938 - anomalydetection.backend.stream.kafka.KafkaStreamConsumer:68 - DEBUG - Polling messages (auto ack). START
-2018-07-09 13:00:26,941 - anomalydetection.backend.stream.kafka.KafkaStreamConsumer:68 - DEBUG - Polling messages (auto ack). START
-2018-07-09 13:00:27,439 - kafka.coordinator.assignors.range:47 - WARNING - No partition metadata for topic test3
-2018-07-09 13:00:29,891 - anomalydetection.backend.stream.kafka.KafkaStreamProducer:106 - DEBUG - Pushing message: {"application": "devel0", "ts": "2018-07-09 13:00:29.887453", "value": 1}.
-2018-07-09 13:00:29,895 - anomalydetection.backend.stream.kafka.KafkaStreamProducer:106 - DEBUG - Pushing message: {"application": "devel0", "ts": "2018-07-09 13:00:29.887075", "value": 2}.
-2018-07-09 13:00:29,895 - anomalydetection.backend.stream.kafka.KafkaStreamProducer:106 - DEBUG - Pushing message: {"application": "devel1", "ts": "2018-07-09 13:00:29.895098", "value": 1}.
-2018-07-09 13:00:29,900 - anomalydetection.backend.stream.pubsub.PubSubStreamProducer:121 - DEBUG - Pushing message: {"application": "devel0", "ts": "2018-07-09 13:00:29.886520", "value": 1}.
-2018-07-09 13:00:29,900 - anomalydetection.backend.stream.kafka.KafkaStreamProducer:106 - DEBUG - Pushing message: {"application": "devel1", "ts": "2018-07-09 13:00:29.897012", "value": 3}.
-2018-07-09 13:00:29,901 - anomalydetection.backend.stream.kafka.KafkaStreamProducer:106 - DEBUG - Pushing message: {"application": "devel2", "ts": "2018-07-09 13:00:29.896385", "value": 9}.
-2018-07-09 13:00:29,903 - anomalydetection.backend.stream.kafka.KafkaStreamProducer:106 - DEBUG - Pushing message: {"application": "devel2", "ts": "2018-07-09 13:00:29.901965", "value": 2}.
-2018-07-09 13:00:29,905 - anomalydetection.backend.stream.pubsub.PubSubStreamProducer:121 - DEBUG - Pushing message: {"application": "devel1", "ts": "2018-07-09 13:00:29.902405", "value": 2}.
-2018-07-09 13:00:29,906 - anomalydetection.backend.stream.pubsub.PubSubStreamProducer:121 - DEBUG - Pushing message: {"application": "devel2", "ts": "2018-07-09 13:00:29.905861", "value": 3}.
-2018-07-09 13:00:29,977 - anomalydetection.backend.stream.pubsub.PubSubStreamConsumer:70 - DEBUG - Message received: {"application": "devel0", "ts": "2018-07-09 13:00:29.886520", "value": 1}
-2018-07-09 13:00:29,979 - anomalydetection.backend.stream.pubsub.PubSubStreamConsumer:70 - DEBUG - Message received: {"application": "devel2", "ts": "2018-07-09 13:00:29.905861", "value": 3}
-2018-07-09 13:00:29,979 - anomalydetection.backend.stream.pubsub.PubSubStreamConsumer:70 - DEBUG - Message received: {"application": "devel1", "ts": "2018-07-09 13:00:29.902405", "value": 2}
+- anomalydetection.anomdec.Anomdec:35 - INFO - Starting anomdec
+- anomalydetection.anomdec.Anomdec:53 - INFO - Creating configuration for DEVEL MODE
+- anomalydetection.anomdec.Anomdec:57 - INFO - Run dashboard, backend and producer
+...
+- anomalydetection.backend.stream.pubsub.PubSubStreamConsumer:70 - DEBUG - Message received: {"application": "devel0", "ts": "2018-07-09 13:00:29.886520", "value": 1}
+- anomalydetection.backend.stream.pubsub.PubSubStreamConsumer:70 - DEBUG - Message received: {"application": "devel2", "ts": "2018-07-09 13:00:29.905861", "value": 3}
+- anomalydetection.backend.stream.pubsub.PubSubStreamConsumer:70 - DEBUG - Message received: {"application": "devel1", "ts": "2018-07-09 13:00:29.902405", "value": 2}
Open the `dashboard
<http://localhost:5000/signals>`_
1 change: 1 addition & 0 deletions doc/source/index.rst
@@ -62,6 +62,7 @@ Features
   license
   getting-started
   configuration
+   deploy
   api-reference

Indices and tables
16 changes: 15 additions & 1 deletion setup.py
@@ -16,10 +16,24 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
+import os

from setuptools import find_packages, setup


+def _read_from_file(_file):
+    # Each file holds one dotted version fragment (this commit adds ``dev1``
+    # to build.info); the fragments are later joined into __version__.
+    try:
+        _file = os.path.join(os.path.dirname(__file__), _file)
+        return open(_file, 'r').read().strip().split('.')
+    except Exception as _:
+        return []
+
+
+VERSION = _read_from_file('version.info')
+BUILD = _read_from_file('build.info')
+
+__version__ = '.'.join([str(v) for v in VERSION + BUILD])

install_require = [
    # Core dependencies
    "Rx==1.6.1", "jsonschema==2.6.0", "python-dateutil==2.1", "scipy==1.1.0",
@@ -46,7 +60,7 @@

setup(
    name='anomalydetection',
-    version='0.0.0.dev0',
+    version=__version__,
    description='Anomaly detection bridge',
    url='',
    zip_safe=False,
2 changes: 1 addition & 1 deletion src/anomalydetection/__init__.py
@@ -20,4 +20,4 @@

BASE_PATH = os.path.dirname(__file__)

-__all__ = ('BASE_PATH', )
+__all__ = ('BASE_PATH', 'VERSION', 'BUILD', '__version__')
1 change: 1 addition & 0 deletions src/anomalydetection/anomdec.py
@@ -35,6 +35,7 @@ def run(self):
        self.logger.info("Starting anomdec")
        try:
            if len(sys.argv) == 1:
+                self.logger.info("Run dashboard/backend embedded")
                dashboard_main([backend_main], Config())
            elif len(sys.argv) == 2:
                if sys.argv[1] == "dashboard":
11 changes: 8 additions & 3 deletions src/anomalydetection/backend/backend.py
@@ -49,12 +49,17 @@ def run_live_anomaly_detection(stream, engine_builder,
                               warm_up=warmup[0] if warmup else None)
        interactor.run()

+    # Run each detector in its own process (previously a thread) and wait
+    # for all of them to finish.
+    pids = []
    for name, item in config.get_as_dict().items():
        item_list = list(item)
        item_list.append(name)
-        Concurrency.run_thread(target=run_live_anomaly_detection,
-                               args=tuple(item_list),
-                               name="Detector {}".format(name))
+        pid = Concurrency.run_process(target=run_live_anomaly_detection,
+                                      args=tuple(item_list),
+                                      name="Detector {}".format(name))
+        pids.append(pid)
+
+    for pid in pids:
+        Concurrency.get_process(pid).join()


if __name__ == "__main__":
12 changes: 6 additions & 6 deletions src/anomalydetection/backend/interactor/stream_engine.py
@@ -26,26 +26,26 @@
from anomalydetection.backend.sink import BaseSink
from anomalydetection.backend.stream import BaseStreamAggregation
from anomalydetection.backend.stream import AggregationFunction
-from anomalydetection.backend.stream import BaseStreamConsumer
from anomalydetection.backend.stream import BaseObservable
+from anomalydetection.backend.stream.builder import BaseConsumerBuilder
from anomalydetection.common.logging import LoggingMixin


class StreamEngineInteractor(BaseEngineInteractor, LoggingMixin):

    def __init__(self,
-                 stream: BaseStreamConsumer,
+                 stream: BaseConsumerBuilder,
                 engine_builder: BaseBuilder,
                 message_handler: BaseMessageHandler,
                 sinks: List[BaseSink] = list(),
                 warm_up: BaseObservable = None) -> None:
        super().__init__(engine_builder, message_handler)
-        self.stream = stream
+        self.stream = stream.build()
        self.sinks = sinks
        self.warm_up = warm_up
-        if isinstance(stream, BaseStreamAggregation):
-            self.agg_function = stream.agg_function
-            self.agg_window_millis = stream.agg_window_millis
+        if isinstance(self.stream, BaseStreamAggregation):
+            self.agg_function = self.stream.agg_function
+            self.agg_window_millis = self.stream.agg_window_millis
        else:
            self.agg_function = AggregationFunction.NONE
            self.agg_window_millis = 0
20 changes: 18 additions & 2 deletions src/anomalydetection/backend/stream/builder.py
@@ -47,7 +47,7 @@ def __init__(self,
                 broker_servers: str = None,
                 input_topic: str = None,
                 group_id: str = str(uuid.uuid4()),
-                 agg_function: AggregationFunction = None,
+                 agg_function: AggregationFunction = AggregationFunction.NONE,
                 agg_window_millis: int = 0) -> None:
        super().__init__()
        self.broker_servers = broker_servers
@@ -85,6 +85,14 @@ def build(self) -> BaseStreamConsumer:
        del args["agg_window_millis"]
        return KafkaStreamConsumer(**args)

+    def __str__(self) -> str:
+        return "Kafka topic: " \
+               "brokers: {}, topic: {}, func: {}, window: {}ms".format(
+                   self.broker_servers,
+                   self.input_topic,
+                   self.agg_function,
+                   self.agg_window_millis)


class KafkaStreamProducerBuilder(BaseProducerBuilder):

@@ -113,7 +121,7 @@ def __init__(self,
                 project_id: str = None,
                 subscription: str = None,
                 auth_file: str = None,
-                 agg_function: AggregationFunction = None,
+                 agg_function: AggregationFunction = AggregationFunction.NONE,
                 agg_window_millis: int = 0) -> None:
        super().__init__()
        self.project_id = project_id
@@ -151,6 +159,14 @@ def build(self) -> BaseStreamConsumer:
        del args["agg_window_millis"]
        return PubSubStreamConsumer(**args)

+    def __str__(self) -> str:
+        return "PubSub subscription: " \
+               "project: {}, subscription: {}, func: {}, window: {}ms".format(
+                   self.project_id,
+                   self.subscription,
+                   self.agg_function,
+                   self.agg_window_millis)


class PubSubStreamProducerBuilder(BaseProducerBuilder):

2 changes: 1 addition & 1 deletion src/anomalydetection/common/config.py
@@ -67,7 +67,7 @@ def get_streams(self):
        streams = []
        for item in self.config["streams"]:
            builder = self._get_stream(item)
-            streams.append(builder.build() if builder else builder)
+            streams.append(builder if builder else builder)
        return streams

    def _get_stream(self, item):