Skip to content

Commit

Permalink
timed-ble-beacon: add options to set min/max beacon intervals
Browse files Browse the repository at this point in the history
  • Loading branch information
mk-fg committed Jan 18, 2025
1 parent 3e63d9c commit c345010
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 24 deletions.
28 changes: 19 additions & 9 deletions timed-ble-beacon
Original file line number Diff line number Diff line change
Expand Up @@ -103,29 +103,31 @@ class BLEAdMsg(dbus.service.Object): # see man org.bluez.LEAdvertisement
m = ( lambda si='',so='',iface='org.freedesktop.DBus.Properties':
dbus.service.method(iface, in_signature=si, out_signature=so) )

def __init__(self, bus, mid, secret, td, n=None):
n, td = n or int(time.time() - 1_735_689_600), round(td / 100)
def __init__(self, bus, mid, secret, td, bms_a, bms_b):
n, td = int(time.time() - 1_735_689_600), round(td / 100)
msg = struct.pack('<HL', td, n)
mac = hmac.HMAC(secret, msg, digestmod=hl.sha256).digest()[:20]
ad_name = base64.urlsafe_b64encode(mac).decode()
ad_name = ad_name.translate(dict(zip(b'_-=', 'qQX'))) # dbus name limits
self.bus, self.path = bus, f'/org/bluez/example/advertisement/{ad_name}'
p(f'-- new BLE ad [ {self.path} ]: ( {mid:,d}, {td:,d} {n:,d} {secret} )')
self.data = dbus.Dictionary( # uint16 key, <=27B value
{mid: dbus.Array(msg + mac, signature='y')}, signature='qv' )
self.broadcast = dict(
MinInterval=dbus.UInt32(bms_a), MaxInterval=dbus.UInt32(bms_a),
ManufacturerData=dbus.Dictionary( # uint16 key, <=27B value
{mid: dbus.Array(msg + mac, signature='y')}, signature='qv' ) )
super().__init__(self.bus, self.path)

@m('s', 'a{sv}')
def GetAll(self, iface):
try:
if iface != self.iface: raise BLErrInvalidArgs
return dict(Type='broadcast', ManufacturerData=self.data)
return dict(Type='broadcast', **self.broadcast)
except: tb.print_exc(file=sys.stderr); sys.stderr.flush()
Get = m('ss', 'v')(lambda s, iface, k: s.GetAll(iface)[k])
Release = m(iface=iface)(lambda s: None)


def run_ad(mid, secret, td_updates, td_total):
def run_ad(mid, secret, td_updates, td_total, td_beacon):
ts_end = time.monotonic() + td_total
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
Expand All @@ -147,16 +149,16 @@ def run_ad(mid, secret, td_updates, td_total):
except: pass
if close: return
if not (td := max(0, ts_end - time.monotonic())): return loop.quit()
ad = BLEAdMsg(bus, mid, secret, td)
ad = BLEAdMsg(bus, mid, secret, td, *td_beacon)
ad_mgr.RegisterAdvertisement( ad.path, {},
reply_handler=lambda: None, error_handler=lambda err:
(p_err(f'BLE ad setup/update failed - {err}' ), loop.quit()) )
return True # for glib to run this again after interval
except: tb.print_exc(file=sys.stderr); sys.stderr.flush(); loop.quit()

ts_start = dt.datetime.now().isoformat(sep=' ', timespec='seconds')
p( f'loop: Setup start=[ {ts_start} ]'
f' duration=[ {ts_repr(td_total)} ] update-interval=[ {ts_repr(td_updates)} ]' )
p( f'loop: Setup start=[ {ts_start} ] duration=[ {ts_repr(td_total)} ]'
f' update-interval=[ {ts_repr(td_updates)} ] beacon_ms={td_beacon}' )
loop = GLib.MainLoop()
for sig in signal.SIGTERM, signal.SIGINT:
GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, sig, loop.quit)
Expand Down Expand Up @@ -200,6 +202,10 @@ def main(argv=None):
Interval between updating BLE broadcast data. Default: %(default)s
Same exact broadcast is repeated within these intervals.
Should be specified as a relative short time (e.g. 1m45s) or [[HH:]MM:]SS.'''))
parser.add_argument('-b', '--beacon-interval',
metavar='ms[-ms]', default='60-120', help=dd('''
Range for an interval between sending beacons
when those are active, in milliseconds. Default: %(default)s'''))

parser.add_argument('--mid', metavar='uint16', type=int, default=61_634, help=dd('''
Manufacturer "Company Identifier" field in 0-65535 range. Default: %(default)s
Expand All @@ -223,7 +229,11 @@ def main(argv=None):

