From 87c757dcdec8b245b00a47b2dc4b4f9c2a8464a3 Mon Sep 17 00:00:00 2001 From: Gillian Basso Date: Tue, 4 Jul 2017 10:38:22 +0200 Subject: [PATCH] first public commit, v0.2.5.dev1 --- .gitignore | 168 +++++++++++ HOWTO | 12 + LICENSE | 202 +++++++++++++ MANIFEST.in | 6 + README.md | 97 ++++++ obnl/__init__.py | 2 + obnl/client.py | 65 ++++ obnl/impl/__init__.py | 1 + obnl/impl/loaders.py | 88 ++++++ obnl/impl/message.py | 21 ++ obnl/impl/message/__init__.py | 17 ++ obnl/impl/message/default_pb2.py | 502 +++++++++++++++++++++++++++++++ obnl/impl/node.py | 272 +++++++++++++++++ obnl/impl/server.py | 177 +++++++++++ obnl/server.py | 14 + requirements.txt | 4 + setup.cfg | 34 +++ setup.py | 27 ++ test/data/initobnl.json | 48 +++ test/data/schedule.json | 4 + test/testreceive.py | 33 ++ 21 files changed, 1794 insertions(+) create mode 100644 .gitignore create mode 100644 HOWTO create mode 100644 LICENSE create mode 100644 MANIFEST.in create mode 100644 README.md create mode 100644 obnl/__init__.py create mode 100644 obnl/client.py create mode 100644 obnl/impl/__init__.py create mode 100644 obnl/impl/loaders.py create mode 100644 obnl/impl/message.py create mode 100644 obnl/impl/message/__init__.py create mode 100644 obnl/impl/message/default_pb2.py create mode 100644 obnl/impl/node.py create mode 100644 obnl/impl/server.py create mode 100644 obnl/server.py create mode 100644 requirements.txt create mode 100644 setup.cfg create mode 100644 setup.py create mode 100644 test/data/initobnl.json create mode 100644 test/data/schedule.json create mode 100644 test/testreceive.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4b31d5d --- /dev/null +++ b/.gitignore @@ -0,0 +1,168 @@ +# Created by .ignore support plugin (hsz.mobi) +### VirtualEnv template +# Virtualenv +# http://iamzed.com/2009/05/07/a-primer-on-virtualenv/ +.Python +[Bb]in +[Ii]nclude +[Ll]ib +[Ll]ib64 +[Ll]ocal +[Ss]cripts +pyvenv.cfg +.venv +pip-selfcheck.json +### Python template +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*,cover +.hypothesis/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# dotenv +.env + +# virtualenv +.venv +venv/ +ENV/ + +# Spyder project settings +.spyderproject + +# Rope project settings +.ropeproject +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff: +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/dictionaries + +# Sensitive or high-churn files: +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.xml +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml + +# Gradle: +.idea/**/gradle.xml +.idea/**/libraries + +# Mongo Explorer plugin: +.idea/**/mongoSettings.xml + +## File-based project format: +*.iws + +## Plugin-specific files: + +# IntelliJ +/out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties +### Example user template template +### Example user template + +# IntelliJ project files +.idea +*.iml +out +gen### Ada template +# Object file +*.o + +# Ada Library Information +*.ali + +.idea/ diff --git a/HOWTO b/HOWTO new file mode 100644 index 0000000..14c754d --- /dev/null +++ b/HOWTO @@ -0,0 +1,12 @@ +OBNL is a full python project thus as long as Python is installed on your +system you can install it by moving in the root folder (the folder this README +file should be) and run: + + python setup.py install + +In some systems you need Administrator right to run this command. + +Warning: OBNL requires these packages to be used in full: + + * pika + * protobuf \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..9c0527a --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,6 @@ +include *.pyDuring the first development and tests, we realise that it was very complicated +to match the requirement of OBN. Therefore we decide to realise a co-simulator based on +OBN - an "OpenBuildNet Like" co-simulator. + +include *.json +include requirements.txt \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..0b9adfd --- /dev/null +++ b/README.md @@ -0,0 +1,97 @@ +Table of Contents +=== + 0. Foreword + 1. Synopsis + 2. Latest Version + 3. Installation + 4. Documentation + 5. Bug Reporting + 6. Contributors + 7. Contacts + 8. License + 9. Copyright + +Foreword +=== + +As OBNL uses AMQP/MQTT protocol (with pika), a server SHALL be running. If docker is +installed the following command starts a RabbitMQ server: + + docker run -d --hostname my-rabbit -p 5672:5672 --name some-rabbit rabbitmq:alpine + +Synopsis +=== +The main purpose of OBNL is simulator communication to simply realise a co-simulation. + +Latest Version +=== +You can find the latest version of OBNL on: + https://github.com/ppuertocrem/obnl + + +Installation +=== +OBNL is a full python project thus as long as Python is installed on your +system you can install it by moving in the root folder (the folder this README +file should be) and run: + + python setup.py install + +In some systems you need Administrator right to run this command. + +Warning: OBNL requires these packages to be used in full: + + * pika + * protobuf + + +Documentation +=== +Currently, the documentation is only accessible in source code. + + +Bug Reporting +=== +If you find any bugs, or if you want new features you can put your request on +github at the following address: + + https://github.com/IntegrCiTy/obnl + + +Contributors +=== + +The OBNL Team is currently composed of: + + * Pablo Puerto (pablo.puerto@crem.ch) + * Gillian Basso (gillian.basso@hevs.ch) + * Jessen Page (jessen.page@hevs.ch) + + +Contacts +=== +For questions, bug reports, patches and new elements / modules, please use the Bug Reporting. + + +License +=== +You should have received a copy of the Apache License Version 2.0 along with +this program. +If not, see . + + +Copyright +=== +Copyright 2017 The OBNL Team + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. \ No newline at end of file diff --git a/obnl/__init__.py b/obnl/__init__.py new file mode 100644 index 0000000..5c248b5 --- /dev/null +++ b/obnl/__init__.py @@ -0,0 +1,2 @@ +import pkg_resources # part of setuptools +__version__ = pkg_resources.require("obnl")[0].version \ No newline at end of file diff --git a/obnl/client.py b/obnl/client.py new file mode 100644 index 0000000..d267134 --- /dev/null +++ b/obnl/client.py @@ -0,0 +1,65 @@ +from threading import Thread +from obnl.impl.node import ClientNode as _ClientNodeImpl + + +class ClientNode(object): + + def __init__(self, host, name, input_attributes=None, output_attributes=None, is_first=False): + self._node_impl = _ClientNodeImpl(host, name, self, input_attributes, output_attributes, is_first) + + @property + def name(self): + """ + + :return: the node name. It is the ID of the Node inside the simulation + """ + return self._node_impl.name + + @property + def input_values(self): + """ + + :return: a map of input values. The keys are the input attributes + """ + return self._node_impl.input_values + + @property + def input_attributes(self): + """ + + :return: the list of input attributes + """ + return self._node_impl.input_attributes + + @property + def output_attributes(self): + """ + + :return: the list of output attributes + """ + return self._node_impl.output_attributes + + def start(self): + """ + Starts the listening + """ + Thread(target=self._node_impl.start).start() + + def step(self, current_time, time_step): + """ + Abstract function to be implemented by children. + This function is called once per Node per simulation step. + + :param current_time: the current time of the simulation + :param time_step: the time step from the last call of this function + """ + raise NotImplementedError('Abstract function call from '+str(self.__class__)) + + def update_attribute(self, attr, value): + """ + Sends the new attribute value to those who want to know. + + :param attr: the attribute to communicate + :param value: the new value of the attribute + """ + self._node_impl.update_attribute(attr, value) diff --git a/obnl/impl/__init__.py b/obnl/impl/__init__.py new file mode 100644 index 0000000..b5ddc7a --- /dev/null +++ b/obnl/impl/__init__.py @@ -0,0 +1 @@ +__version__ = 0.1 \ No newline at end of file diff --git a/obnl/impl/loaders.py b/obnl/impl/loaders.py new file mode 100644 index 0000000..9a46a01 --- /dev/null +++ b/obnl/impl/loaders.py @@ -0,0 +1,88 @@ +import json + + +class Loader(object): + """ + Base class of every Loaders + """ + + def __init__(self, scheduler): + """ + + :param host: the scheduler + """ + self._scheduler = scheduler + self._nodes = [] + self._links = [] + + def get_nodes(self): + """ + + :return: the loaded nodes or an empty list + """ + return self._nodes + + def get_links(self): + """ + + :return: the loaded links or an empty list + """ + return self._links + + +class JSONLoader(Loader): + """ + A JSON Loader that can load data which follows the structure: + { + "nodes":{ + "NodeName1":{ + "inputs": [list of inputs] + "outputs": [list of outputs] + }, + ... + } + "links":{ + "LinkName1":{ + "out":{ + "node": "NameNodeN" # MUST be is "nodes" + "attr": "AttributeName" + }, + "in":{ + "node": "NameNodeN" # MUST be is "nodes" + "attr": "AttributeName" + } + }, + ... + } + } + """ + def __init__(self, scheduler, config_file): + super(JSONLoader, self).__init__(scheduler) + + # load the data from json file + with open(config_file) as jsonfile: + config_data = json.loads(jsonfile.read()) + + # load the nodes + self._prepare_nodes(config_data['nodes']) + # then the links + self._prepare_links(config_data['links']) + + def _find_in_nodes(self, str_node): + for node in self._nodes: + if str_node == node: + return node + + def _prepare_nodes(self, nodes): + for name, data in nodes.items(): + self._nodes.append(name) + + def _prepare_links(self, links): + + for name, data in links.items(): + in_data = data["in"] + out_data = data["out"] + in_node = self._find_in_nodes(in_data['node']) + out_node = self._find_in_nodes(out_data['node']) + + self._scheduler.create_data_link(out_node, out_data['attr'], in_node, in_data['attr']) diff --git a/obnl/impl/message.py b/obnl/impl/message.py new file mode 100644 index 0000000..a9078c0 --- /dev/null +++ b/obnl/impl/message.py @@ -0,0 +1,21 @@ +# loads Protobuf messages (*_pb2.py) +import importlib +import os + +if not __name__ == '__main__': + + PROTBUF_EXT = '_pb2.py' + PACKAGE_SEPARATOR = '.' + + dir_path = os.path.dirname(os.path.realpath(__file__)) + only_protbuf_files = [f + for f in os.listdir(dir_path) + if os.path.isfile(os.path.join(dir_path, f)) and f.endswith(PROTBUF_EXT) + ] + + for opf in only_protbuf_files: + importlib.import_module(os.path.splitext(__name__)[0] + + PACKAGE_SEPARATOR + + os.path.splitext(opf)[0]) + temp = importlib.machinery.SourceFileLoader(opf, os.path.join(dir_path, opf)).load_module() + globals().update(temp.__dict__) diff --git a/obnl/impl/message/__init__.py b/obnl/impl/message/__init__.py new file mode 100644 index 0000000..0db2519 --- /dev/null +++ b/obnl/impl/message/__init__.py @@ -0,0 +1,17 @@ +# loads Protobuf messages (*_pb2.py) +import importlib +import os + +PROTBUF_EXT = '_pb2.py' +PACKAGE_SEPARATOR = '.' + +dir_path = os.path.dirname(os.path.realpath(__file__)) +only_protbuf_files = [f + for f in os.listdir(dir_path) + if os.path.isfile(os.path.join(dir_path, f)) and f.endswith(PROTBUF_EXT) + ] + +for opf in only_protbuf_files: + importlib.import_module(__name__ + PACKAGE_SEPARATOR + os.path.splitext(opf)[0]) + temp = importlib.machinery.SourceFileLoader(opf, os.path.join(dir_path, opf)).load_module() + globals().update(temp.__dict__) \ No newline at end of file diff --git a/obnl/impl/message/default_pb2.py b/obnl/impl/message/default_pb2.py new file mode 100644 index 0000000..e76fb85 --- /dev/null +++ b/obnl/impl/message/default_pb2.py @@ -0,0 +1,502 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: data/default.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2 + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='data/default.proto', + package='obnl.common', + syntax='proto3', + serialized_pb=_b('\n\x12\x64\x61ta/default.proto\x12\x0bobnl.common\x1a\x19google/protobuf/any.proto\"\x0c\n\nSystemInit\"\x8e\x01\n\x13SimulatorConnection\x12=\n\x04type\x18\x01 \x01(\x0e\x32/.obnl.common.SimulatorConnection.SimulationType\"8\n\x0eSimulationType\x12\t\n\x05OTHER\x10\x00\x12\x0b\n\x07THERMAL\x10\x01\x12\x0e\n\nELECTRICAL\x10\x02\"\x9e\x02\n\x13SchedulerConnection\x12K\n\x0einitial_values\x18\x01 \x03(\x0b\x32\x33.obnl.common.SchedulerConnection.InitialValuesEntry\x12M\n\x0f\x61ttribute_links\x18\x02 \x03(\x0b\x32\x34.obnl.common.SchedulerConnection.AttributeLinksEntry\x1a\x34\n\x12InitialValuesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x02:\x02\x38\x01\x1a\x35\n\x13\x41ttributeLinksEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\\\n\x10\x41ttributeMessage\x12\x17\n\x0fsimulation_time\x18\x01 \x01(\x02\x12\x16\n\x0e\x61ttribute_name\x18\x02 \x01(\t\x12\x17\n\x0f\x61ttribute_value\x18\x03 \x01(\x02\"3\n\x08NextStep\x12\x11\n\ttime_step\x18\x01 \x01(\x02\x12\x14\n\x0c\x63urrent_time\x18\x02 \x01(\x02\"\xd5\x01\n\x0bMetaMessage\x12\x11\n\tnode_name\x18\x01 \x01(\t\x12\x32\n\x04type\x18\x02 \x01(\x0e\x32$.obnl.common.MetaMessage.MessageType\x12%\n\x07\x64\x65tails\x18\x04 \x01(\x0b\x32\x14.google.protobuf.Any\"X\n\x0bMessageType\x12\x08\n\x04STEP\x10\x00\x12\x08\n\x04INIT\x10\x01\x12\x0c\n\x08UPDATE_X\x10\x02\x12\x0c\n\x08UPDATE_Y\x10\x03\x12\r\n\tATTRIBUTE\x10\x04\x12\n\n\x06\x41NSWER\x10\x05\"\x06\n\x04Quitb\x06proto3') + , + dependencies=[google_dot_protobuf_dot_any__pb2.DESCRIPTOR,]) +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + + + +_SIMULATORCONNECTION_SIMULATIONTYPE = _descriptor.EnumDescriptor( + name='SimulationType', + full_name='obnl.common.SimulatorConnection.SimulationType', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='OTHER', index=0, number=0, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='THERMAL', index=1, number=1, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='ELECTRICAL', index=2, number=2, + options=None, + type=None), + ], + containing_type=None, + options=None, + serialized_start=163, + serialized_end=219, +) +_sym_db.RegisterEnumDescriptor(_SIMULATORCONNECTION_SIMULATIONTYPE) + +_METAMESSAGE_MESSAGETYPE = _descriptor.EnumDescriptor( + name='MessageType', + full_name='obnl.common.MetaMessage.MessageType', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='STEP', index=0, number=0, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='INIT', index=1, number=1, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='UPDATE_X', index=2, number=2, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='UPDATE_Y', index=3, number=3, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='ATTRIBUTE', index=4, number=4, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='ANSWER', index=5, number=5, + options=None, + type=None), + ], + containing_type=None, + options=None, + serialized_start=783, + serialized_end=871, +) +_sym_db.RegisterEnumDescriptor(_METAMESSAGE_MESSAGETYPE) + + +_SYSTEMINIT = _descriptor.Descriptor( + name='SystemInit', + full_name='obnl.common.SystemInit', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=62, + serialized_end=74, +) + + +_SIMULATORCONNECTION = _descriptor.Descriptor( + name='SimulatorConnection', + full_name='obnl.common.SimulatorConnection', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='type', full_name='obnl.common.SimulatorConnection.type', index=0, + number=1, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + _SIMULATORCONNECTION_SIMULATIONTYPE, + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=77, + serialized_end=219, +) + + +_SCHEDULERCONNECTION_INITIALVALUESENTRY = _descriptor.Descriptor( + name='InitialValuesEntry', + full_name='obnl.common.SchedulerConnection.InitialValuesEntry', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='key', full_name='obnl.common.SchedulerConnection.InitialValuesEntry.key', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='value', full_name='obnl.common.SchedulerConnection.InitialValuesEntry.value', index=1, + number=2, type=2, cpp_type=6, label=1, + has_default_value=False, default_value=float(0), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')), + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=401, + serialized_end=453, +) + +_SCHEDULERCONNECTION_ATTRIBUTELINKSENTRY = _descriptor.Descriptor( + name='AttributeLinksEntry', + full_name='obnl.common.SchedulerConnection.AttributeLinksEntry', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='key', full_name='obnl.common.SchedulerConnection.AttributeLinksEntry.key', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='value', full_name='obnl.common.SchedulerConnection.AttributeLinksEntry.value', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')), + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=455, + serialized_end=508, +) + +_SCHEDULERCONNECTION = _descriptor.Descriptor( + name='SchedulerConnection', + full_name='obnl.common.SchedulerConnection', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='initial_values', full_name='obnl.common.SchedulerConnection.initial_values', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='attribute_links', full_name='obnl.common.SchedulerConnection.attribute_links', index=1, + number=2, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[_SCHEDULERCONNECTION_INITIALVALUESENTRY, _SCHEDULERCONNECTION_ATTRIBUTELINKSENTRY, ], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=222, + serialized_end=508, +) + + +_ATTRIBUTEMESSAGE = _descriptor.Descriptor( + name='AttributeMessage', + full_name='obnl.common.AttributeMessage', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='simulation_time', full_name='obnl.common.AttributeMessage.simulation_time', index=0, + number=1, type=2, cpp_type=6, label=1, + has_default_value=False, default_value=float(0), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='attribute_name', full_name='obnl.common.AttributeMessage.attribute_name', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='attribute_value', full_name='obnl.common.AttributeMessage.attribute_value', index=2, + number=3, type=2, cpp_type=6, label=1, + has_default_value=False, default_value=float(0), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=510, + serialized_end=602, +) + + +_NEXTSTEP = _descriptor.Descriptor( + name='NextStep', + full_name='obnl.common.NextStep', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='time_step', full_name='obnl.common.NextStep.time_step', index=0, + number=1, type=2, cpp_type=6, label=1, + has_default_value=False, default_value=float(0), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='current_time', full_name='obnl.common.NextStep.current_time', index=1, + number=2, type=2, cpp_type=6, label=1, + has_default_value=False, default_value=float(0), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=604, + serialized_end=655, +) + + +_METAMESSAGE = _descriptor.Descriptor( + name='MetaMessage', + full_name='obnl.common.MetaMessage', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='node_name', full_name='obnl.common.MetaMessage.node_name', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='type', full_name='obnl.common.MetaMessage.type', index=1, + number=2, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='details', full_name='obnl.common.MetaMessage.details', index=2, + number=4, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + _METAMESSAGE_MESSAGETYPE, + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=658, + serialized_end=871, +) + + +_QUIT = _descriptor.Descriptor( + name='Quit', + full_name='obnl.common.Quit', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=873, + serialized_end=879, +) + +_SIMULATORCONNECTION.fields_by_name['type'].enum_type = _SIMULATORCONNECTION_SIMULATIONTYPE +_SIMULATORCONNECTION_SIMULATIONTYPE.containing_type = _SIMULATORCONNECTION +_SCHEDULERCONNECTION_INITIALVALUESENTRY.containing_type = _SCHEDULERCONNECTION +_SCHEDULERCONNECTION_ATTRIBUTELINKSENTRY.containing_type = _SCHEDULERCONNECTION +_SCHEDULERCONNECTION.fields_by_name['initial_values'].message_type = _SCHEDULERCONNECTION_INITIALVALUESENTRY +_SCHEDULERCONNECTION.fields_by_name['attribute_links'].message_type = _SCHEDULERCONNECTION_ATTRIBUTELINKSENTRY +_METAMESSAGE.fields_by_name['type'].enum_type = _METAMESSAGE_MESSAGETYPE +_METAMESSAGE.fields_by_name['details'].message_type = google_dot_protobuf_dot_any__pb2._ANY +_METAMESSAGE_MESSAGETYPE.containing_type = _METAMESSAGE +DESCRIPTOR.message_types_by_name['SystemInit'] = _SYSTEMINIT +DESCRIPTOR.message_types_by_name['SimulatorConnection'] = _SIMULATORCONNECTION +DESCRIPTOR.message_types_by_name['SchedulerConnection'] = _SCHEDULERCONNECTION +DESCRIPTOR.message_types_by_name['AttributeMessage'] = _ATTRIBUTEMESSAGE +DESCRIPTOR.message_types_by_name['NextStep'] = _NEXTSTEP +DESCRIPTOR.message_types_by_name['MetaMessage'] = _METAMESSAGE +DESCRIPTOR.message_types_by_name['Quit'] = _QUIT + +SystemInit = _reflection.GeneratedProtocolMessageType('SystemInit', (_message.Message,), dict( + DESCRIPTOR = _SYSTEMINIT, + __module__ = 'data.default_pb2' + # @@protoc_insertion_point(class_scope:obnl.common.SystemInit) + )) +_sym_db.RegisterMessage(SystemInit) + +SimulatorConnection = _reflection.GeneratedProtocolMessageType('SimulatorConnection', (_message.Message,), dict( + DESCRIPTOR = _SIMULATORCONNECTION, + __module__ = 'data.default_pb2' + # @@protoc_insertion_point(class_scope:obnl.common.SimulatorConnection) + )) +_sym_db.RegisterMessage(SimulatorConnection) + +SchedulerConnection = _reflection.GeneratedProtocolMessageType('SchedulerConnection', (_message.Message,), dict( + + InitialValuesEntry = _reflection.GeneratedProtocolMessageType('InitialValuesEntry', (_message.Message,), dict( + DESCRIPTOR = _SCHEDULERCONNECTION_INITIALVALUESENTRY, + __module__ = 'data.default_pb2' + # @@protoc_insertion_point(class_scope:obnl.common.SchedulerConnection.InitialValuesEntry) + )) + , + + AttributeLinksEntry = _reflection.GeneratedProtocolMessageType('AttributeLinksEntry', (_message.Message,), dict( + DESCRIPTOR = _SCHEDULERCONNECTION_ATTRIBUTELINKSENTRY, + __module__ = 'data.default_pb2' + # @@protoc_insertion_point(class_scope:obnl.common.SchedulerConnection.AttributeLinksEntry) + )) + , + DESCRIPTOR = _SCHEDULERCONNECTION, + __module__ = 'data.default_pb2' + # @@protoc_insertion_point(class_scope:obnl.common.SchedulerConnection) + )) +_sym_db.RegisterMessage(SchedulerConnection) +_sym_db.RegisterMessage(SchedulerConnection.InitialValuesEntry) +_sym_db.RegisterMessage(SchedulerConnection.AttributeLinksEntry) + +AttributeMessage = _reflection.GeneratedProtocolMessageType('AttributeMessage', (_message.Message,), dict( + DESCRIPTOR = _ATTRIBUTEMESSAGE, + __module__ = 'data.default_pb2' + # @@protoc_insertion_point(class_scope:obnl.common.AttributeMessage) + )) +_sym_db.RegisterMessage(AttributeMessage) + +NextStep = _reflection.GeneratedProtocolMessageType('NextStep', (_message.Message,), dict( + DESCRIPTOR = _NEXTSTEP, + __module__ = 'data.default_pb2' + # @@protoc_insertion_point(class_scope:obnl.common.NextStep) + )) +_sym_db.RegisterMessage(NextStep) + +MetaMessage = _reflection.GeneratedProtocolMessageType('MetaMessage', (_message.Message,), dict( + DESCRIPTOR = _METAMESSAGE, + __module__ = 'data.default_pb2' + # @@protoc_insertion_point(class_scope:obnl.common.MetaMessage) + )) +_sym_db.RegisterMessage(MetaMessage) + +Quit = _reflection.GeneratedProtocolMessageType('Quit', (_message.Message,), dict( + DESCRIPTOR = _QUIT, + __module__ = 'data.default_pb2' + # @@protoc_insertion_point(class_scope:obnl.common.Quit) + )) +_sym_db.RegisterMessage(Quit) + + +_SCHEDULERCONNECTION_INITIALVALUESENTRY.has_options = True +_SCHEDULERCONNECTION_INITIALVALUESENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')) +_SCHEDULERCONNECTION_ATTRIBUTELINKSENTRY.has_options = True +_SCHEDULERCONNECTION_ATTRIBUTELINKSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')) +# @@protoc_insertion_point(module_scope) diff --git a/obnl/impl/node.py b/obnl/impl/node.py new file mode 100644 index 0000000..cfd3a9c --- /dev/null +++ b/obnl/impl/node.py @@ -0,0 +1,272 @@ +import sys +import pika + +from obnl.impl.message import MetaMessage, AttributeMessage, SimulatorConnection, NextStep, SchedulerConnection, Quit + + +class Node(object): + """ + This is the base class for all Nodes of the system + """ + + SCHEDULER_NAME = 'scheduler' + + LOCAL_NODE_QUEUE = 'obnl.local.node.' + """Base of every local queue (followed by the name of the Node)""" + LOCAL_NODE_EXCHANGE = 'obnl.local.node.' + """Base of every local exchange (followed by the name of the Node)""" + + SIMULATION_NODE_QUEUE = 'obnl.simulation.node.' + """Base of every update queue (followed by the name of the Node)""" + SIMULATION_NODE_EXCHANGE = 'obnl.simulation.node.' + """Base of every update exchange (followed by the name of the Node)""" + + DATA_NODE_QUEUE = 'obnl.data.node.' + """Base of every data queue (followed by the name of the Node)""" + DATA_NODE_EXCHANGE = 'obnl.data.node.' + """Base of every data/attr exchange (followed by the name of the Node)""" + + UPDATE_ROUTING = 'obnl.update.block.' + """Base of every routing key for block messages (followed by the number/position of the block)""" + + def __init__(self, host, name): + """ + The constructor creates the 3 main queues + - general: To receive data with everyone + - update: To receive data for the time management + - data: To receive attribute update + + :param host: the connection to AMQP + :param name: the id of the Node + """ + connection = pika.BlockingConnection(pika.ConnectionParameters(host=host)) + self._channel = connection.channel() + self._name = name + + self._simulation_queue = self._channel.queue_declare(queue=Node.SIMULATION_NODE_QUEUE + self._name) + self._simulation_exchange = self._channel.exchange_declare(exchange=Node.SIMULATION_NODE_EXCHANGE + self._name) + + self._channel.basic_consume(self.on_simulation_message, + consumer_tag='obnl_node_' + self._name + '_simulation', + queue=self._simulation_queue.method.queue, + no_ack=True) + + @property + def name(self): + """ + + :return: the name of the Node + """ + return self._name + + def start(self): + """ + Starts listening. + """ + self._channel.start_consuming() + + def on_local_message(self, ch, method, props, body): + """ + Callback when a message come from this node. + """ + raise NotImplementedError('Abstract function call from '+str(self.__class__)) + + def on_simulation_message(self, ch, method, props, body): + """ + Callback when a message come from another Node to inform about simulation. + """ + raise NotImplementedError('Abstract function call from '+str(self.__class__)) + + def on_data_message(self, ch, method, props, body): + """ + Callback when a message come from another Node to inform about data update. + """ + raise NotImplementedError('Abstract function call from '+str(self.__class__)) + + def send(self, exchange, routing, message, reply_to=None): + """ + + :param exchange: the MQTT exchange + :param routing: the MQTT routing key + :param message: the protobuf message + :param reply_to: the routing key to reply to + """ + + mm = MetaMessage() + mm.node_name = self._name + mm.details.Pack(message) + + self._channel.publish(exchange=exchange, + routing_key=routing, + properties=pika.BasicProperties(reply_to=reply_to), + body=mm.SerializeToString()) + + def reply_to(self, reply_to, message): + """ + Replies to a message. + + :param reply_to: the asker + :param message: the message (str) + """ + if reply_to: + m = MetaMessage() + m.node_name = self._name + m.type = MetaMessage.ANSWER + + m.details.Pack(message) + + self._channel.publish(exchange='', routing_key=reply_to, body=m.SerializeToString()) + + def send_simulation(self, routing, message, reply_to=None): + """ + + :param routing: the MQTT routing key + :param message: the protobuf message + :param reply_to: the routing key to reply to + """ + self.send(Node.SIMULATION_NODE_EXCHANGE + self._name, + routing, message, reply_to=reply_to) + + +class ClientNode(Node): + + def __init__(self, host, name, api, input_attributes=None, output_attributes=None, is_first=False): + super(ClientNode, self).__init__(host, name) + + # Local communication + self._local_queue = self._channel.queue_declare(queue=Node.LOCAL_NODE_QUEUE + self._name) + self._local_exchange = self._channel.exchange_declare(exchange=Node.LOCAL_NODE_EXCHANGE + self._name) + + self._channel.basic_consume(self.on_local_message, + consumer_tag='obnl_node_' + self._name + '_local', + queue=self._local_queue.method.queue, + no_ack=True) + self._channel.queue_bind(exchange=Node.LOCAL_NODE_EXCHANGE + self._name, + queue=Node.LOCAL_NODE_QUEUE + self._name) + + # Data communication + self._data_queue = self._channel.queue_declare(queue=Node.DATA_NODE_QUEUE + self._name) + + self._channel.basic_consume(self.on_data_message, + consumer_tag='obnl_node_' + self._name + '_data', + queue=self._data_queue.method.queue, + no_ack=True) + + self._api_node = api + + self._next_step = False + self._reply_to = None + self._is_first = is_first + self._current_time = 0 + self._time_step = 0 + + self._links = {} + self._input_values = {} + self._input_attributes = input_attributes + self._output_attributes = output_attributes + + si = SimulatorConnection() + si.type = SimulatorConnection.OTHER + + self.send_simulation(Node.SIMULATION_NODE_EXCHANGE + Node.SCHEDULER_NAME, + si, reply_to=Node.SIMULATION_NODE_QUEUE + self.name) + + @property + def input_values(self): + return self._input_values + + @property + def input_attributes(self): + return self._input_attributes + + @property + def output_attributes(self): + return self._output_attributes + + def step(self, current_time, time_step): + self._api_node.step(current_time, time_step) + + def update_attribute(self, attr, value): + """ + Sends the new attribute value to those who want to know. + + :param attr: the attribute to communicate + :param value: the new value of the attribute + """ + am = AttributeMessage() + am.simulation_time = self._current_time + am.attribute_name = attr + am.attribute_value = float(value) + + m = MetaMessage() + m.node_name = self._name + m.type = MetaMessage.ATTRIBUTE + m.details.Pack(am) + + if self._output_attributes: + self._channel.publish(exchange=Node.DATA_NODE_EXCHANGE + self._name, + routing_key=Node.DATA_NODE_EXCHANGE + attr, + body=m.SerializeToString()) + + def on_local_message(self, ch, method, props, body): + if self._next_step \ + and (self._is_first + or not self._input_attributes + or len(self._input_values.keys()) == len(self._input_attributes)): + # TODO: call updateX or updateY depending on the meta content + self.step(self._current_time, self._time_step) + self._next_step = False + self._input_values.clear() + nm = NextStep() + nm.current_time = self._current_time + nm.time_step = self._time_step + self.reply_to(self._reply_to, nm) + + def on_simulation_message(self, ch, method, props, body): + mm = MetaMessage() + mm.ParseFromString(body) + + if mm.details.Is(NextStep.DESCRIPTOR) and mm.node_name == Node.SCHEDULER_NAME: + nm = NextStep() + mm.details.Unpack(nm) + self._next_step = True + self._reply_to = props.reply_to + self._current_time = nm.current_time + self._time_step = nm.time_step + self.send_local(mm.details) + elif mm.details.Is(SchedulerConnection.DESCRIPTOR): + sc = SchedulerConnection() + mm.details.Unpack(sc) + self._links = dict(sc.attribute_links) + elif mm.details.Is(Quit.DESCRIPTOR): + sys.exit(0) + + def on_data_message(self, ch, method, props, body): + mm = MetaMessage() + mm.ParseFromString(body) + + if mm.details.Is(AttributeMessage.DESCRIPTOR): + am = AttributeMessage() + mm.details.Unpack(am) + self._input_values[self._links[am.attribute_name]] = am.attribute_value + self.send_local(mm.details) + + def send_local(self, message): + """ + Sends the content to local. + + :param message: a protobuf message + """ + self.send(Node.LOCAL_NODE_EXCHANGE + self._name, + Node.LOCAL_NODE_EXCHANGE + self._name, + message) + + def send_scheduler(self, message): + """ + Sends the content to scheduler. + + :param message: a protobuf message + """ + self.send(Node.SIMULATION_NODE_EXCHANGE + self._name, + Node.SIMULATION_NODE_EXCHANGE + Node.SCHEDULER_NAME, + message) diff --git a/obnl/impl/server.py b/obnl/impl/server.py new file mode 100644 index 0000000..cc38500 --- /dev/null +++ b/obnl/impl/server.py @@ -0,0 +1,177 @@ +import sys +import json + +from obnl.impl.node import Node +from obnl.impl.loaders import JSONLoader +from obnl.impl.message import SimulatorConnection, NextStep, MetaMessage, SchedulerConnection, Quit + + +class Scheduler(Node): + """ + The Scheduler is a Node that manage the time flow. + """ + + def __init__(self, host, config_file, schedule_file): + """ + + :param host: the AMQP host + :param config_file: a file containing time steps + :param schedule_file: a file containing schedule blocks + """ + super(Scheduler, self).__init__(host, Node.SCHEDULER_NAME) + self._current_step = 0 + self._current_block = 0 + + self._connected = set() + self._sent = set() + self._links = {} + + self._channel.exchange_declare(exchange=Node.SIMULATION_NODE_EXCHANGE + self._name) + + self._steps, self._blocks = self._load_data(config_file, schedule_file) + + self._current_time = 0 + + def _load_data(self, config_file, schedule_file): + """ + :param config_file: the file containing the structure + :param schedule_file: the file containing the schedule + """ + + # Currently only JSON can be loaded + with open(schedule_file) as jsonfile: + schedule_data = json.loads(jsonfile.read()) + steps = schedule_data['steps'] + blocks = schedule_data['schedule'] + + # Currently only JSON can be loaded + # Load all the Nodes and creates the associated links + loader = JSONLoader(self, config_file) + # Connects the created Nodes to the update exchanger + # using the schedule definition (blocks) + # TODO: Should it be in Creator or Scheduler ??? + for node in loader.get_nodes(): + i = 0 + for block in blocks: + if node in block: + self.create_simulation_links(node, i) + i += 1 + return steps, blocks + + def start(self): + """ + Starts listening. + """ + self._current_step = 0 + self._current_block = 0 + super(Scheduler, self).start() + + def create_data_link(self, node_out, attr_out, node_in, attr_in): + """ + Creates and connects the attribute communication from Node to Node. + + :param node_out: the Node sender name + :param attr_out: the name of the attribute the Node want to communicate + :param node_in: the Node receiver name + :param attr_in: the name of the attribute from the Node receiver point of view + """ + self._channel.exchange_declare(exchange=Node.DATA_NODE_EXCHANGE + node_out) + self._channel.queue_declare(queue=Node.DATA_NODE_QUEUE + node_in) + + self._channel.queue_bind(exchange=Node.DATA_NODE_EXCHANGE + node_out, + routing_key=Node.DATA_NODE_EXCHANGE + attr_out, + queue=Node.DATA_NODE_QUEUE + node_in) + if node_in not in self._links: + self._links[node_in] = {} + self._links[node_in][attr_out] = attr_in + + def create_simulation_links(self, node, position): + """ + Connects the scheduler exchange to the update queue of the Node + + :param node: the node to be connected to + :param position: the position of the containing block + """ + self._channel.exchange_declare(exchange=Node.SIMULATION_NODE_EXCHANGE + self._name) + self._channel.queue_declare(queue=Node.SIMULATION_NODE_QUEUE + node) + self._channel.queue_bind(exchange=Node.SIMULATION_NODE_EXCHANGE + self._name, + routing_key=Node.UPDATE_ROUTING + str(position), + queue=Node.SIMULATION_NODE_QUEUE + node) + + self._channel.exchange_declare(exchange=Node.SIMULATION_NODE_EXCHANGE + node) + self._channel.queue_declare(queue=Node.SIMULATION_NODE_QUEUE + self._name) + self._channel.queue_bind(exchange=Node.SIMULATION_NODE_EXCHANGE + node, + routing_key=Node.SIMULATION_NODE_EXCHANGE + self._name, + queue=Node.SIMULATION_NODE_QUEUE + self._name) + + def _update_time(self): + """ + Sends new time message to the current block. + """ + ns = NextStep() + ns.time_step = self._steps[self._current_step] + ns.current_time = self._current_time + + self.send_simulation(Node.UPDATE_ROUTING + str(self._current_block), + ns, reply_to=Node.SIMULATION_NODE_QUEUE + self.name) + + def on_local_message(self, ch, method, props, body): + """ + Callback when a message come from this node. Never append with Scheduler + """ + pass + + def on_simulation_message(self, ch, method, props, body): + """ + Callback when a message come from Node. + """ + m = MetaMessage() + m.ParseFromString(body) + + if m.details.Is(SimulatorConnection.DESCRIPTOR): + self._simulator_connection(m, props.reply_to) + if len(self._connected) == sum([len(b) for b in self._blocks]): + self._current_time += self._steps[self._current_step] + self._update_time() + + if m.details.Is(NextStep.DESCRIPTOR): + if m.node_name in self._blocks[self._current_block]: + self._sent.add(m.node_name) + + if len(self._connected) == sum([len(b) for b in self._blocks]): + # block management + if len(self._sent) == len(self._blocks[self._current_block]): + self._current_block = (self._current_block + 1) % len(self._blocks) + if self._current_block == 0: + self._current_step += 1 + if self._current_step >= len(self._steps): + self.broadcast_simulation(Quit()) + sys.exit(0) + else: + self._current_time += self._steps[self._current_step] + self._update_time() + self._sent.clear() + + def _simulator_connection(self, message, reply_to): + node_name = message.node_name + self._connected.add(node_name) + + sc = SchedulerConnection() + if node_name in self._links: + for k, v in self._links[node_name].items(): + sc.attribute_links[k] = v + + self.reply_to(reply_to, sc) + + def on_data_message(self, ch, method, props, body): + """ + Displays message receive from the data queue. + """ + pass + + def broadcast_simulation(self, message, reply_to=None): + + for block_id in range(len(self._blocks)): + self.send_simulation(Node.UPDATE_ROUTING + str(block_id), + message, reply_to=reply_to) + diff --git a/obnl/server.py b/obnl/server.py new file mode 100644 index 0000000..8c0a626 --- /dev/null +++ b/obnl/server.py @@ -0,0 +1,14 @@ +import argparse +from obnl.impl.server import Scheduler + +if __name__ == "__main__": + + parser = argparse.ArgumentParser() + parser.add_argument("host") + parser.add_argument("config_file") + parser.add_argument("schedule_file") + + args = parser.parse_args() + + c = Scheduler(args.host, args.config_file, args.schedule_file) + c.start() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..82531d9 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +--index-url https://pypi.python.org/simple/ + +pika>=0.10 +protobuf>=3.0 diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..1dd69d7 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,34 @@ +[metadata] +name=obnl + +copyright=2017, %(maintainer)s + +major=0 +minor=2 +micro=5 +version=%(major)s.%(minor)s.%(micro)s.dev1 + +maintainer=The OBNL Team +maintainer_email=gillian.basso@hevs.ch + +url=https://github.com/ppuertocrem/obnl + +summary=An open tool for co-simulation + +description_file=README.md + +licence=Apache License 2.0 + +keywords=co-simulation,AMQP,MQTT + +required=pika,protobuf + +classifiers=Development Status :: 4 - Beta + Environment :: Console + Intended Audience :: Science/Research + Intended Audience :: Developers + License :: OSI Approved :: Apache License 2.0 + Natural Language :: English + Operating System :: OS Independent + Programming Language :: Python :: 3.5 + Topic :: Scientific/Engineering :: Energy Simulation diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..197f5e1 --- /dev/null +++ b/setup.py @@ -0,0 +1,27 @@ +from setuptools import setup, find_packages +from setuptools.config import read_configuration +import platform + +conf_dict = read_configuration('./setup.cfg') + +setup(name=conf_dict['name'], + maintainer=conf_dict['maintainer'], + maintainer_email=conf_dict['maintainer_email'], + url=conf_dict['url'], + version=conf_dict['release'], + platforms=[platform.platform()], # TODO indicate really tested platforms + + packages=find_packages(), + install_requires=conf_dict['required'], + + # metadata + + description=conf_dict['summary'], + long_description=conf_dict['description_file'], + + license=conf_dict['licence'], + + keywords=conf_dict['keywords'], + + classifiers=conf_dict['classifiers'], + ) diff --git a/test/data/initobnl.json b/test/data/initobnl.json new file mode 100644 index 0000000..289f3f2 --- /dev/null +++ b/test/data/initobnl.json @@ -0,0 +1,48 @@ +{ + "nodes": { + "A": { + "inputs": ["seta"], + "outputs": ["ta"] + }, + "B": { + "inputs": [], + "outputs": ["tb"] + }, + "C": { + "inputs": ["t1", "t2"], + "outputs": ["setc"] + } + }, + "links": { + "l1": { + "out": { + "node":"A", + "attr": "ta" + }, + "in": { + "node":"C", + "attr": "t1" + } + }, + "l2": { + "out": { + "node": "B", + "attr": "tb" + }, + "in": { + "node": "C", + "attr": "t2" + } + }, + "l3": { + "out": { + "node":"C", + "attr": "setc" + }, + "in": { + "node":"A", + "attr": "seta" + } + } + } +} \ No newline at end of file diff --git a/test/data/schedule.json b/test/data/schedule.json new file mode 100644 index 0000000..4ba5319 --- /dev/null +++ b/test/data/schedule.json @@ -0,0 +1,4 @@ +{ + "schedule": [ ["A", "B"], ["C"] ], + "steps": [1, 2, 5, 10, 5] +} \ No newline at end of file diff --git a/test/testreceive.py b/test/testreceive.py new file mode 100644 index 0000000..7c76988 --- /dev/null +++ b/test/testreceive.py @@ -0,0 +1,33 @@ +import random +from obnl.client import ClientNode + + +class ClientTestNode(ClientNode): + + def __init__(self, host, name, input_attributes=None, output_attributes=None, is_first=False): + super(ClientTestNode, self).__init__(host, name, input_attributes, output_attributes, is_first) + + def step(self, current_time, time_step): + print('----- '+self.name+' -----') + print(self.name, time_step) + print(self.name, current_time) + print(self.name, self.input_values) + + for o in self.output_attributes: + rv = random.random() + print(self.name, o, ':', rv) + self.update_attribute(o, rv) + print('=============') + +if __name__ == "__main__": + + a = ClientTestNode('localhost', 'A', output_attributes=['ta'], input_attributes=['seta'], is_first=True) + b = ClientTestNode('localhost', 'B', output_attributes=['tb']) + c = ClientTestNode('localhost', 'C', input_attributes=['t1', 't2'], output_attributes=['setc']) + + print('Start A') + a.start() + print('Start B') + b.start() + print('Start C') + c.start()