4343# ---------------------------------------------------------------------------
4444import os as _os
4545import re
46+ import threading
4647from collections import Counter
4748from dataclasses import dataclass
4849from pathlib import Path
4950from typing import Iterator
5051
5152from ada .config import logger
5253
54+ # Positional read, portable to Windows. POSIX os.pread is thread-safe and doesn't disturb a shared
55+ # offset; Windows has no os.pread, so fall back to seek+read serialized by a lock (mirrors
56+ # sesam.results.byte_source.FileByteSource).
57+ _HAS_PREAD = hasattr (_os , "pread" )
58+ _PREAD_LOCK = threading .Lock ()
59+
60+
61+ def _pread (fd : int , length : int , offset : int ) -> bytes :
62+ if _HAS_PREAD :
63+ return _os .pread (fd , length , offset )
64+ with _PREAD_LOCK :
65+ _os .lseek (fd , offset , _os .SEEK_SET )
66+ return _os .read (fd , length )
67+
5368
5469def _mem_probe_enabled () -> bool :
5570 return _os .environ .get ("ADA_STEP_STREAM_MEM_PROBE" , "" ) not in ("" , "0" , "false" , "no" )
@@ -726,7 +741,7 @@ def detect_step_length_unit_scale(filepath) -> float:
726741 overlap = len (needle ) - 1
727742 chunk = _UNIT_SCAN_CHUNK
728743 try :
729- fd = os .open (str (filepath ), os .O_RDONLY )
744+ fd = os .open (str (filepath ), os .O_RDONLY | getattr ( os , "O_BINARY" , 0 ) )
730745 except OSError :
731746 return 1.0
732747 try :
@@ -735,7 +750,7 @@ def detect_step_length_unit_scale(filepath) -> float:
735750 tail = b"" # trailing bytes of the previous chunk, so a needle straddling a
736751 hit = - 1 # chunk boundary is still found
737752 while pos < size :
738- buf = os . pread (fd , chunk , pos )
753+ buf = _pread (fd , chunk , pos )
739754 if not buf :
740755 break
741756 base = pos - len (tail ) # absolute file offset of window[0]
@@ -751,7 +766,7 @@ def detect_step_length_unit_scale(filepath) -> float:
751766 # Statement start = just past the prior ';'. Read a bounded window ending at the
752767 # match, then the full statement forward via the shared pread helper.
753768 bstart = max (0 , hit - (1 << 16 ))
754- pre = os . pread (fd , hit - bstart , bstart )
769+ pre = _pread (fd , hit - bstart , bstart )
755770 k = pre .rfind (b";" )
756771 start = bstart + k + 1 if k >= 0 else bstart
757772 stmt = _read_statement_pread (fd , start , size )
@@ -2051,11 +2066,10 @@ def _read_statement_pread(fd: int, offset: int, file_size: int) -> str:
20512066 window until the terminating ';' is found or EOF. Unlike the mmap slice, the file
20522067 pages this touches stay in the reclaimable OS page cache and never enter the process
20532068 address space / VmRSS — which is what the worker memory caps measure."""
2054- import os
20552069
20562070 chunk = _PREAD_CHUNK
20572071 while True :
2058- buf = os . pread (fd , chunk , offset )
2072+ buf = _pread (fd , chunk , offset )
20592073 end = _stmt_end_bytes (buf )
20602074 if end >= 0 :
20612075 return buf [:end ].decode ("utf-8" , "replace" )
@@ -2267,7 +2281,7 @@ def open_pool(self):
22672281
22682282 import numpy as np
22692283
2270- fd = os .open (self .step_path , os .O_RDONLY )
2284+ fd = os .open (self .step_path , os .O_RDONLY | getattr ( os , "O_BINARY" , 0 ) )
22712285 if self .idx_ids_path is not None :
22722286 ids_mm = np .memmap (self .idx_ids_path , dtype = np .int64 , mode = "r" )
22732287 offs_mm = np .memmap (self .idx_offs_path , dtype = np .int64 , mode = "r" )
0 commit comments