-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
308 lines (253 loc) · 11 KB
/
Copy pathmain.py
File metadata and controls
308 lines (253 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
import gc
import sys
import esp32
import aioble
gc.collect()
import asyncio
import network
gc.collect()
from time import time
from binascii import hexlify
from micropython import const
gc.collect()
from machine import HARD_RESET, PWRON_RESET, SOFT_RESET, reset, reset_cause
gc.collect()
from uthingsboard_as.gateway import TBGatewayClient
gc.collect()
from config import config, gwcfg, BEACON_REGISTRY
__version__ = "0.1.0"
# Keep a copy of default config values for reset on delete
_GWCFG_DEFAULT = dict(gwcfg)
# BLE scan duration set to 0 for continuous scanning with no timeout, as we will manage
# timeouts ourselves based on beacon-specific scan intervals and last seen timestamps.
_BLE_SCAN_DURATION = const(0)
# Multiplier for scan interval to determine beacon timeout. Timedout beacons will be
# removed an disconnected. Setting this to 3 allows for some missed scans without
# immediately disconnecting a beacon.
_BEACON_TIMEOUT_MULTIPLIER = const(3)
# Interval in seconds to check for beacon timeouts. This determines how quickly we will
# detect and remove inactive beacons.
_BEACON_TIMEOUT_INTERVAL = const(30)
class Gateway:
"""BLE to ThingsBoard Gateway implementation.
Scans for BLE beacons and forwards their data to ThingsBoard IoT platform.
Supports iBeacon, AltBeacon, RuuviTag, MikroTik and Eddystone UID beacons.
"""
# Shared attributes that can be configured via ThingsBoard
shared_keys = {
"bleScanFilterIBeaconUuid",
"bleScanFilterIBeaconMajor",
"bleScanFilterIBeaconMinor",
"bleScanFilterAltBeaconUuid",
"bleScanFilterAltBeaconMajor",
"bleScanFilterAltBeaconMinor",
"bleScanFilterEddystoneUidNamespace",
"bleScanFilterEddystoneUidInstance",
}
def __init__(self):
# Initialize ThingsBoard client and register callbacks
self.client = TBGatewayClient(config)
self.client._device_on_server_side_rpc_response = self.handle_rpc_response
self.client._device_on_server_side_attr_response = self.handle_attr_response
self.is_connected = False
# Database to track beacon scan intervals and timeouts
self._db = {}
self._timeout = {}
# Network interface for gateway identification
self._sta_if = network.WLAN(network.STA_IF)
self._mac = hexlify(self._sta_if.config("mac")).upper().decode()
def update_db(self, mac, interval):
"""Track when to next scan a beacon based on its scan interval."""
_time = time()
# Update last seen time for the device
self._db[mac] = _time + interval
# Set timeout to X times the scan interval to allow for some missed scans without immediately disconnecting
self._timeout[mac] = _time + (interval * _BEACON_TIMEOUT_MULTIPLIER)
async def handle_attr_response(self, req_id, msg_type, payload):
"""Process attribute updates from ThingsBoard."""
if msg_type in ["shared", "update"]:
# If scope is shared attr response or attr changed
for key, value in payload.items():
if key.endswith("Active"):
gwcfg[key] = value
# Only reload on shared attribute response, not on individual attribute change
if req_id is None:
self.load_beacon_handlers()
elif key in _GWCFG_DEFAULT:
gwcfg[key] = value
elif key.startswith("bleScanFilter"):
for handler in self.handlers + self.service_handlers:
if handler.handles_filter_key(key):
handler.set_filter(key, value)
break
elif msg_type == "deleted":
# Reset deleted keys to default values from config.py
for key in payload:
if key in _GWCFG_DEFAULT:
gwcfg[key] = _GWCFG_DEFAULT[key]
elif key.startswith("bleScanFilter"):
for handler in self.handlers + self.service_handlers:
if handler.handles_filter_key(key):
handler.remove_filter(key)
break
async def handle_rpc_response(self, requ_id, method, params):
"""Process RPC commands from ThingsBoard."""
if method == "gateway_device_deleted":
self._db.pop(params, None)
self._timeout.pop(params, None)
self.client.devices.discard(params)
elif method == "gateway_device_offline":
self._db.pop(params, None)
self._timeout.pop(params, None)
self.client.devices.discard(params)
elif method == "command":
if params == "restart":
reset() # Hard reset
def publish_gateway_attributes(self):
"""Publish gateway attributes to ThingsBoard for device identification."""
attributes = {
"ipAddress": self._sta_if.ifconfig()[0],
"macAddress": self._mac,
"firmwareVersion": __version__,
"ramTotal": gc.mem_free() + gc.mem_alloc(),
}
self.client.send_attributes(attributes)
async def publish_gateway_telemetry(self):
"""Periodically publish gateway telemetry data to ThingsBoard."""
while True:
if self.client.isconnected:
try:
mvu_temp_c = (esp32.raw_temperature() - 32) * 5 / 9
telemetry = {
"rssi": self._sta_if.status("rssi"),
"ramFree": gc.mem_free(),
"ramAlloc": gc.mem_alloc(),
"devicesConnected": len(self.client.devices),
"internalMcuTemperature": round(mvu_temp_c, 2),
"uptime": time(),
}
self.client.send_telemetry(telemetry)
except Exception as err:
print(err)
await asyncio.sleep(gwcfg["gwStatusInterval"])
def load_beacon_handlers(self):
"""Import and instantiate only enabled beacon handlers."""
self.handlers = []
self.service_handlers = [] # Eddystone-style (service UUID based)
shared_keys = set()
for active_key, (mod_path, cls_name) in BEACON_REGISTRY.items():
if gwcfg[active_key]:
mod = __import__(mod_path, None, None, (cls_name,))
cls = getattr(mod, cls_name)
handler = cls(gwcfg, self.client, self._mac)
shared_keys.update(cls.shared_filter_keys)
if getattr(handler, '_is_service_beacon', False):
self.service_handlers.append(handler)
else:
self.handlers.append(handler)
else:
# Free module if it was previously loaded
if mod_path in sys.modules:
del sys.modules[mod_path]
gc.collect()
self.shared_keys = shared_keys
def process_beacon(self, mac, result):
"""Process a single BLE beacon result and forward to ThingsBoard if it matches any handler."""
adv_data = result.adv_data
rssi = result.rssi
try:
# Check service-based beacons first (Eddystone)
for handler in self.service_handlers:
if handler.match(0, b"", result):
handler.handle(mac, adv_data, rssi, self.update_db)
return
# Check manufacturer-data beacons
for mfg_id, mfg_data in result.manufacturer():
for handler in self.handlers:
if handler.match(mfg_id, mfg_data, result):
handler.handle(mac, mfg_data, rssi, self.update_db)
break
except ValueError:
pass # Ignore parsing errors and continue scanning
async def scan(self):
"""Continuous BLE scan for beacons."""
db = self._db
client = self.client
process_beacon = self.process_beacon
_hexlify = hexlify
_time = time
# Configure BLE scan parameters
async with aioble.scan(
_BLE_SCAN_DURATION,
interval_us=gwcfg["bleScanInterval"],
window_us=gwcfg["bleScanWindow"],
) as scanner:
async for result in scanner:
if client.isconnected:
mac = _hexlify(result.device.addr).upper().decode()
# Skip if beacon was recently processed
if mac in db and db[mac] >= _time():
continue
process_beacon(mac, result)
else:
return # Stop scanning if disconnected
async def check_timeouts(self):
"""Periodically check for and remove inactive devices."""
while True:
# Check every 10 seconds
await asyncio.sleep(_BEACON_TIMEOUT_INTERVAL)
_time = time()
# Create a list to avoid modifying dict during iteration
macs_to_remove = []
for mac, timeout in self._timeout.items():
if _time > timeout:
macs_to_remove.append(mac)
for mac in macs_to_remove:
self.client.dprint("Device %s timed out.", mac)
self.client.gw_disconnect_device(mac)
self._db.pop(mac, None)
self._timeout.pop(mac, None)
async def main(self):
"""Main gateway loop."""
# Connect to ThingsBoard
while self.is_connected is False:
try:
await self.client.connect()
self.is_connected = True
except OSError:
self.client.close()
print("Retry in 10 seconds.")
await asyncio.sleep(10)
# Initialize gateway
self.publish_gateway_attributes()
# Get initial configuration from ThingsBoard
shared_attributes = set(self.shared_keys)
for key in _GWCFG_DEFAULT:
shared_attributes.add(key)
self.client.request_attributes(shared_keys=shared_attributes)
await asyncio.sleep(2) # Give some time to get attributes response
# Start gateway telemetry task
asyncio.create_task(self.publish_gateway_telemetry())
# Start timeout check task
asyncio.create_task(self.check_timeouts())
# Main scan loop
while True:
if self.client.isconnected:
self.client.dprint("Load active beacon handlers.")
self.load_beacon_handlers()
self.client.dprint("Start BLE scan.")
await self.scan()
self.client.dprint("BLE scan stopped.")
await asyncio.sleep(1)
def close(self):
"""Clean shutdown of BLE and MQTT connections."""
aioble.stop()
self.client.gw_disconnect_all()
self.client.close()
if __name__ == "__main__":
gw = Gateway()
try:
asyncio.run(gw.main())
finally:
gw.close()
_ = asyncio.new_event_loop()