-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathBopTestProxy.py
291 lines (234 loc) · 11.1 KB
/
BopTestProxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
# -*- coding: utf-8 -*-
"""
BOPTEST Proxy Server
This sample application uses BOPTEST framework to create a virtual building
and make it available over BACnet. To run it, follow the install instructions
from https://github.com/ibpsa/project1-boptest and run it with testcase1,
ala
TESTCASE=multizone_office_simple_air docker compose up
Then, in another terminal, run python BopTestProxy.py simple.ttl
Finally, on another machine, use the BACnet client of your choice to
observe the state of the simulation through BACnet and optionally
overwrite some control point values to influence the simulation
This code is based on the OpenWeatherServer.py sample and remains almost
identical to that sample.
"""
import os
import requests
import time
import json
import rdflib
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run, deferred
from bacpypes.task import recurring_function
from bacpypes.basetypes import DateTime
from bacpypes.primitivedata import Real
from bacpypes.object import AnalogValueObject, DateTimeValueObject
from bacpypes.app import BIPSimpleApplication
from bacpypes.service.device import DeviceCommunicationControlServices
from bacpypes.service.object import ReadWritePropertyMultipleServices
from bacpypes.local.device import LocalDeviceObject
from bacpypes.local.object import AnalogValueCmdObject, Commandable, AnalogOutputCmdObject
from bacpypes.object import register_object_type, AnalogInputObject
# some debugging
# a truthy _debug enables the `if _debug:` logging calls throughout this module
_debug = 0
_log = ModuleLogger(globals())
# application refresh interval handed to recurring_function; the value is
# 5 * 1000 for a 5 second period, i.e. it is expressed in milliseconds
APPINTERVAL = 5 * 1000 # 5 seconds
# dictionary of names to objects
# objects: BACnet object name -> local object, filled in by create_objects()
objects = {}
# inputs: the subset of `objects` whose names appear in the BOPTEST /inputs payload
inputs = {}
# activation_signal: input name -> name of the matching "<name>_activate" signal
activation_signal = {}
# nextState: latest JSON response from /initialize or /advance; its payload is
# copied onto the BACnet objects on the following call to update_boptest_data()
nextState = None
# g: rdflib graph parsed from the Brick model by create_objects()
g = None
# baseurl: BOPTEST endpoint, overwritten in main() from the --baseurl argument
baseurl = "http://localhost:5000"
# JSON responses of GET /measurements and GET /inputs, fetched once in main()
boptest_measurements = None
boptest_inputs = None
# TODO - what should some of the BOPTEST objects be - maybe
# AnalogInputs or BinaryInputs?
# Not currently using this class
# TODO - why does BACpypes have a builtin AnalogValueCmdObject but not a builtin AnalogInputCmdObject?
@bacpypes_debugging
@register_object_type(vendor_id=999)
class AnalogInputCmdObject(Commandable(Real), AnalogInputObject):
    """Analog-input object made commandable with a Real present value.

    Registered as an object type for vendor id 999.
    """

    def _set_value(self, value):
        """Assign ``value`` to this object's ``presentValue``."""
        if _debug:
            AnalogInputCmdObject._debug("_set_value %r", value)

        # a numeric value can be stored directly, no conversion needed
        self.presentValue = value
# We are using this class
# TODO - why are we using this class instead of just AnalogValueCmdObject?
# This was how OpenWeatherServer.py did it - was it just so it could log the change?
@bacpypes_debugging
@register_object_type(vendor_id=999)
class LocalAnalogValueObject(AnalogValueCmdObject):
    """AnalogValueCmdObject subclass that logs value changes when debugging.

    Registered as an object type for vendor id 999.
    """

    def _set_value(self, value):
        """Assign ``value`` to this object's ``presentValue``."""
        if _debug:
            LocalAnalogValueObject._debug("_set_value %r", value)

        # a numeric value can be stored directly, no conversion needed
        self.presentValue = value
# BACnet object-type name (as it appears in the Brick model's
# bacnet:object-identifier) -> class used to build the local object
klassMapping = {
    'analog-value': LocalAnalogValueObject,
    'analog-input': AnalogInputObject,
    'analog-output': AnalogOutputCmdObject,
}

