-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathblah2_config.py
More file actions
167 lines (135 loc) · 5.68 KB
/
Copy pathblah2_config.py
File metadata and controls
167 lines (135 loc) · 5.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
#!/usr/bin/env python3
"""
blah2_config.py — fetch (or fall back to hardcoded) rx/tx/fc for
bistatic geolocation.
blah2 exposes its own `/api/config` on port 3000 (default) or the URL
set by BLAH2_API. The Pi binds it to localhost only, so a script
running on a laptop typically can't reach it; this module makes a single
attempt, then falls back to the optional Pi GPS sidecar (receiver position
only) and finally to well-known defaults.
Hardcoded defaults reflect the demo setup:
- rx = Shack15 (Nemesis receiver location)
- tx = Sutro Tower (DTV channel ~36, ~605 MHz, 1 MW ERP, ~320m AGL)
These are public knowledge — Sutro is on city maps, Shack15 is a
San Francisco event venue at a published address.
"""
from __future__ import annotations
import json
import logging
import urllib.request
from typing import Optional
# Module-wide logger, registered under a fixed name.
logger = logging.getLogger("blah2_config")

# Fallback receiver position: Shack15, San Francisco.
DEFAULT_RX = dict(
    latitude=37.79504,
    longitude=-122.39349,
    altitude=30.0,
)

# Fallback transmitter position: Sutro Tower, San Francisco (public
# coordinates).  Channel 36 DTV at ~605 MHz, 1 MW ERP; antenna ~320 m up.
DEFAULT_TX = dict(
    latitude=37.75530,
    longitude=-122.45270,
    altitude=320.0,
)

# Fallback illuminator centre frequency, in MHz.
DEFAULT_FC_MHZ = 605.0
def _normalize(rx: dict, tx: dict, fc_mhz: float, source: str) -> dict:
    """Build the canonical config dict from raw rx/tx positions.

    Each position is reduced to exactly the ``latitude``, ``longitude`` and
    ``altitude`` keys, with float values.  A key that is missing, or whose
    value is ``None`` (JSON ``null``), takes the value from the matching
    hardcoded default; a position that is itself ``None`` is treated as an
    empty mapping and therefore becomes the default position.

    Args:
        rx: Receiver position mapping, possibly partial, or ``None``.
        tx: Transmitter position mapping, possibly partial, or ``None``.
        fc_mhz: Centre frequency in MHz.
        source: Provenance tag copied verbatim into the result.

    Returns:
        ``{"rx": {...}, "tx": {...}, "fc_mhz": float, "source": source}``.

    Raises:
        TypeError, ValueError: If a present, non-null value cannot be
            converted to ``float``.
    """

    def coerce(d, default):
        # A JSON `null` in place of a whole position means "not provided".
        if d is None:
            d = {}
        out = {}
        for key in ("latitude", "longitude", "altitude"):
            value = d.get(key)
            # dict.get's default argument only covers *missing* keys, so an
            # explicit null has to be mapped to the default by hand.
            out[key] = float(default[key] if value is None else value)
        return out

    return {
        "rx": coerce(rx, DEFAULT_RX),
        "tx": coerce(tx, DEFAULT_TX),
        "fc_mhz": float(fc_mhz),
        "source": source,
    }
def fetch_pi_gps(
    pi_gps_url: str,
    timeout_s: float = 3.0,
) -> Optional[dict]:
    """GET the Pi's GPS endpoint; return an rx-shaped dict or None.

    Some hardware deployments (e.g. nemesis Pi 5) expose live GPS at a
    sidecar service like :8081/api/gps. Used as a second-tier fallback
    when blah2's /api/config is unreachable but we still want the
    receiver position to track wherever the Pi is physically deployed.

    Args:
        pi_gps_url: Full URL of the GPS endpoint; it must return a JSON
            object.
        timeout_s: Timeout, in seconds, passed to ``urlopen``.

    Returns:
        ``{"latitude": float, "longitude": float, "altitude": float}`` when
        the payload has a truthy ``has_lock``; ``None`` when there is no
        lock or when anything goes wrong (network error, bad JSON, missing
        or non-numeric latitude/longitude).  A missing or ``null`` altitude
        is reported as ``0.0`` rather than discarding the fix.
    """
    try:
        with urllib.request.urlopen(pi_gps_url, timeout=timeout_s) as resp:
            g = json.load(resp)
        if not g.get("has_lock"):
            logger.info("pi gps reachable but no fix yet (sat=%s)", g.get("satellites"))
            return None
        # An explicit null altitude would make float() raise and throw away
        # a usable lat/lon, so treat it the same as an absent key.
        altitude = g.get("altitude")
        return {
            "latitude": float(g["latitude"]),
            "longitude": float(g["longitude"]),
            "altitude": 0.0 if altitude is None else float(altitude),
        }
    except Exception as e:
        # Deliberately best-effort: callers fall back to other sources.
        logger.info("pi gps fetch failed (%s)", e)
        return None
