Problem Statement
We are using tail and subprocess.Popen to ingest log data into a custom application. This application is expected to run on a variety of *NIX systems including OpenBSD, RedHat Linux, and Debian 10 + 11.
We spawn a subprocess with subprocess.Popen that runs the system-installed tail binary to collect log data from files. Using the system-installed tail is deemed most efficient as it uses inotify or polling to discover changes to the file.
The child process pushes output through STDOUT, and an IO multiplexer created by DefaultSelector handles the communication with our Python application. Per the docs, DefaultSelector is recommended when running on heterogeneous *NIX systems, as it chooses the right selector object for the platform (see the IO selector docs).
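As a quick illustration of that platform-dependent choice, the snippet below simply reports which implementation DefaultSelector resolves to on the current system:
import selectors

# DefaultSelector is bound to the most efficient implementation available,
# e.g. EpollSelector on Linux or KqueueSelector on BSD.
print(selectors.DefaultSelector.__name__)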
Through testing we've noticed that read1 calls return data that may contain broken lines. For example, a line containing 29834 within the file may be returned as two messages: 2983 and 4.
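For illustration only (these byte strings are made up), two consecutive read1() results that split a line across a chunk boundary behave like this:
# Hypothetical chunks returned by two consecutive read1() calls
# when the pipe delivers data mid-line.
chunk_a = b"29833\n2983"   # first read stops in the middle of "29834"
chunk_b = b"4\n29835\n"    # the remainder arrives with the next read

print(chunk_a.decode("utf-8").split("\n"))  # ['29833', '2983']
print(chunk_b.decode("utf-8").split("\n"))  # ['4', '29835', '']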
Question
Is this a race condition within Python's selectors module or on the system? Slowing our writes to the log file reduces the anomaly but does not fully eliminate it. I have created a POC for testing and further explanation below. We need a solution that fixes this issue at the application or system level; an answer showing that this is the wrong approach to tailing a file in this manner would also be accepted.
Example and Setup Instructions
Setup
Two scripts can be used to recreate the problem. The first, tailp.py, tails the file and prints the output to the parent process's STDOUT. The second, write_logs.py, simply writes 30K log lines to the target file ./log.log.
1. Create a target log file within the local directory: touch log.log
2. Run python3 tailp.py within a terminal.
3. Within a separate terminal, run python3 write_logs.py.
4. Within the terminal session for tailp.py, log lines should be emitted. There will likely be broken numbers such as:
29829
29830
29831
29832
29833
2983 <-- Should be a single line
4 <--
29835
29836
29837
29838
29839
29840
29841
Scripts
tailp.py
import selectors
import logging
import subprocess
from shlex import quote
from shutil import which
from threading import Event


def intake_gateway_push(datum):
    """
    Function used to push data into a mocked component.
    """
    print(datum)


def tail(filename: str, stop_event: Event):
    """
    Tails a file and pushes data into intake gateway.

    :param filename: full path to file
    :param stop_event: event used to send stop signal
    """
    with subprocess.Popen(
        [which("tail"), "-f", "-n", "0", quote(filename)],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    ) as tail_process:
        logging.info("Creating tail process with PID '%s'", tail_process.pid)
        # Using 'DefaultSelector' as this is expected to run
        # on Linux and BSD systems.
        selector = selectors.DefaultSelector()
        selector.register(tail_process.stdout, selectors.EVENT_READ)
        # Loops when 'selector.select' times out, exits
        # when the stop event is sent.
        while not stop_event.is_set():
            for key, _ in selector.select(timeout=5.0):
                # Decoding file handle data as UTF-8 and separating it
                # by newlines.
                data = key.fileobj.read1().decode('utf-8').split("\n")
                # Removing all empty strings produced by 'split("\n")'.
                data = [datum for datum in data if datum != '']
                # Pushing each log line into the mocked gateway function.
                for datum in data:
                    intake_gateway_push(datum)
        logging.info("Unregistering selector from tail PID '%s'", tail_process.pid)
        selector.unregister(tail_process.stdout)
        # Tail process is usually defunct and does not exit on OpenBSD 7.3;
        # explicitly calling 'kill' solves this, need to investigate how
        # the context manager is cleaning up child processes.
        tail_process.kill()


if __name__ == "__main__":
    stop_event = Event()
    tail("log.log", stop_event)
write_logs.py
import time


def write_logs(count: int):
    """
    Write specified quantity of logs to file.

    :param count: number of log lines to write
    """
    with open("./log.log", mode="w", encoding="utf-8") as file:
        for i in range(count):
            file.write(f"{i}\n")
            time.sleep(0.001)


if __name__ == "__main__":
    write_logs(30000)
The problem appears when a complete line is not received in a single read operation. If the last character of the received data is not a newline, the line is incomplete. The fix is to store that incomplete fragment and prepend it to the data returned by the next read. Because the next chunk may itself end mid-line, the last element produced by the split must again be treated as potentially incomplete and carried over to the following iteration.
Example 1.
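A minimal sketch of this approach, reusing the read loop and names from tailp.py (only the body of the while loop changes); the exact code is an assumption based on the description below:
incomplete_line = ""
while not stop_event.is_set():
    for key, _ in selector.select(timeout=5.0):
        # Prepend the fragment left over from the previous read.
        data = incomplete_line + key.fileobj.read1().decode("utf-8")
        lines = data.split("\n")
        # The last element is either an empty string (the data ended with
        # a newline) or a partial line; keep it for the next iteration.
        incomplete_line = lines.pop()
        for datum in lines:
            if datum:
                intake_gateway_push(datum)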
The incomplete line is stored in the incomplete_line variable. The variable is updated with each iteration to accumulate the incomplete line along with the new data received. The combined data is then split into lines and all complete lines except the last one are processed. The last line is stored as potentially incomplete for the next iteration.
Example 2.
For a more efficient solution you can follow the same process, but instead of collecting incomplete lines into a growing buffer you can process the data in a streaming fashion and emit complete lines as soon as they become available.
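One way that could look, again sketched against the loop in tailp.py rather than taken from a reference implementation; only the newly read chunk is split, and complete lines are pushed as soon as they are seen:
fragments = []  # pieces of the line currently being assembled
while not stop_event.is_set():
    for key, _ in selector.select(timeout=5.0):
        parts = key.fileobj.read1().decode("utf-8").split("\n")
        if len(parts) == 1:
            # No newline in this chunk; keep accumulating the fragment.
            fragments.append(parts[0])
            continue
        # The first part completes the line started by earlier chunks.
        first_line = "".join(fragments) + parts[0]
        if first_line:
            intake_gateway_push(first_line)
        # Middle parts are complete lines in their own right.
        for line in parts[1:-1]:
            if line:
                intake_gateway_push(line)
        # The final part starts a new, possibly incomplete, line.
        fragments = [parts[-1]]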
NOTE: This solution assumes data is terminated by \n, so adjust according to your needs.