Skip to content

Commit

Permalink
FIX: Possible race condition in file collector
Browse files Browse the repository at this point in the history
Locking the file with Python's fcntl module to acquire a file lock;
this may not work on Windows installations :/

Fixes #2128
Fixes #1631

Signed-off-by: Sebastian Waldbauer <[email protected]>
  • Loading branch information
waldbauer-certat committed Feb 1, 2022
1 parent 1dc5364 commit 7a518ea
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ CHANGELOG

#### Collectors
- `intelmq.bots.collectors.mail._lib`: Add support for unverified SSL/STARTTLS connections (PR#2055 by Sebastian Wagner).
- `intelmq.bots.collectors.file.collector_file`: Added file lock support to avoid race conditions (PR#2147 by Sebastian Waldbauer).

#### Parsers
- `intelmq.bots.parsers.alienvault.parser_otx`: Save CVE data in `extra.cve` instead of `extra.CVE` due to the field name restriction on lower-case characters (PR#2059 by Sebastian Wagner).
Expand Down
36 changes: 23 additions & 13 deletions intelmq/bots/collectors/file/collector_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import fnmatch
import os
import fcntl

import intelmq.lib.exceptions as exceptions
from intelmq.lib.bot import CollectorBot
Expand Down Expand Up @@ -58,32 +59,41 @@ def process(self):
self.logger.debug("Started looking for files.")

if os.path.isdir(self.path):
p = os.path.abspath(self.path)
path = os.path.abspath(self.path)

# iterate over all files in dir
for f in os.listdir(p):
filename = os.path.join(p, f)
for file in os.listdir(path):
filename = os.path.join(path, file)
if os.path.isfile(filename):
if fnmatch.fnmatch(f, '*' + self.postfix):
if fnmatch.fnmatch(file, '*' + self.postfix):
self.logger.info("Processing file %r.", filename)

template = self.new_report()
template.add("feed.url", "file://localhost%s" % filename)
template.add("extra.file_name", f)

with open(filename, 'rb') as fh:
for report in generate_reports(template, fh, self.chunk_size,
self.chunk_replicate_header):
self.send_message(report)
template.add("feed.url", f"file://localhost{filename}")
template.add("extra.file_name", file)

try:
with open(filename, 'rb') as file_handle:
fcntl.flock(file_handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
for report in generate_reports(template, file_handle,
self.chunk_size,
self.chunk_replicate_header):
self.send_message(report)
fcntl.flock(file_handle, fcntl.LOCK_UN)
except BlockingIOError:
self.logger.info("File is already being used by another"
" process, skipping.")

if self.delete_file:
try:
os.remove(filename)
self.logger.debug("Deleted file: %r.", filename)
except PermissionError:
self.logger.error("Could not delete file %r.", filename)
self.logger.info("Maybe I don't have sufficient rights on that file?")
self.logger.error("Stopping now, to prevent reading this file again.")
self.logger.info("Maybe I don't have sufficient rights"
" on that file?")
self.logger.error("Stopping now, to prevent reading this"
" file again.")
self.stop()


Expand Down
11 changes: 11 additions & 0 deletions intelmq/tests/bots/collectors/file/test_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"""
import os
import unittest
import fcntl

import intelmq.lib.test as test
import intelmq.lib.utils as utils
Expand Down Expand Up @@ -49,6 +50,16 @@ def test_events(self):

self.assertMessageEqual(0, OUTPUT)

def test_file_lock(self):
    """
    Check that a locked input file is skipped and collected once unlocked.

    An exclusive, non-blocking ``flock`` is taken on ``PATH`` before the
    first bot run, which must log the "already being used" message at
    INFO level. After the lock is released and the handle closed, a second
    run must produce a message equal to ``OUTPUT``.
    """
    with open(PATH, 'rb') as locked_file:
        fcntl.flock(locked_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
        try:
            self.run_bot(iterations=1)
            self.assertLogMatches('File is already being used by another process, skipping.', levelname="INFO")
        finally:
            # Release the lock even when the run or the assertion fails, so
            # a failure here cannot leave the file locked for later tests.
            fcntl.flock(locked_file, fcntl.LOCK_UN)
    self.run_bot(iterations=1)
    self.assertMessageEqual(0, OUTPUT)


if __name__ == '__main__': # pragma: no cover
unittest.main()

0 comments on commit 7a518ea

Please sign in to comment.