def fetch_blah2_config(
    blah2_url: str = "http://10.1.62.202:3000",
    timeout_s: float = 3.0,
    pi_gps_url: Optional[str] = None,
) -> dict:
    """Resolve rx/tx/fc from the best source that responds.

    Source ordering (best → worst):
      1. blah2 native ``{blah2_url}/api/config`` — gives rx + tx + fc together
      2. pi GPS sidecar at ``pi_gps_url`` (rx only; tx + fc inherit defaults)
      3. hardcoded Shack15 + Sutro

    Args:
        blah2_url: Base URL of the blah2 API; a trailing ``/`` is ignored.
        timeout_s: Timeout, in seconds, used for each HTTP request.
        pi_gps_url: Optional GPS sidecar URL, consulted only when the blah2
            fetch fails.

    Returns:
        dict with shape:
        {
            "rx": {"latitude": ..., "longitude": ..., "altitude": ...},
            "tx": {"latitude": ..., "longitude": ..., "altitude": ...},
            "fc_mhz": float,
            "source": "live" | "pi-gps" | "fallback",
        }
    """
    try:
        url = f"{blah2_url.rstrip('/')}/api/config"
        with urllib.request.urlopen(url, timeout=timeout_s) as resp:
            cfg = json.load(resp)
        # `or` (rather than a .get default) also covers keys that are
        # present but null/empty in the JSON.
        loc = cfg.get("location", {}) or {}
        cap = cfg.get("capture", {}) or {}
        rx = loc.get("rx") or DEFAULT_RX
        tx = loc.get("tx") or DEFAULT_TX
        # The `/ 1e6` treats capture.fc as Hz; the rest of the module works in MHz.
        fc_hz = cap.get("fc")
        fc_mhz = (float(fc_hz) / 1e6) if fc_hz else DEFAULT_FC_MHZ
        result = _normalize(rx, tx, fc_mhz, "live")
        logger.info(
            "blah2 config from %s: rx=%s, tx=%s, fc=%.1f MHz",
            url,
            (result["rx"]["latitude"], result["rx"]["longitude"]),
            (result["tx"]["latitude"], result["tx"]["longitude"]),
            result["fc_mhz"],
        )
        return result
    except Exception as e:
        # Deliberately best-effort: any failure drops to the next source.
        logger.info("blah2 config fetch failed (%s)", e)

    if pi_gps_url:
        rx = fetch_pi_gps(pi_gps_url, timeout_s=timeout_s)
        if rx:
            result = _normalize(rx, DEFAULT_TX, DEFAULT_FC_MHZ, "pi-gps")
            logger.info(
                "rx from pi GPS: (%.5f, %.5f, %.0f m); tx=Sutro fallback, fc=%.1f MHz",
                result["rx"]["latitude"], result["rx"]["longitude"],
                result["rx"]["altitude"], result["fc_mhz"],
            )
            return result

    logger.info("using hardcoded fallback (Shack15 + Sutro Tower DTV ch36)")
    return _normalize(DEFAULT_RX, DEFAULT_TX, DEFAULT_FC_MHZ, "fallback")
def doppler_max_for_fc(fc_mhz: float, speed_mps: float = 150.0) -> float:
    """Return a max-Doppler envelope, in Hz, for an illuminator at fc_mhz.

    Computed as ``2 * speed / λ`` with ``λ = c / fc``.  With the default
    speed of 150 m/s (civilian aircraft):
        fc =   88 MHz → Doppler ≈   88 Hz
        fc =  605 MHz → Doppler ≈  605 Hz
        fc = 5180 MHz → Doppler ≈ 5184 Hz

    Args:
        fc_mhz: Illuminator centre frequency in MHz.
        speed_mps: Target speed in m/s used for the envelope.

    Returns:
        The Doppler envelope in Hz.
    """
    SPEED_OF_LIGHT_MPS = 299_792_458.0
    # 2 * v / λ, with λ = c / (fc_mhz * 1e6) folded into a single expression.
    return 2.0 * speed_mps * fc_mhz * 1e6 / SPEED_OF_LIGHT_MPS
if __name__ == "__main__":
    # Minimal CLI: resolve the config (live attempt, then fallbacks) and
    # dump the result as JSON on stdout.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--blah2", default="http://10.1.62.202:3000")
    cli_args = parser.parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(message)s",
    )

    resolved = fetch_blah2_config(cli_args.blah2)
    print(json.dumps(resolved, indent=2))