-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcot_verify.py
More file actions
200 lines (174 loc) · 6.43 KB
/
Copy pathcot_verify.py
File metadata and controls
200 lines (174 loc) · 6.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
#!/usr/bin/env python3
"""
cot_verify.py — Listen for Cursor-on-Target multicast traffic and
print a one-line summary per message.

Joins the ATAK situational-awareness multicast group (default
239.2.3.1:6969) and parses every CoT XML message it receives. Use this to
confirm cot_emitter is broadcasting correctly without needing a real
TAK client (iTAK, ATAK-CIV, WinTAK) configured.

Usage:
    python3 cot_verify.py                                # default group/port
    python3 cot_verify.py --raw                          # also print full XML
    python3 cot_verify.py -o received.xml                # save raw XML to file
    python3 cot_verify.py --group 239.2.3.1 --port 6969  # custom

What you should see when cot_emitter is running:
    time         type          callsign              lat        lon         speed       course
    22:46:00.936 a-f-A         REAL-N12345           37.79504   -122.39156  4.00 m/s    180.0°
    22:46:01.042 a-h-A         PHANTOM-A1B2C3        37.79700   -122.39600  25.00 m/s   180.0°
    ...

Type colors:
    red    = a-h-*          (hostile — PHANTOM, DECEPTION verdicts)
    green  = a-f-*          (friendly — CONFIRMED verdicts)
    yellow = a-s-*          (suspect)
    plain  = a-u-* / other  (unknown)
"""
import argparse
import socket
import struct
import sys
import time
import xml.etree.ElementTree as ET
# ANSI SGR escape sequences used to colorize the terminal output.
ANSI_RESET = "\033[0m"  # SGR 0: reset all attributes
ANSI_DIM = "\033[2m"  # SGR 2: dim / faint text
ANSI_RED = "\033[91m"  # SGR 91: bright red foreground
ANSI_GREEN = "\033[92m"  # SGR 92: bright green foreground
ANSI_YELLOW = "\033[93m"  # SGR 93: bright yellow foreground
ANSI_CYAN = "\033[96m"  # SGR 96: bright cyan foreground
def color_for_type(cot_type: str) -> str:
    """Pick the ANSI color escape used to display a CoT type string.

    Types beginning with ``a-h`` map to red, ``a-f`` to green and ``a-s``
    to yellow. Any other type yields an empty string, i.e. no coloring.
    """
    if cot_type.startswith("a-h"):
        chosen = ANSI_RED
    elif cot_type.startswith("a-f"):
        chosen = ANSI_GREEN
    elif cot_type.startswith("a-s"):
        chosen = ANSI_YELLOW
    else:
        chosen = ""
    return chosen
def parse_event(xml_text: str):
    """Parse one CoT ``<event>`` document into a flat dict.

    Args:
        xml_text: The XML text of a single received message.

    Returns:
        A dict with the keys ``uid``, ``type``, ``time``, ``stale``
        (strings, ``"?"`` when the attribute is missing), ``lat`` and
        ``lon`` (floats, ``0.0`` when missing), ``hae``, ``course`` and
        ``speed`` (attribute strings or ``None``), ``callsign`` (string,
        ``"?"`` when missing) and ``remarks`` (string, possibly empty).
        Returns ``None`` when the text is not well-formed XML, the root
        element is not ``<event>``, or ``lat``/``lon`` is not a number.
    """
    # NOTE(security): this text comes straight off the network and
    # xml.etree.ElementTree is not hardened against maliciously
    # constructed XML (see the security notes in the Python ``xml``
    # package documentation).
    try:
        root = ET.fromstring(xml_text)
    except ET.ParseError:
        return None
    if root.tag != "event":
        return None

    point = root.find("point")
    detail = root.find("detail")
    if detail is not None:
        contact = detail.find("contact")
        track = detail.find("track")
        remarks = detail.find("remarks")
    else:
        contact = track = remarks = None

    # A non-numeric coordinate makes float() raise ValueError; report the
    # message as unparseable instead of letting one bad datagram propagate
    # an exception up through the receive loop.
    try:
        lat = float(point.get("lat", 0)) if point is not None else 0.0
        lon = float(point.get("lon", 0)) if point is not None else 0.0
    except ValueError:
        return None

    return {
        "uid": root.get("uid", "?"),
        "type": root.get("type", "?"),
        "time": root.get("time", "?"),
        "stale": root.get("stale", "?"),
        "lat": lat,
        "lon": lon,
        "hae": point.get("hae") if point is not None else None,
        "callsign": (
            contact.get("callsign", "?") if contact is not None else "?"
        ),
        "course": track.get("course") if track is not None else None,
        "speed": track.get("speed") if track is not None else None,
        "remarks": (remarks.text or "") if remarks is not None else "",
    }
def _format_row(ev: dict) -> str:
    """Render one parsed CoT event as a fixed-width, colorized summary line."""
    ts = ev["time"]
    # Keep only the HH:MM:SS.mmm portion of an ISO timestamp. For a value
    # without a "T" separator, fall back to the 12 characters that precede
    # the final one.
    if "T" in ts:
        ts_short = ts.split("T")[-1].rstrip("Z")[:12]
    else:
        ts_short = ts[-13:-1]
    speed = f"{ev['speed']} m/s" if ev["speed"] else "—"
    course = f"{ev['course']}°" if ev["course"] else "—"
    return (
        f"{ts_short:<13}"
        f"{color_for_type(ev['type'])}{ev['type']:<14}{ANSI_RESET}"
        f"{ANSI_CYAN}{ev['callsign']:<22}{ANSI_RESET}"
        f"{ev['lat']:<11.5f}"
        f"{ev['lon']:<12.5f}"
        f"{speed:<12}"
        f"{course:<8}"
    )


