-
Notifications
You must be signed in to change notification settings - Fork 3
/
main.bro
309 lines (261 loc) · 11.1 KB
/
main.bro
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
#! This script defines the core components of Netbase.
@load base/utils/directions-and-hosts
# @load user_custom/flow_labels # Add me later...
# Define module namespace
module Netbase;
# Public interface of the Netbase module: the log stream ID, record types,
# tunable options, the shared observations table, events, hooks and
# function prototypes used by Netbase and its add-on modules.
export {
## The netbase log stream identifier.
redef enum Log::ID += { LOG };
## Add PCR to the conn log (Conn::Info record). The field is optional; it is
## filled in by this script's connection_state_remove handler using calc_pcr.
redef record Conn::Info += {
pcr: double &log &optional;
};
## The record type that defines the fields in the log stream as well as
## the observables that are being tracked for each monitored host.
## Declared &redef so other scripts can extend it with additional fields.
type observation: record {
address: addr &log &optional;
starttime: time &log &optional;
endtime: time &log &optional;
} &redef;
## Amount of time observations are tracked before being written to the log.
const obs_interval: interval = 5 mins &redef;
## Function called when an entry expires from the observations table.
## It stamps the entry's endtime with the current network time, raises
## log_observation for the entry and returns 0 secs so the entry expires.
global close_obs: function(data: table[addr] of observation, idx: addr): interval;
## Table for housing running observations for monitored IP addresses. The observations
## table is distributed across proxy nodes in the cluster using the data partitioning API.
## Arbitrary time buckets are created using the &create_expire attribute; when a key
## expires from the table the close_obs function is executed to prepare the record for logging.
global observations: table[addr] of observation = table() &create_expire=obs_interval &expire_func=close_obs; # <-- CHANGE ME
## Event executed when preparing an observation for logging.
global log_observation: event(p: Netbase::observation);
## Event executed when an observation is written to the log, e.g.
## by calling the Log::write function.
global write_obs_log: event(p: Netbase::observation);
## Type for sharing observed attributes and behaviors (observables)
## with the aggregation nodes, typically proxies in a cluster.
type observable: record {
## The name of the observable. This is a unique descriptor that
## should match any fields added to the observations record. The
## observable name is what is used to bind the script logic that makes
## the observation with the name of the behavior or attribute that is
## being tracked by netbase. Everything is in the name, so take care
## to make sure names are concise, easy to understand and follow a
## consistent convention that makes ingest and analysis easier downstream.
name: string;
## The optional value of the observable. Optional because not every
## observable is meant to track unique instances of a data point as it relates
## to monitored IP addresses. Use the val field to supply unique instances of
## a thing along with the name of the observable. Values should be cast to a
## string type for transport and can later be converted back to their original
## type as needed.
val: string &optional;
};
## Record for calculating and storing running statistics over numeric samples.
## cnt starts at 0; min, max, sum and avg stay unset until the first sample
## is added with update_numstats.
type numstats: record {
cnt: count &default=0;
min: double &optional;
max: double &optional;
sum: double &optional;
avg: double &optional;
};
## Function for updating a numstats record based on the provided value.
## The record is updated in place and also returned.
global update_numstats: function(rec: numstats, val: double): numstats;
## Function for publishing observables to the proxy pool. Used in Netbase modules
## to ensure consistent handling of observables.
global SEND: function(ip: addr, obs: set[observable]);
## Reusable table type for temporary storage of observables within event handlers.
## The observables table is typically instantiated as a local variable, then used
## to accumulate all observables for monitored IPs, for a given event. Once the
## event handler completes, the keys in the table (IPs) and values (sets of observables)
## are passed to the SEND function for publishing to the proxy pool.
##
## The intent is to minimize overlapping/redundant transmissions of observables
## related to a given connection, reducing strain on the Broker comms
## and processing load on proxies.
type observables: table[addr] of set[observable];
## Hook used by netbase modules to customize fields in the observation entry after
## it has been added to the observations table. This hook is not frequently needed
## as observation record fields can be declared with a default initialization value
## (e.g. an empty set).
global customize_obs: hook(ip: addr, observations: table[addr] of observation);
## Event used by publish_hrw to invoke handling on the receiving node.
global add_observables: event(ip: addr, pkg: set[observable]);
## Pattern describing IP addresses that should never be monitored; e.g. broadcast
## and multicast addresses that might fall within monitored networks.
## NOTE(review): currently disabled; is_monitored uses its own inline pattern instead.
# const excluded_hosts: pattern = /^255\.|\.255$|^2[23][0-9]\.|/;
## List of subnets that contain critical assets. Hosts or subnets defined in the list
## will be monitored in addition to those that match the monitoring mode.
## Values are of type subnet but individual IP addresses can be
## defined using a /32 mask: ex. 192.168.10.1/32
global critical_assets: set[subnet] &default=set() &redef;
## Enum that defines the available monitoring modes. Observations will be made and
## logged for IPs that match this type.
type mode: enum {
## Make observations for any IP within a non-routable RFC 1918 address range
## (checked against Site::private_address_space).
PRIVATE_NETS,
## Make observations for any IP within a Site::local_nets subnet.
LOCAL_NETS,
## Make observations for any IP within a Site::local_nets or Site::neighbor_nets subnet.
LOCAL_AND_NEIGHBORS
};
## The monitoring mode Netbase is using to make and log observations. Refer to Netbase::mode
## for more information.
const monitoring_mode: Netbase::mode = LOCAL_NETS &redef;
## Function to determine if observations should be made for the given IP address.
global is_monitored: function(ip: addr): bool;
## Function to calculate PCR using the provided counts, which are presumably
## originator bytes (o) and responder bytes (r).
global calc_pcr: function(o: count, r: count): double;
}
# Deliver a set of observables for one IP address.
#
# ip:  the address the observables belong to; it is also the key handed to
#      Cluster::publish_hrw for choosing a node in the proxy pool.
# obs: the observables to deliver.
function SEND(ip: addr, obs: set[observable])
    {
    # Remote delivery: publish add_observables to the proxy pool, keyed on the IP.
    Cluster::publish_hrw(Cluster::proxy_pool, ip, Netbase::add_observables, ip, obs);

    # Local delivery: raise the same event on this node as well, so any
    # add_observables handler that exists here runs too.
    event Netbase::add_observables(ip, obs);
    }
