Skip to content

Commit 639ce06

Browse files
committed
fix: let repeated sysg logs follow sessions reconnect cleanly
1 parent 81fbf99 commit 639ce06

3 files changed

Lines changed: 196 additions & 65 deletions

File tree

src/bin/main.rs

Lines changed: 103 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crossterm::{
1616
event::{self, Event, KeyCode, KeyEvent, KeyModifiers},
1717
terminal,
1818
};
19-
use libc::{SIGKILL, SIGTERM, getpgrp, getppid, killpg};
19+
use libc::{SIGKILL, SIGTERM, getppid};
2020
use nix::{
2121
sys::signal,
2222
unistd::{Pid, Uid},
@@ -1775,6 +1775,106 @@ fn force_kill(pid: libc::pid_t) {
17751775
let _ = wait_for_supervisor_exit(pid, Duration::from_secs(2));
17761776
}
17771777

1778+
/// Returns whether the tracked root PID is still alive.
1779+
fn tracked_service_alive(pid: libc::pid_t) -> bool {
1780+
let target = Pid::from_raw(pid);
1781+
match signal::kill(target, None) {
1782+
Ok(_) => true,
1783+
Err(err) => err != nix::Error::from(nix::errno::Errno::ESRCH),
1784+
}
1785+
}
1786+
1787+
/// Waits for a tracked service root PID to exit.
1788+
fn wait_for_tracked_service_exit(pid: libc::pid_t, timeout: Duration) -> bool {
1789+
let deadline = Instant::now() + timeout;
1790+
while Instant::now() < deadline {
1791+
if !tracked_service_alive(pid) {
1792+
return true;
1793+
}
1794+
thread::sleep(Duration::from_millis(100));
1795+
}
1796+
!tracked_service_alive(pid)
1797+
}
1798+
1799+
/// Sends a signal to a tracked service using its process group when available and falling back
1800+
/// to the root PID when group signaling is not possible.
1801+
fn signal_tracked_service(
1802+
service: &str,
1803+
pid: libc::pid_t,
1804+
pgid: Option<libc::pid_t>,
1805+
signal: libc::c_int,
1806+
) {
1807+
let mut delivered = false;
1808+
1809+
if let Some(group_id) = pgid {
1810+
unsafe {
1811+
if libc::killpg(group_id, signal) == 0 {
1812+
delivered = true;
1813+
} else {
1814+
let err = std::io::Error::last_os_error();
1815+
match err.raw_os_error() {
1816+
Some(code) if code == libc::ESRCH => {
1817+
delivered = true;
1818+
}
1819+
Some(code) if code == libc::EPERM => {}
1820+
_ => eprintln!(
1821+
"systemg: failed to send signal {signal} to '{service}' (pgid {group_id}): {err}"
1822+
),
1823+
}
1824+
}
1825+
}
1826+
}
1827+
1828+
if delivered {
1829+
return;
1830+
}
1831+
1832+
unsafe {
1833+
if libc::kill(pid, signal) == -1 {
1834+
let err = std::io::Error::last_os_error();
1835+
match err.raw_os_error() {
1836+
Some(code) if code == libc::ESRCH => {}
1837+
_ => eprintln!(
1838+
"systemg: failed to send signal {signal} to '{service}' (pid {pid}): {err}"
1839+
),
1840+
}
1841+
}
1842+
}
1843+
}
1844+
1845+
/// Terminates tracked services from the pid file during foreground Ctrl-C shutdown.
1846+
fn terminate_tracked_services_on_shutdown() {
1847+
let mut tracked: Vec<(String, libc::pid_t, Option<libc::pid_t>)> = Vec::new();
1848+
if let Ok(pid_file) = PidFile::load() {
1849+
for (service, pid) in pid_file.services() {
1850+
tracked.push((
1851+
service.clone(),
1852+
*pid as libc::pid_t,
1853+
pid_file.pgid_for(service).map(|group| group as libc::pid_t),
1854+
));
1855+
}
1856+
}
1857+
1858+
for (service, pid, pgid) in &tracked {
1859+
signal_tracked_service(service, *pid, *pgid, libc::SIGTERM);
1860+
}
1861+
1862+
for (service, pid, pgid) in &tracked {
1863+
if wait_for_tracked_service_exit(*pid, Duration::from_secs(2)) {
1864+
continue;
1865+
}
1866+
signal_tracked_service(service, *pid, *pgid, libc::SIGKILL);
1867+
}
1868+
1869+
for (service, pid, _) in &tracked {
1870+
if !wait_for_tracked_service_exit(*pid, Duration::from_secs(2)) {
1871+
eprintln!(
1872+
"systemg: service '{service}' (pid {pid}) survived foreground shutdown"
1873+
);
1874+
}
1875+
}
1876+
}
1877+
17781878
/// Handles process exited.
17791879
fn process_exited(pid: libc::pid_t) -> bool {
17801880
let proc_root = PathBuf::from(format!("/proc/{pid}"));
@@ -2280,55 +2380,8 @@ fn daemonize_systemg() -> std::io::Result<()> {
22802380
fn register_signal_handler() -> Result<(), Box<dyn Error>> {
22812381
ctrlc::set_handler(move || {
22822382
println!("systemg is shutting down... terminating child services");
2283-
2284-
let mut service_pids: Vec<(String, libc::pid_t)> = Vec::new();
2285-
if let Ok(pid_file) = PidFile::load() {
2286-
for (service, pid) in pid_file.services() {
2287-
service_pids.push((service.clone(), *pid as libc::pid_t));
2288-
}
2289-
}
2290-
2291-
for (service, pgid) in &service_pids {
2292-
unsafe {
2293-
if libc::killpg(*pgid, libc::SIGTERM) == -1 {
2294-
let err = std::io::Error::last_os_error();
2295-
match err.raw_os_error() {
2296-
Some(code) if code == libc::ESRCH => {}
2297-
Some(code) if code == libc::EPERM => {
2298-
let _ = libc::kill(*pgid, libc::SIGTERM);
2299-
}
2300-
_ => eprintln!(
2301-
"systemg: failed to send SIGTERM to '{service}' (pgid {pgid}): {err}"
2302-
),
2303-
}
2304-
}
2305-
}
2306-
}
2307-
2308-
std::thread::sleep(std::time::Duration::from_millis(150));
2309-
2310-
for (service, pgid) in &service_pids {
2311-
unsafe {
2312-
if libc::killpg(*pgid, libc::SIGKILL) == -1 {
2313-
let err = std::io::Error::last_os_error();
2314-
match err.raw_os_error() {
2315-
Some(code) if code == libc::ESRCH => {}
2316-
Some(code) if code == libc::EPERM => {
2317-
let _ = libc::kill(*pgid, libc::SIGKILL);
2318-
}
2319-
_ => eprintln!(
2320-
"systemg: failed to send SIGKILL to '{service}' (pgid {pgid}): {err}"
2321-
),
2322-
}
2323-
}
2324-
}
2325-
}
2326-
2327-
unsafe {
2328-
let pgid = getpgrp();
2329-
killpg(pgid, SIGKILL);
2330-
}
2331-
2383+
terminate_tracked_services_on_shutdown();
2384+
let _ = ipc::cleanup_runtime();
23322385
std::process::exit(0);
23332386
})?;
23342387

src/logs.rs

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@ use std::{
99
fs::{self, OpenOptions},
1010
io::{IsTerminal, Read, Write},
1111
path::{Path, PathBuf},
12-
sync::{Arc, Mutex, OnceLock, mpsc},
12+
sync::{Arc, Mutex, OnceLock, mpsc, mpsc::RecvTimeoutError},
1313
thread,
14+
time::Duration,
1415
};
1516
#[cfg(any(target_os = "linux", target_os = "macos"))]
1617
use std::{
1718
os::unix::{
18-
io::{FromRawFd, IntoRawFd},
19+
io::{AsRawFd, FromRawFd, IntoRawFd},
1920
net::UnixStream,
2021
},
2122
process::{Command, Stdio},
@@ -106,6 +107,35 @@ fn subscribe_live_log(service: &str, kind: &str) -> mpsc::Receiver<Vec<u8>> {
106107
rx
107108
}
108109

110+
/// Returns whether the client side of a Unix socket has disconnected.
111+
#[cfg(any(target_os = "linux", target_os = "macos"))]
112+
fn socket_peer_disconnected(stream: &UnixStream) -> bool {
113+
let fd = stream.as_raw_fd();
114+
let mut byte = 0_u8;
115+
let result = unsafe {
116+
libc::recv(
117+
fd,
118+
&mut byte as *mut u8 as *mut libc::c_void,
119+
1,
120+
libc::MSG_PEEK | libc::MSG_DONTWAIT,
121+
)
122+
};
123+
124+
if result == 0 {
125+
return true;
126+
}
127+
128+
if result < 0 {
129+
let err = std::io::Error::last_os_error();
130+
return !matches!(
131+
err.raw_os_error(),
132+
Some(code) if code == libc::EAGAIN || code == libc::EWOULDBLOCK
133+
);
134+
}
135+
136+
false
137+
}
138+
109139
/// Normalizes this item.
110140
fn normalize(name: &str) -> String {
111141
name.chars()
@@ -630,9 +660,30 @@ impl LogManager {
630660
}
631661
if matches!(mode, TailMode::Follow) {
632662
let receiver = subscribe_live_log(service_name, kind_name);
633-
while let Ok(chunk) = receiver.recv() {
634-
socket.write_all(&chunk)?;
635-
socket.flush()?;
663+
loop {
664+
match receiver.recv_timeout(Duration::from_millis(250)) {
665+
Ok(chunk) => match socket.write_all(&chunk) {
666+
Ok(()) => {
667+
socket.flush()?;
668+
}
669+
Err(err)
670+
if matches!(
671+
err.kind(),
672+
std::io::ErrorKind::BrokenPipe
673+
| std::io::ErrorKind::ConnectionReset
674+
) =>
675+
{
676+
break;
677+
}
678+
Err(err) => return Err(err.into()),
679+
},
680+
Err(RecvTimeoutError::Timeout) => {
681+
if socket_peer_disconnected(&socket) {
682+
break;
683+
}
684+
}
685+
Err(RecvTimeoutError::Disconnected) => break,
686+
}
636687
}
637688
}
638689
return Ok(());

src/supervisor.rs

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -648,12 +648,41 @@ impl Supervisor {
648648
follow,
649649
} = command
650650
{
651-
if let Err(err) = self.handle_logs_command(
652-
service, lines, &kind, follow, &stream,
653-
) {
654-
error!("Supervisor logs command failed: {err}");
655-
let _ = writeln!(stream, "{err}");
656-
}
651+
let snapshot = match self.collect_live_snapshot() {
652+
Ok(snapshot) => {
653+
self.status_cache.replace(snapshot.clone());
654+
snapshot
655+
}
656+
Err(err) => {
657+
error!("Supervisor logs command failed: {err}");
658+
let _ = writeln!(stream, "{err}");
659+
continue;
660+
}
661+
};
662+
let pid_file = self.daemon.pid_file_handle();
663+
let mut log_stream = match stream.try_clone() {
664+
Ok(stream) => stream,
665+
Err(err) => {
666+
error!(
667+
"Failed to clone supervisor log stream: {err}"
668+
);
669+
continue;
670+
}
671+
};
672+
thread::spawn(move || {
673+
if let Err(err) = Supervisor::handle_logs_command(
674+
snapshot,
675+
pid_file,
676+
service,
677+
lines,
678+
&kind,
679+
follow,
680+
&log_stream,
681+
) {
682+
error!("Supervisor logs command failed: {err}");
683+
let _ = writeln!(log_stream, "{err}");
684+
}
685+
});
657686
continue;
658687
}
659688
match self.handle_command(command) {
@@ -698,16 +727,14 @@ impl Supervisor {
698727

699728
/// Streams logs through the supervisor-owned control socket.
700729
fn handle_logs_command(
701-
&mut self,
730+
snapshot: crate::status::StatusSnapshot,
731+
pid_file: std::sync::Arc<std::sync::Mutex<crate::daemon::PidFile>>,
702732
service: Option<String>,
703733
lines: usize,
704734
kind: &str,
705735
follow: bool,
706736
stream: &std::os::unix::net::UnixStream,
707737
) -> Result<(), SupervisorError> {
708-
let snapshot = self.collect_live_snapshot()?;
709-
self.status_cache.replace(snapshot.clone());
710-
let pid_file = self.daemon.pid_file_handle();
711738
let manager = LogManager::new(pid_file);
712739
let requested_kind = Some(kind);
713740

0 commit comments

Comments
 (0)