# unit URI found in the Brick model -> value assigned to the object's units property
unitMapping = {
    'http://qudt.org/vocab/unit/K': "degreesKelvin",
    'http://qudt.org/vocab/unit/PPM': "partsPerMillion",
}
@bacpypes_debugging
def create_objects(app, configfile):
    """Create the objects that hold the result values.

    Parses ``configfile`` into the module-level graph ``g`` and queries it for
    every point that has an external reference with a BACnet object identifier
    and object name.  For each such point a local object is built from
    ``klassMapping``, seeded with the value found under the same name in
    ``nextState['payload']`` (``0.0`` when absent), added to ``app`` and
    recorded in ``objects``.  Points whose name also appears in
    ``boptest_inputs['payload']`` are recorded in ``inputs`` together with the
    name of their activation signal in ``activation_signal``.

    Args:
        app: application that receives each object through ``add_object``.
        configfile: Brick model source handed to ``rdflib.Graph.parse``.

    Raises:
        KeyError: if a point uses an object type missing from ``klassMapping``.
        ValueError: if an object identifier is not ``"<type>,<instance>"``
            with an integer instance number.
    """
    # only `g` is rebound here; the other module-level containers are mutated in place
    global g

    if _debug:
        create_objects._debug("create_objects %r", app)

    g = rdflib.Graph()
    g.parse(configfile)

    points = g.query("select ?point ?name ?bacnetRef ?unit where {?point ref:hasExternalReference ?bo . ?bo bacnet:object-identifier ?bacnetRef . ?bo bacnet:object-name ?name OPTIONAL {?point brick:hasUnit ?unit} }")

    for point in points:
        rdfBacnetName = point[1]
        rdfBacnetRef = point[2]
        rdfBacnetUnit = point[3]
        if _debug:
            create_objects._debug(" - name: %r", rdfBacnetName)

        # the identifier is "<object-type>,<instance>"; split once so the
        # result always unpacks into exactly two parts
        klassName, instanceNum = rdfBacnetRef.split(",", 1)
        klass = klassMapping[klassName]
        instanceNum = int(instanceNum)
        name = str(rdfBacnetName)

        # ?unit is OPTIONAL in the query, so it may be unbound; a unit without
        # a known BACnet equivalent is reported and left unset rather than
        # aborting start-up with a KeyError
        units = None
        if rdfBacnetUnit:
            units = unitMapping.get(str(rdfBacnetUnit))
            if units is None:
                print("Warning: no BACnet unit mapping for %r, leaving units unset on %r" % (str(rdfBacnetUnit), name))

        # seed the object with the value of the current simulation state, if any
        initialValue = nextState['payload'].get(name, 0.0)

        if klassName in ('analog-input', 'analog-value'):
            obj = klass(objectName=name, objectIdentifier=(klass.objectType, instanceNum), presentValue=initialValue)
        else:
            obj = klass(objectName=name, objectIdentifier=(klass.objectType, instanceNum), relinquishDefault=initialValue)
        if _debug:
            create_objects._debug(" - obj: %r", obj)

        if units is not None:
            obj.units = units

        # add it to the application
        app.add_object(obj)

        # keep track of the object by name
        objects[name] = obj

        if name in boptest_inputs['payload']:
            # the activation signal name is the input name with its last two
            # characters replaced by "_activate"
            activation_name = name[:-2] + "_activate"
            if activation_name not in boptest_inputs['payload']:
                # still registered, because update_boptest_data() looks up an
                # activation signal for every entry in `inputs`
                print("Warning: BOPTEST input %r has no activation signal %r" % (name, activation_name))
            activation_signal[name] = activation_name
            inputs[name] = obj