# Last-stage handler for an observation that is ready to be logged.
# It runs at priority -10; handlers that need to adjust the record before it
# is written should be registered with a higher priority.
event log_observation(obs: observation) &priority=-10
    {
    # The write is only compiled in when running standalone or on a proxy node;
    # on every other node type this handler body is empty.
@if ( ! Cluster::is_enabled() || Cluster::local_node_type() == Cluster::PROXY )
    Log::write(Netbase::LOG, obs);
@endif
    }
## Decide whether observations should be made for the given IP address.
##
## ip: the address to test.
##
## Returns: F if the textual form of the address starts with "255." or ends
##          with ".255"; otherwise T when the address matches the configured
##          monitoring_mode or falls inside critical_assets, else F.
function is_monitored(ip: addr): bool
    {
    # Reject addresses whose text form looks like a broadcast address.
    local ip_text = cat(ip);

    if ( /^255\.|\.255$/ in ip_text )
        return F;

    # Evaluate the membership test that belongs to the active monitoring mode.
    local mode_match = F;

    if ( monitoring_mode == PRIVATE_NETS )
        mode_match = ( ip in Site::private_address_space );
    else if ( monitoring_mode == LOCAL_NETS )
        mode_match = ( ip in Site::local_nets );
    else if ( monitoring_mode == LOCAL_AND_NEIGHBORS )
        mode_match = ( (ip in Site::local_nets) || (ip in Site::neighbor_nets) );

    # Critical assets are monitored regardless of the monitoring mode.
    return mode_match || ( ip in critical_assets );
    }
# Fold one sample into a numstats record.
#
# rec:   the statistics record to update (modified in place).
# value: the new sample.
#
# Returns: the same record, with cnt, sum, avg, min and max brought up to date.
function update_numstats(rec: numstats, value: double): numstats
    {
    # One more sample seen.
    rec$cnt += 1;

    # Running total; the very first sample initializes it.
    rec$sum = ( rec?$sum ) ? ( rec$sum + value ) : value;

    # Mean over all samples so far.
    rec$avg = rec$sum / rec$cnt;

    # Track the extremes; an unset field is initialized from the first sample.
    if ( ! rec?$min || value < rec$min )
        rec$min = value;

    if ( ! rec?$max || value > rec$max )
        rec$max = value;

    return rec;
    }
