From 74ecf6e34d1913824dfc0a5e6ecef6470d38e0dc Mon Sep 17 00:00:00 2001 From: Sebastian Waldbauer Date: Tue, 1 Feb 2022 14:41:49 +0100 Subject: [PATCH] FIX: Possible race condition in file collector Locking the file with Python's fcntl to acquire a file lock; it may not support Windows installations :/ Fixes #2128 Fixes #1631 Signed-off-by: Sebastian Waldbauer --- intelmq/bots/collectors/file/collector_file.py | 14 ++++++++++---- .../tests/bots/collectors/file/test_collector.py | 11 +++++++++++ 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/intelmq/bots/collectors/file/collector_file.py b/intelmq/bots/collectors/file/collector_file.py index e76a0b4975..2ea1d12cc1 100644 --- a/intelmq/bots/collectors/file/collector_file.py +++ b/intelmq/bots/collectors/file/collector_file.py @@ -24,6 +24,7 @@ import fnmatch import os +import fcntl import intelmq.lib.exceptions as exceptions from intelmq.lib.bot import CollectorBot @@ -71,10 +72,15 @@ def process(self): template.add("feed.url", "file://localhost%s" % filename) template.add("extra.file_name", f) - with open(filename, 'rb') as fh: - for report in generate_reports(template, fh, self.chunk_size, - self.chunk_replicate_header): - self.send_message(report) + try: + with open(filename, 'rb') as fh: + fcntl.flock(fh, fcntl.LOCK_EX | fcntl.LOCK_NB) + for report in generate_reports(template, fh, + self.chunk_size, self.chunk_replicate_header): + self.send_message(report) + fcntl.flock(fh, fcntl.LOCK_UN) + except BlockingIOError: + self.logger.info("File is already being used by another process, skipping.") if self.delete_file: try: diff --git a/intelmq/tests/bots/collectors/file/test_collector.py b/intelmq/tests/bots/collectors/file/test_collector.py index b2337f2411..7a475145a7 100644 --- a/intelmq/tests/bots/collectors/file/test_collector.py +++ b/intelmq/tests/bots/collectors/file/test_collector.py @@ -8,6 +8,7 @@ """ import os import unittest +import fcntl import intelmq.lib.test as test import 
intelmq.lib.utils as utils @@ -49,6 +50,16 @@ def test_events(self): self.assertMessageEqual(0, OUTPUT) + def test_file_lock(self): + f = open(PATH, 'rb') + fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB) + self.run_bot(iterations=1) + self.assertLogMatches('File is already being used by another process, skipping.', levelname="INFO") + fcntl.flock(f, fcntl.LOCK_UN) + f.close() + self.run_bot(iterations=1) + self.assertMessageEqual(0, OUTPUT) + if __name__ == '__main__': # pragma: no cover unittest.main()