@recurring_function(APPINTERVAL)
@bacpypes_debugging
def update_boptest_data():
    """Advance the simulation one step and publish the previous step's results.

    Builds the set of override signals from the commandable input objects,
    posts them to ``<baseurl>/advance``, copies the payload stored in
    ``nextState`` onto the matching BACnet objects, and then stores the fresh
    response in ``nextState`` for the next call.  A non-200 response is
    printed and the call returns without changing anything.
    """
    global nextState

    if _debug:
        update_boptest_data._debug("update_boptest_data")

    # Results come straight from /advance; historic data could instead be
    # requested from the simulation through its /results endpoint.

    # The _activate signals are sticky in BOPTEST, so every one of them is
    # switched off by default and only switched back on below when there is a
    # value to overwrite.  Sending them all avoids tracking any state, at the
    # expense of passing BOPTEST a bunch of parameters that do nothing.
    overrides = {activate_name: 0.0 for activate_name in activation_signal.values()}

    # For "commandable" objects BACnet maintains a priorityArray, writable at
    # levels 1-16, whose entries replace the object's presentValue.  The
    # BOPTEST inputs were created as commandable objects, so check whether a
    # priority value is overriding the default - and if so, forward it and
    # turn on the matching '_activate' signal.
    #
    # (BACpypes automatically turns a write to presentValue into a write to
    # the priority array at priority 16, e.g. from a client:
    #     python samples/ReadWriteProperty.py
    #     > write 10.0.2.7 analogValue:63 presentValue 310 )
    for point_name, point_obj in inputs.items():
        commanded_value, value_source = point_obj._highest_priority_value()
        if not value_source:
            continue
        overrides[point_name] = commanded_value
        overrides[activation_signal[point_name]] = 1.0

    response = requests.post('{0}/advance'.format(baseurl), data=overrides)
    if response.status_code != 200:
        print("Error response: %r" % (response.status_code,))
        return

    # decode the response body as JSON
    advanced_state = response.json()

    # The simulation advances at each call, but the external world only sees
    # those results on the NEXT call to this function.
    #
    # TODO: don't ACK BACnet writes until we get to here - instead, buffer the write request and send the ACKs later
    # don't worry about concurrency, last-writer-wins is fine, but be sure to send multiple acks, one to each writer
    #
    # TODO: rather than using recurring_function, we should schedule ourselves to be called again at (5secs-elapsed_function_time)
    # because if say the call to /advance takes 3 seconds, we want to get called again in 2 seconds, not in 5 seconds.
    if nextState:
        for point_name, point_value in nextState['payload'].items():
            if _debug:
                update_boptest_data._debug(" - k, v: %r, %r", point_name, point_value)
            if point_name in objects:
                objects[point_name].presentValue = point_value

    nextState = advanced_state
# BAC0 uses the ReadPropertyMultiple service so make that available
@bacpypes_debugging
class ReadPropertyMultipleApplication(
    BIPSimpleApplication,
    ReadWritePropertyMultipleServices,
    DeviceCommunicationControlServices,
):
    """BIPSimpleApplication combined with the read/write-property-multiple
    and device-communication-control service mixins; adds no behavior of its own."""
@bacpypes_debugging
def main():
    """Initialize BOPTEST, build the BACnet device and objects, and run the stack.

    Parses the command line, initializes the BOPTEST test case, fetches its
    measurement and input lists, sets the simulation step, creates the local
    device, application and objects from the Brick model, schedules the first
    ``update_boptest_data`` call and enters the BACpypes run loop.

    Raises:
        requests.HTTPError: if any BOPTEST start-up request returns an HTTP
            error status.
    """
    global baseurl, nextState, boptest_measurements, boptest_inputs

    parser = ConfigArgumentParser(description=__doc__)
    parser.add_argument('brick_model', type=str, help='Brick model that defines the site, a ttl file')
    # nargs='?' is what makes the defaults effective: without it a positional
    # argument is required and its default is never used
    parser.add_argument('start_time', type=int, nargs='?', default=0, help="timestamp (in seconds) at which to start the simulation")
    parser.add_argument('warmup_period', type=int, nargs='?', default=0, help="length (in seconds) of the warmup period simulated before start_time")
    parser.add_argument('--baseurl', dest='baseurl', type=str, default='http://localhost:5000', help="URL for BOPTest endpoint")

    # parse the command line arguments
    args = parser.parse_args()

    # drop any trailing slash so the endpoint paths below do not start with "//"
    baseurl = args.baseurl.rstrip('/')

    if _debug:
        _log.debug("initialization")
    if _debug:
        _log.debug(" - args: %r", args)

    # initialize the test case; fail here with the HTTP error rather than
    # later on a response that does not carry the expected payload
    response = requests.put('{0}/initialize'.format(baseurl), data={'start_time': args.start_time, 'warmup_period': args.warmup_period})
    response.raise_for_status()
    nextState = response.json()

    response = requests.get(baseurl + "/measurements")
    response.raise_for_status()
    boptest_measurements = response.json()

    response = requests.get(baseurl + "/inputs")
    response.raise_for_status()
    boptest_inputs = response.json()

    # We advance the simulation by 5 seconds at each call to /advance, and APPINTERVAL is also 5 seconds, so the simulation
    # moves in sync with wallclock time. To see things happen faster, set this time greater than 5 seconds
    response = requests.put('{0}/step'.format(baseurl), data={'step': 5})
    response.raise_for_status()

    # make a device object
    this_device = LocalDeviceObject(ini=args.ini)
    if _debug:
        _log.debug(" - this_device: %r", this_device)

    # make a sample application
    this_application = ReadPropertyMultipleApplication(this_device, args.ini.address)

    # create the objects and add them to the application
    create_objects(this_application, args.brick_model)

    # run this update when the stack is ready
    deferred(update_boptest_data)

    if _debug:
        _log.debug("running")

    run()

    if _debug:
        _log.debug("fini")


if __name__ == "__main__":
    main()