# Expiration callback for the observations table.
#
# data: the table the entry is expiring from.
# idx:  the key (IP address) of the expiring entry.
#
# Returns: 0 secs, i.e. the entry is not granted any additional lifetime.
function close_obs(data: table[addr] of observation, idx: addr): interval
    {
    local expiring = data[idx];

    # Close the observation window at the current network time.
    expiring$endtime = network_time();

    # Let other scripts finalize the record; it gets logged by the
    # log_observation handlers.
    event Netbase::log_observation(expiring);

    return 0 secs;
    }
# Calculate PCR from two counts (presumably originator and responder byte counts).
#
# o: the originator-side count.
# r: the responder-side count.
#
# Returns: (o - r) / (o + r) as a double, which lies in [-1.0, 1.0].
#          When both counts are zero the ratio is undefined; 0.0 is returned
#          in that case instead of dividing by zero.
function calc_pcr(o: count, r: count): double
    {
    # Nothing was counted in either direction: avoid a zero denominator.
    if ( o == 0 && r == 0 )
        return 0.0;

    # Promote to double before subtracting so that r > o yields a negative
    # value rather than an unsigned underflow.
    local orig_total = o + 0.0;
    local resp_total = r + 0.0;

    return ( orig_total - resp_total ) / ( orig_total + resp_total );
    }
# Add pcr to the conn record of every connection where data was exchanged.
# NOTE(review): priority 3 presumably places this handler after the base conn
# script has populated c$conn and before conn.log is written; confirm against
# the loaded base/protocols/conn script.
event connection_state_remove(c: connection) &priority=3
    {
    # c$conn is an optional field; without it there is nowhere to store the PCR.
    if ( ! c?$conn )
        return;

    # Both endpoint sizes are required for the calculation.
    if ( ! c$orig?$size || ! c$resp?$size )
        return;

    # No payload in either direction: the ratio is undefined, leave pcr unset.
    if ( c$orig$size == 0 && c$resp$size == 0 )
        return;

    c$conn$pcr = calc_pcr(c$orig$size, c$resp$size);
    }
# When a cluster node comes up, throw away the local observations table.
# The reset only takes effect when running standalone or on a worker node
# (modelled on known_hosts: drop local state to force HRW key repartitioning).
event Cluster::node_up(name: string, id: string)
    {
    # Clustered, but not a worker: leave the table alone.
    if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::WORKER )
        return;

    Netbase::observations = table();
    }
# When a cluster node goes down, throw away the local observations table.
# The reset only takes effect when running standalone or on a worker node
# (modelled on known_hosts: drop local state to force HRW key repartitioning).
event Cluster::node_down(name: string, id: string)
    {
    # Clustered, but not a worker: leave the table alone.
    if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::WORKER )
        return;

    Netbase::observations = table();
    }
# Begin tracking observations for an IP address, provided it is monitored.
#
# ip: the address to start observing. Addresses rejected by is_monitored
#     are ignored.
function start_obs(ip: addr)
    {
    if ( ! Netbase::is_monitored(ip) )
        return;

    # Fresh entry: remember the address and when the observation window opened.
    observations[ip] = observation($address=ip, $starttime=network_time());

    # Give other scripts a chance to adjust the new entry, mainly so they
    # can initialize any set fields they are using.
    hook Netbase::customize_obs(ip, observations);
    }
# Runs at priority 100 so that the observation entry for the IP exists in the
# table before other add_observables handlers store values in it.
# Only compiled in when running standalone or on a proxy node.
@if ( ! Cluster::is_enabled() || Cluster::local_node_type() == Cluster::PROXY )
event Netbase::add_observables(ip: addr, obs: set[observable]) &priority=100
    {
    # Already being tracked: nothing to set up.
    if ( ip in observations )
        return;

    Netbase::start_obs(ip);
    }
@endif
# Register the netbase log stream at startup. Entries use the observation
# record as columns, raise write_obs_log when written, and go to the
# "netbase" log path.
event bro_init()
    {
    local stream_def = Log::Stream($columns=observation,
                                   $ev=Netbase::write_obs_log,
                                   $path="netbase");

    Log::create_stream(Netbase::LOG, stream_def);
    }