if not opts.debug: global p; p = lambda *a,**kw: None
if not opts.time_span: parser.error('-t/--time-span option must be specified')

bms_a, s, bms_b = map(str.strip, opts.beacon_interval.partition('-'))
bms_a, bms_b = int(bms_a or bms_b), int(bms_b or bms_a)
run_ad( opts.mid, _read_file(opts.secret).encode(),
td_beacon=(bms_a, bms_b),
td_total=ts_parse(opts.time_span).total_seconds(),
td_updates=ts_parse(opts.update_interval, relative_only=True).total_seconds() )

Expand Down
32 changes: 17 additions & 15 deletions timed-ble-beacon-mpy-led
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,21 @@ import asyncio, machine, bluetooth, time, struct, random
class Conf:
verbose = False
ble_mid = 61_634 # uint16 manufacturer id to match in broadcasts
ble_secret = b'timed-beacon-test'
ble_secret = b'timed-beacon-test' # HMAC key

scan_duration_ms = 5_000
# 5s ~ 55x 30ms 30%dc listen-intervals
scan_interval_us = 60_000
scan_window_us = 30_000

# "clear" is the normal state, "beacon" is a state when those are detected
clear_sample_interval = 20.0
clear_sample_time = 4.0
beacon_sample_interval = 5.0 * 60 # won't get lower than this in a "beacon" state
# "clear" is the normal state, "beacon" is a state when valid beacons are detected
clear_sample_time = 4.0 # seconds to run BLE scan for, with duty cycle scan_* opts
clear_sample_interval = 20.0 # sleep delay between scans
beacon_sample_time = 3.0
beacon_sample_interval = 5.0 * 60 # won't get lower than this in a "beacon" state
beacon_repeat_grace = 6.0 * 60 # ok to repeat counter within this td
beacon_span_checks = 6 # do N checks within timespan sent in the beacon
beacon_span_checks = 6 # wakeup for N checks within timespan sent in the beacon

# led_time values are scanned with random chance, interval picked from min-max
_tdt = cs.namedtuple('LEDtdt', 'chance time_min time_max')
led_pin = 'LED'
led_time_on = [_tdt(1, 0.020, 1.0)]
Expand All @@ -30,16 +31,18 @@ class Conf:

class BLE_MSE_Scanner:

def __init__(self, duration_ms, interval_us, window_us):
self.ble, self.scan_args = bluetooth.BLE(), (duration_ms, interval_us, window_us)
self.ble.irq(self._irq_handler)
def __init__(self, interval_us, window_us):
self.scan_td, self.scan_dc = 5_000, (interval_us, window_us)
self.scan, self.queue, self.check = False, list(), asyncio.ThreadSafeFlag()
self.ble = bluetooth.BLE()
self.ble.irq(self._irq_handler)

def duration(self, duration_ms):
self.scan_td = duration_ms; return self
def __enter__(self):
if self.scan: raise RuntimeError('Concurrent scans not allowed')
self.scan = True; self.queue.clear(); self.check.clear()
self.ble.active(True); self.ble.gap_scan(*self.scan_args)

self.ble.active(True); self.ble.gap_scan(self.scan_td, *self.scan_dc)
def __exit__(self, *err):
self.scan = False; self.check.set(); self.ble.active(False)

Expand Down Expand Up @@ -113,8 +116,7 @@ async def run_ble(conf, leds):
(conf.clear_sample_interval, conf.clear_sample_time) ) )))
beacon_grace_td = round(conf.beacon_repeat_grace * 1000)
p_log = conf.verbose and (lambda *a: print('[main]', *a))
ble_scan = BLE_MSE_Scanner(
conf.scan_duration_ms, conf.scan_interval_us, conf.scan_window_us )
ble_scan = BLE_MSE_Scanner(conf.scan_interval_us, conf.scan_window_us)

p_log and p_log('Starting main loop...')
counter, st = 0, st_clear; td_sleep, td_sample = timings[st]
Expand Down Expand Up @@ -144,7 +146,7 @@ async def run_ble(conf, leds):
p_log and p_log(f'Scanning for broadcasts ({td_sample:,d}ms)...')
beacon = False

with ble_scan: # scan until first hmac-match
with ble_scan.duration(td_sample): # scan until first hmac-match
while data := await ble_scan.get_data():
if len(data) != 28 or int.from_bytes(data[:2], 'little') != conf.ble_mid: continue
data, hmac = data[2:8], data[8:]
Expand Down

0 comments on commit c345010

Please sign in to comment.