Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STOMP-and-n6-related updates, fixes and enhancements, especially adding login-based authentication #2408

Merged
merged 3 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,65 @@ CHANGELOG
------------------

### Configuration
- Add new optional configuration parameters for `intelmq.bots.collectors.stomp.collector`
and `intelmq.bots.outputs.stomp.output` (PR#2408 by Jan Kaliszewski):
- `auth_by_ssl_client_certificate` (Boolean, default: *true*; if *false* then
`ssl_client_certificate` and `ssl_client_certificate_key` will be ignored);
- `username` (STOMP authentication login, default: "guest"; to be used only
if `auth_by_ssl_client_certificate` is *false*);
- `password` (STOMP authentication passcode, default: "guest"; to be used only
if `auth_by_ssl_client_certificate` is *false*).

### Core
- `intelmq.lib.message`: For invalid message keys, add a hint on the failure to the exception: not allowed by configuration or not matching regular expression (PR#2398 by Sebastian Wagner).
- `intelmq.lib.exceptions.InvalidKey`: Add optional parameter `additional_text` (PR#2398 by Sebastian Wagner).
- `intelmq.lib.mixins`: Add a new class, `StompMixin` (defined in a new submodule: `stomp`),
which provides certain common STOMP-bot-specific operations, factored out from
`intelmq.bots.collectors.stomp.collector` and `intelmq.bots.outputs.stomp.output`
(PR#2408 by Jan Kaliszewski).

### Development

### Data Format

### Bots
#### Collectors
- `intelmq.bots.collectors.stomp.collector` (PR#2408 by Jan Kaliszewski):
- Add support for authentication based on STOMP login and passcode,
introducing 3 new configuration parameters (see above: *Configuration*).
- Update the code to support new versions of `stomp.py`, including the latest (`8.1.0`);
fixes [#2342](https://github.com/certtools/intelmq/issues/2342).
- Fix the reconnection behavior: do not attempt to reconnect after `shutdown`. Also,
never attempt to reconnect if the version of `stomp.py` is older than `4.1.21` (it
did not work properly anyway).
- Add coercion of the `port` config parameter to `int`.
- Add implementation of the `check` hook (verifying, in particular, accessibility
of necessary file(s)).
- Remove undocumented and unused attributes of `StompCollectorBot` instances:
`ssl_ca_cert`, `ssl_cl_cert`, `ssl_cl_cert_key`.
- Minor fixes/improvements and some refactoring (see also above: *Core*...).

#### Parsers

#### Experts

#### Outputs
- `intelmq.bots.outputs.stomp.output` (PR#2408 by Jan Kaliszewski):
- Add support for authentication based on STOMP login and passcode,
introducing 3 new configuration parameters (see above: *Configuration*).
- Update the code to support new versions of `stomp.py`, including the latest (`8.1.0`).
- Fix `AttributeError` caused by attempts to get unset attributes of `StompOutputBot`
(`ssl_ca_cert` et consortes).
- Add coercion of the `port` config parameter to `int`.
- Add implementation of the `check` hook (verifying, in particular, accessibility
of necessary file(s)).
- Add `stomp.py` version check (raise `MissingDependencyError` if not `>=4.1.8`).
- Minor fixes/improvements and some refactoring (see also above: *Core*...).

### Documentation
- Add a readthedocs configuration file to fix the build fail (PR#2403 by Sebastian Wagner).
- Update/fix/improve the stuff related to the STOMP bots and integration with the *n6*'s
Stream API (PR#2408 by Jan Kaliszewski).

### Packaging

Expand Down
20 changes: 13 additions & 7 deletions docs/user/bots.rst
Original file line number Diff line number Diff line change
Expand Up @@ -945,12 +945,15 @@ Install the `stomp.py` library from PyPI:
**Configuration Parameters**

* **Feed parameters** (see above)
* `exchange`: exchange point
* `exchange`: STOMP *destination* to subscribe to, e.g. "/exchange/my.org/*.*.*.*"
* `port`: 61614
* `server`: hostname e.g. "n6stream.cert.pl"
* `server`: hostname, e.g. "n6stream.cert.pl"
* `ssl_ca_certificate`: path to CA file
* `ssl_client_certificate`: path to client cert file
* `ssl_client_certificate_key`: path to client cert key file
* `auth_by_ssl_client_certificate`: Boolean, default: true (note: set to false for new *n6* auth)
* `ssl_client_certificate`: path to client cert file, used only if `auth_by_ssl_client_certificate` is true
* `ssl_client_certificate_key`: path to client cert key file, used only if `auth_by_ssl_client_certificate` is true
* `username`: STOMP *login* (e.g., *n6* user login), used only if `auth_by_ssl_client_certificate` is false
* `password`: STOMP *passcode* (e.g., *n6* user API key), used only if `auth_by_ssl_client_certificate` is false


.. _intelmq.bots.collectors.twitter.collector_twitter:
Expand Down Expand Up @@ -4305,7 +4308,7 @@ Also you will need a so called "exchange point".

**Configuration Parameters**

* `exchange`: The exchange to push at
* `exchange`: STOMP *destination* to push at, e.g. "/exchange/_push"
* `heartbeat`: default: 60000
* `message_hierarchical_output`: Boolean, default: false
* `message_jsondict_as_string`: Boolean, default: false
Expand All @@ -4314,8 +4317,11 @@ Also you will need a so called "exchange point".
* `server`: Host or IP address of the STOMP server
* `single_key`: Boolean or string (field name), default: false
* `ssl_ca_certificate`: path to CA file
* `ssl_client_certificate`: path to client cert file
* `ssl_client_certificate_key`: path to client cert key file
* `auth_by_ssl_client_certificate`: Boolean, default: true (note: set to false for new *n6* auth)
* `ssl_client_certificate`: path to client cert file, used only if `auth_by_ssl_client_certificate` is true
* `ssl_client_certificate_key`: path to client cert key file, used only if `auth_by_ssl_client_certificate` is true
* `username`: STOMP *login* (e.g., *n6* user login), used only if `auth_by_ssl_client_certificate` is false
* `password`: STOMP *passcode* (e.g., *n6* user API key), used only if `auth_by_ssl_client_certificate` is false


.. _intelmq.bots.outputs.tcp.output:
Expand Down
3 changes: 1 addition & 2 deletions docs/user/n6-integrations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ n6 is maintained and developed by `CERT.pl <https://www.cert.pl/>`_.

Information about n6 can be found here:

- Website: `n6.cert.pl <https://n6.cert.pl/en/>`_
- Website: `cert.pl/en/n6 <https://cert.pl/en/n6/>`_
- Source Code: `github.com/CERT-Polska/n6 <https://github.com/CERT-Polska/n6/>`_
- n6 documentation: `n6.readthedocs.io <https://n6.readthedocs.io/>`_
- n6sdk developer documentation: `n6sdk.readthedocs.io <https://n6sdk.readthedocs.io/>`_

.. image:: /_static/n6/n6-schemat2.png
:alt: n6 schema
Expand Down
86 changes: 46 additions & 40 deletions intelmq/bots/collectors/stomp/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
# SPDX-License-Identifier: AGPL-3.0-or-later

# -*- coding: utf-8 -*-
import os.path

from intelmq.lib.bot import CollectorBot
from intelmq.lib.exceptions import MissingDependencyError
from intelmq.lib.mixins import StompMixin

try:
import stomp
import stomp.exception
except ImportError:
stomp = None
else:
Expand All @@ -18,9 +18,10 @@ class StompListener(stomp.PrintingListener):
the stomp listener gets called asynchronously for
every STOMP message
"""
def __init__(self, n6stompcollector, conn, destination):
def __init__(self, n6stompcollector, conn, destination, connect_kwargs=None):
self.stompbot = n6stompcollector
self.conn = conn
self.connect_kwargs = connect_kwargs
self.destination = destination
super().__init__()
if stomp.__version__ >= (5, 0, 0):
Expand All @@ -29,15 +30,23 @@ def __init__(self, n6stompcollector, conn, destination):

def on_heartbeat_timeout(self):
self.stompbot.logger.info("Heartbeat timeout. Attempting to re-connect.")
connect_and_subscribe(self.conn, self.stompbot.logger, self.destination)

def on_error(self, headers, message):
self.stompbot.logger.error('Received an error: %r.', message)

def on_message(self, headers, message):
self.stompbot.logger.debug('Receive message %r...', message[:500])
if self.stompbot._auto_reconnect:
connect_and_subscribe(self.conn, self.stompbot.logger, self.destination,
connect_kwargs=self.connect_kwargs)

def on_error(self, frame, body=None):
if body is None:
# `stomp.py >= 6.1.0`
body = frame.body
self.stompbot.logger.error('Received an error: %r.', body)

def on_message(self, frame, body=None):
if body is None:
# `stomp.py >= 6.1.0`
body = frame.body
self.stompbot.logger.debug('Receive message %r...', body[:500])
report = self.stompbot.new_report()
report.add("raw", message.rstrip())
report.add("raw", body.rstrip())
report.add("feed.url", "stomp://" +
self.stompbot.server +
":" + str(self.stompbot.port) +
Expand All @@ -46,24 +55,31 @@ def on_message(self, headers, message):

def on_disconnected(self):
self.stompbot.logger.debug('Detected disconnect')
connect_and_subscribe(self.conn, self.stompbot.logger, self.destination)
if self.stompbot._auto_reconnect:
connect_and_subscribe(self.conn, self.stompbot.logger, self.destination,
connect_kwargs=self.connect_kwargs)


def connect_and_subscribe(conn, logger, destination, start=False):
def connect_and_subscribe(conn, logger, destination, start=False, connect_kwargs=None):
if start:
conn.start()
conn.connect(wait=True)
if connect_kwargs is None:
connect_kwargs = dict(wait=True)
conn.connect(**connect_kwargs)
conn.subscribe(destination=destination,
id=1, ack='auto')
logger.info('Successfully connected and subscribed.')


class StompCollectorBot(CollectorBot):
class StompCollectorBot(CollectorBot, StompMixin):
"""Collect data from a STOMP Interface"""
""" main class for the STOMP protocol collector """
exchange: str = ''
port: int = 61614
server: str = "n6stream.cert.pl"
auth_by_ssl_client_certificate: bool = True
username: str = 'guest' # ignored if `auth_by_ssl_client_certificate` is true
password: str = 'guest' # ignored if `auth_by_ssl_client_certificate` is true
ssl_ca_certificate: str = 'ca.pem' # TODO pathlib.Path
ssl_client_certificate: str = 'client.pem' # TODO pathlib.Path
ssl_client_certificate_key: str = 'client.key' # TODO pathlib.Path
Expand All @@ -73,36 +89,22 @@ class StompCollectorBot(CollectorBot):
__conn = False # define here so shutdown method can check for it

def init(self):
if stomp is None:
raise MissingDependencyError("stomp")
elif stomp.__version__ < (4, 1, 8):
raise MissingDependencyError("stomp", version="4.1.8",
installed=stomp.__version__)

self.ssl_ca_cert = self.ssl_ca_certificate
self.ssl_cl_cert = self.ssl_client_certificate
self.ssl_cl_cert_key = self.ssl_client_certificate_key

# check if certificates exist
for f in [self.ssl_ca_cert, self.ssl_cl_cert, self.ssl_cl_cert_key]:
if not os.path.isfile(f):
raise ValueError("Could not open file %r." % f)

_host = [(self.server, self.port)]
self.__conn = stomp.Connection(host_and_ports=_host, use_ssl=True,
ssl_key_file=self.ssl_cl_cert_key,
ssl_cert_file=self.ssl_cl_cert,
ssl_ca_certs=self.ssl_ca_cert,
heartbeats=(self.heartbeat,
self.heartbeat))

self.__conn.set_listener('', StompListener(self, self.__conn, self.exchange))
self.stomp_bot_runtime_initial_check()

# (note: older versions of `stomp.py` do not play well with reconnects)
self._auto_reconnect = (stomp.__version__ >= (4, 1, 21))

self.__conn, connect_kwargs = self.prepare_stomp_connection()
self.__conn.set_listener('', StompListener(self, self.__conn, self.exchange,
connect_kwargs=connect_kwargs))
connect_and_subscribe(self.__conn, self.logger, self.exchange,
start=stomp.__version__ < (4, 1, 20))
start=stomp.__version__ < (4, 1, 20),
connect_kwargs=connect_kwargs)

def shutdown(self):
if not stomp or not self.__conn:
return
self._auto_reconnect = False
try:
self.__conn.disconnect()
except stomp.exception.NotConnectedException:
Expand All @@ -111,5 +113,9 @@ def shutdown(self):
def process(self):
pass

@classmethod
def check(cls, parameters):
return cls.stomp_bot_parameters_check(parameters) or None


BOT = StompCollectorBot
36 changes: 15 additions & 21 deletions intelmq/bots/outputs/stomp/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,17 @@
# SPDX-License-Identifier: AGPL-3.0-or-later

# -*- coding: utf-8 -*-
import os.path

from intelmq.lib.bot import OutputBot
from intelmq.lib.exceptions import MissingDependencyError

from intelmq.lib.mixins import StompMixin

try:
import stomp
except ImportError:
stomp = None


class StompOutputBot(OutputBot):
class StompOutputBot(OutputBot, StompMixin):
"""Send events to a STMOP server"""
""" main class for the STOMP protocol output bot """
exchange: str = "/exchange/_push"
Expand All @@ -28,36 +26,28 @@ class StompOutputBot(OutputBot):
port: int = 61614
server: str = "127.0.0.1" # TODO: could be ip address
single_key: bool = False
auth_by_ssl_client_certificate: bool = True
username: str = 'guest' # ignored if `auth_by_ssl_client_certificate` is true
password: str = 'guest' # ignored if `auth_by_ssl_client_certificate` is true
ssl_ca_certificate: str = 'ca.pem' # TODO: could be pathlib.Path
ssl_client_certificate: str = 'client.pem' # TODO: pathlib.Path
ssl_client_certificate_key: str = 'client.key' # TODO: patlib.Path

_conn = None

def init(self):
if stomp is None:
raise MissingDependencyError("stomp")

# check if certificates exist
for f in [self.ssl_ca_cert, self.ssl_cl_cert, self.ssl_cl_cert_key]:
if not os.path.isfile(f):
raise ValueError("Could not open SSL (certificate) file '%s'." % f)

_host = [(self.server, self.port)]
self._conn = stomp.Connection(host_and_ports=_host, use_ssl=True,
ssl_key_file=self.ssl_cl_cert_key,
ssl_cert_file=self.ssl_cl_cert,
ssl_ca_certs=self.ssl_ca_cert,
heartbeats=(self.heartbeat,
self.heartbeat))
self.stomp_bot_runtime_initial_check()
(self._conn,
self._connect_kwargs) = self.prepare_stomp_connection()
self.connect()

def connect(self):
self.logger.debug('Connecting.')
# based on the documentation at:
# https://github.com/jasonrbriggs/stomp.py/wiki/Simple-Example
self._conn.start()
self._conn.connect(wait=True)
if stomp.__version__ < (4, 1, 20):
self._conn.start()
self._conn.connect(**self._connect_kwargs)
self.logger.debug('Connected.')

def shutdown(self):
Expand All @@ -73,5 +63,9 @@ def process(self):
destination=self.exchange)
self.acknowledge_message()

@classmethod
def check(cls, parameters):
return cls.stomp_bot_parameters_check(parameters) or None


BOT = StompOutputBot
11 changes: 5 additions & 6 deletions intelmq/etc/feeds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1158,20 +1158,19 @@ providers:
module: intelmq.bots.collectors.stomp.collector
parameters:
exchange: "{insert your exchange point as given by CERT.pl}"
ssl_client_certificate_key: "{insert path to client cert key file for
CERT.pl's n6}"
ssl_ca_certificate: "{insert path to CA file for CERT.pl's n6}"
auth_by_ssl_client_certificate: false
username: "{insert n6 user's login}"
password: "{insert n6 user's API key}"
port: '61614'
ssl_client_certificate: "{insert path to client cert file for CERTpl's
n6}"
server: n6stream.cert.pl
name: __FEED__
provider: __PROVIDER__
parser:
module: intelmq.bots.parsers.n6.parser_n6stomp
parameters:
revision: 2018-01-20
documentation: https://n6.cert.pl/en/
revision: 2023-09-23
documentation: https://n6.readthedocs.io/usage/streamapi/
public: false
AlienVault:
OTX:
Expand Down
3 changes: 2 additions & 1 deletion intelmq/lib/mixins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@
from intelmq.lib.mixins.http import HttpMixin
from intelmq.lib.mixins.cache import CacheMixin
from intelmq.lib.mixins.sql import SQLMixin
from intelmq.lib.mixins.stomp import StompMixin

__all__ = ['HttpMixin', 'CacheMixin', 'SQLMixin']
__all__ = ['HttpMixin', 'CacheMixin', 'SQLMixin', 'StompMixin']
Loading