Skip to content

Commit

Permalink
FIX: make load_balance work again
Browse files Browse the repository at this point in the history
  • Loading branch information
monoidic committed Aug 9, 2021
1 parent 3daff30 commit 5303764
Showing 1 changed file with 16 additions and 19 deletions.
35 changes: 16 additions & 19 deletions intelmq/lib/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,22 @@
class PipelineFactory(object):

@staticmethod
def create(logger, broker=None, direction=None, queues=None, pipeline_args={}, load_balance=False, is_multithreaded=False):
def create(logger, broker=None, direction=None, queues=None, pipeline_args=None, load_balance=False, is_multithreaded=False):
"""
direction: "source" or "destination", optional, needed for queues
queues: needs direction to be set, calls set_queues
bot: Bot instance
"""
if pipeline_args is None:
pipeline_args = {}

if direction not in [None, "source", "destination"]:
raise exceptions.InvalidArgument("direction", got=direction,
expected=["destination", "source"])

if 'load_balance' not in pipeline_args:
pipeline_args['load_balance'] = load_balance

if direction == 'source' and 'source_pipeline_broker' in pipeline_args:
broker = pipeline_args['source_pipeline_broker'].title()
if direction == 'destination' and 'destination_pipeline_broker' in pipeline_args:
Expand Down Expand Up @@ -96,10 +102,7 @@ def set_queues(self, queues: Optional[str], queues_type: str):
"""
if queues_type == "source":
self.source_queue = queues
if queues is not None:
self.internal_queue = queues + "-internal"
else:
self.internal_queue = None
self.internal_queue = None if queues is None else f'{queues}-internal'

elif queues_type == "destination":
type_ = type(queues)
Expand All @@ -109,8 +112,7 @@ def set_queues(self, queues: Optional[str], queues_type: str):
q = {"_default": queues.split()}
elif type_ is dict:
q = queues
for key, val in queues.items():
q[key] = val if type(val) is list else val.split()
q.update({key: (val if isinstance(val, list) else val.split()) for key, val in queues.items()})
else:
raise exceptions.InvalidArgument(
'queues', got=queues,
Expand Down Expand Up @@ -187,15 +189,12 @@ class Redis(Pipeline):
destination_pipeline_password = None

def load_configurations(self, queues_type):
self.host = self.pipeline_args.get("{}_pipeline_host".format(queues_type),
"127.0.0.1")
self.port = self.pipeline_args.get("{}_pipeline_port".format(queues_type), "6379")
self.db = self.pipeline_args.get("{}_pipeline_db".format(queues_type), 2)
self.password = self.pipeline_args.get("{}_pipeline_password".format(queues_type),
None)
self.host = self.pipeline_args.get(f"{queues_type}_pipeline_host", "127.0.0.1")
self.port = self.pipeline_args.get(f"{queues_type}_pipeline_port", "6379")
self.db = self.pipeline_args.get(f"{queues_type}_pipeline_db", 2)
self.password = self.pipeline_args.get(f"{queues_type}_pipeline_password", None)
# socket_timeout is None by default, which means no timeout
self.socket_timeout = self.pipeline_args.get("{}_pipeline_socket_timeout".format(queues_type),
None)
self.socket_timeout = self.pipeline_args.get(f"{queues_type}_pipeline_socket_timeout", None)
self.load_balance = self.pipeline_args.get("load_balance", False)
self.load_balance_iterator = 0

Expand Down Expand Up @@ -241,8 +240,7 @@ def send(self, message: str, path: str = "_default",
if self.load_balance:
queues = [queues[self.load_balance_iterator]]
self.load_balance_iterator += 1
if self.load_balance_iterator == len(self.destination_queues[path]):
self.load_balance_iterator = 0
self.load_balance_iterator %= len(self.destination_queues[path])

for destination_queue in queues:
try:
Expand Down Expand Up @@ -559,8 +557,7 @@ def send(self, message: str, path: str = "_default",
if self.load_balance:
queues = [queues[self.load_balance_iterator]]
self.load_balance_iterator += 1
if self.load_balance_iterator == len(self.destination_queues[path]):
self.load_balance_iterator = 0
self.load_balance_iterator %= len(self.destination_queues[path])

for destination_queue in queues:
self._send(destination_queue, message)
Expand Down

0 comments on commit 5303764

Please sign in to comment.