def listen(group: str, port: int, show_raw: bool, out_path: str | None):
    """Join a multicast group and print one summary line per CoT datagram.

    Blocks until interrupted with Ctrl-C, then prints how many datagrams
    were received. Datagrams that cannot be parsed are reported with their
    first 80 characters and are neither echoed raw nor written to the
    output file.

    Args:
        group: IPv4 multicast group address (dotted quad) to join.
        port: UDP port to bind on all interfaces.
        show_raw: When true, also print the full XML under each summary.
        out_path: Optional path of a file that receives the raw XML of
            every parsed message, one per line. The file is truncated
            when opened and written as UTF-8.

    Raises:
        OSError: If the socket cannot be configured, bound or joined to
            the group, or if the output file cannot be opened.
    """
    # The context manager guarantees the socket is closed even when bind,
    # the group join or opening the output file fails.
    with socket.socket(
        socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP
    ) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # SO_REUSEPORT helps when running multiple listeners on the same
        # machine (macOS/BSD; Linux too on recent kernels). It is optional,
        # so a missing constant or a refusal is deliberately ignored.
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        except (AttributeError, OSError):
            pass
        sock.bind(("", port))

        # struct ip_mreq is exactly two 4-byte addresses: the group and the
        # local interface (0.0.0.0 = let the OS choose). "4s4s" packs
        # precisely those 8 bytes; the native "l" code is 8 bytes wide and
        # aligned on LP64 platforms, which would produce a 16-byte buffer.
        mreq = struct.pack(
            "4s4s", socket.inet_aton(group), socket.inet_aton("0.0.0.0")
        )
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

        print(
            f"# CoT verifier listening on multicast {group}:{port}",
            flush=True,
        )
        print("# (Ctrl-C to stop)\n", flush=True)
        header = (
            f"{ANSI_DIM}"
            f"{'time':<13}{'type':<14}{'callsign':<22}"
            f"{'lat':<11}{'lon':<12}{'speed':<12}{'course':<8}"
            f"{ANSI_RESET}"
        )
        print(header, flush=True)

        out_handle = None
        count = 0
        try:
            if out_path:
                # Datagrams are decoded as UTF-8 with replacement characters,
                # so write UTF-8 too; the locale default encoding may be
                # unable to represent them.
                out_handle = open(out_path, "w", encoding="utf-8")
            while True:
                data, addr = sock.recvfrom(65536)
                xml_text = data.decode("utf-8", errors="replace")
                count += 1
                ev = parse_event(xml_text)
                if ev is None:
                    print(
                        f"# {addr[0]}: unparseable CoT "
                        f"(showing first 80 chars):"
                    )
                    print(f" {xml_text[:80]}…", flush=True)
                    continue
                print(_format_row(ev), flush=True)
                if show_raw:
                    print(f" {ANSI_DIM}{xml_text}{ANSI_RESET}", flush=True)
                if out_handle:
                    out_handle.write(xml_text + "\n")
                    out_handle.flush()
        except KeyboardInterrupt:
            print(f"\n# stopped after {count} message(s)", flush=True)
        finally:
            if out_handle:
                out_handle.close()
            # Leaving the group is best-effort; the socket is closed right
            # after this block regardless.
            try:
                sock.setsockopt(
                    socket.IPPROTO_IP, socket.IP_DROP_MEMBERSHIP, mreq
                )
            except OSError:
                pass
def main():
    """Parse command-line options and run the listener.

    Exits with status 2 when ``--port`` is outside 0-65535 (via argparse)
    and with status 1 when the listener raises ``OSError``.
    """
    p = argparse.ArgumentParser(description="CoT multicast verifier")
    p.add_argument(
        "--group",
        default="239.2.3.1",
        help="Multicast group (default 239.2.3.1, ATAK SA channel)",
    )
    p.add_argument(
        "--port",
        type=int,
        default=6969,
        help="Multicast port (default 6969, ATAK SA)",
    )
    p.add_argument(
        "--raw",
        action="store_true",
        help="Also print the full received XML",
    )
    p.add_argument(
        "-o",
        "--output",
        default=None,
        help=(
            "Write every received raw XML to this file "
            "(an existing file is overwritten)"
        ),
    )
    args = p.parse_args()
    # socket.bind() raises OverflowError, not OSError, for a port outside
    # 0-65535, which would escape the handler below as a raw traceback.
    if not 0 <= args.port <= 65535:
        p.error(f"--port must be in the range 0-65535 (got {args.port})")
    try:
        listen(args.group, args.port, args.raw, args.output)
    except OSError as e:
        print(
            f"# socket error: {e}\n"
            f"# (port {args.port} may be in use, "
            f"or you need to run with sudo on some systems)",
            file=sys.stderr,
        )
        sys.exit(1)
# Run the verifier only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()