From b81aca63aba0be90f616170ccce4c55d4c25a1aa Mon Sep 17 00:00:00 2001 From: Letizia Date: Tue, 16 Aug 2022 17:33:15 +0100 Subject: [PATCH 1/2] Create preprocessor for combining messages Create a preprocessor that reads messages from a queue and combines the ones that have the same header (and are next to each other in the queue) into a single message. The combined messages are added to a new queue (which is then used for sending them). --- .gitignore | 1 + bin/preprocessor.py | 195 ++++++++++++++++++++++++++++++++++++++++++++ bin/sender.py | 8 ++ 3 files changed, 204 insertions(+) create mode 100644 bin/preprocessor.py diff --git a/.gitignore b/.gitignore index 0d20b648..b664ab4e 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ *.pyc +venv/ \ No newline at end of file diff --git a/bin/preprocessor.py b/bin/preprocessor.py new file mode 100644 index 00000000..cd7d17f8 --- /dev/null +++ b/bin/preprocessor.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python + +# Copyright (C) 2012 STFC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Script to run a sending SSM.""" + +from __future__ import print_function + +import ssm.agents +from ssm import __version__, LOG_BREAK +import os +import logging +from optparse import OptionParser +from dirq.QueueSimple import QueueSimple +from ssm.message_directory import MessageDirectory +import re + +try: + import ConfigParser +except ImportError: + import configparser as ConfigParser + + +def _get_path_to_outq(cp): + try: + qpath = cp.get('messaging', 'path') + print(qpath) + except: + raise ValueError('Cannot retrieve path to outq.') + return qpath + + +def _get_path_type(cp, log): + try: + path_type = cp.get('messaging', 'path_type') + except ConfigParser.NoOptionError: + log.info('No path type defined, assuming dirq.') + print('No path type defined') + path_type = 'dirq' + return path_type + + +def _get_queue(qpath, path_type): + + for dirpath, dirnames, files in os.walk(qpath): + dirs_at_path = dirnames + files_at_path = files + path_examined = qpath + break + + if path_type == 'dirq': + if QueueSimple is None: + raise ImportError('Dirq path_type requested but' + 'dirq module not found.') + + if (len(dirs_at_path) == 0 or + (len(dirs_at_path) == 1 and dirs_at_path[0] == 'combined_queue')): + raise ValueError("Provided path_type was dirq but no " + "directory found at path. Should " + "path_type be 'directory'?") + + outq = QueueSimple(qpath) + + elif path_type == 'directory': + if len(dirs_at_path) > 0: + raise ValueError("Provided path_type was directory but an " + "unexpected directory is present at path, " + "as well as files. Should path_type be 'dirq'?") + + outq = MessageDirectory(qpath) + + else: + raise ValueError('Unsupported path_type variable.') + + return outq + + +def _header_matches_regex(header): + regex_expr_header = re.compile(r'^APEL(?:-[a-z]+)+-message: v[0-9].[0-9]$') + return regex_expr_header.match(header) + + +def _first_time_executing_code(previous_header): + if previous_header == None: + return True + else: + return False + + +def _add_to_queue(msg, queue_combined_msgs, originally_a_string): + if originally_a_string: + queue_combined_msgs.add(msg) + else: + msg_in_bytes = str.encode(msg) + queue_combined_msgs.add(msg_in_bytes) + return + + + +def _create_new_queue(new_path, path_type): + + if path_type == 'dirq': + newq = QueueSimple(new_path) + + elif path_type == 'directory': + if not os.path.exists(new_path): + os.makedirs(new_path) + newq = MessageDirectory(new_path) + + else: + raise ValueError('Unsupported path_type variable.') + + return newq + + +def _determine_what_to_iterate(outq, path_type): + + if path_type == 'dirq': + structure_to_iterate = outq + elif path_type == 'directory': + structure_to_iterate = outq._get_messages() + else: + raise ValueError('Unsupported path_type variable.') + + return structure_to_iterate + + + + + +def create_queue_combined_msgs(cp, log): + + previous_header = None + n_msg_combined = 0 + n_max_msg_combined = 500 + qpath = _get_path_to_outq(cp) + combined_queue_path = os.path.join(qpath, 'combined_queue') + path_type = _get_path_type(cp, log) + outq = _get_queue(qpath, path_type) + structure_to_iterate = _determine_what_to_iterate(outq, path_type) + queue_combined_msgs = _create_new_queue(combined_queue_path, path_type) + + for msgid in structure_to_iterate: + if not outq.lock(msgid): + log.warning('Message was locked. %s will not be read.', msgid) + continue + + text = outq.get(msgid) + originally_a_string = True + try: + text = text.decode() + originally_a_string = False + except (UnicodeDecodeError, AttributeError): + pass + + splitted_content = text.split('\n') + header = splitted_content[0] + contents_minus_header = splitted_content[1:] + + if _header_matches_regex(header): + if header == previous_header and n_msg_combined < n_max_msg_combined: + combined_msgs = combined_msgs + '\n' + '\n'.join(contents_minus_header) + n_msg_combined += 1 + else: + if not _first_time_executing_code(previous_header): + _add_to_queue(combined_msgs, queue_combined_msgs, originally_a_string) + + combined_msgs = text + previous_header = header + n_msg_combined = 1 + + outq.remove(msgid) + + + _add_to_queue(combined_msgs, queue_combined_msgs, originally_a_string) + + try: + outq.purge() + except OSError as e: + log.warning('OSError raised while purging message queue: %s', e) + + return combined_queue_path + diff --git a/bin/sender.py b/bin/sender.py index f6d08e98..e883f359 100644 --- a/bin/sender.py +++ b/bin/sender.py @@ -30,6 +30,8 @@ except ImportError: import configparser as ConfigParser +from bin.preprocessor import create_queue_combined_msgs + def main(): """Set up connection, send all messages and quit.""" @@ -72,6 +74,12 @@ def main(): brokers, project, token = ssm.agents.get_ssm_args(protocol, cp, log) + # Creating queue of combined messages + combined_queue_path = create_queue_combined_msgs(cp, log) + + # Updating path in cp so that it points to the 'combined_msgs' queue + cp.set('messaging', 'path', combined_queue_path) + ssm.agents.run_sender(protocol, brokers, project, token, cp, log) From 66c0a5f0fb81c71c02cfa36c7d142d54a75adab9 Mon Sep 17 00:00:00 2001 From: Letizia Date: Fri, 19 Aug 2022 09:45:17 +0100 Subject: [PATCH 2/2] Add conversion of messages from queue to string Messages read from a queue (queuesimple) are generally in bytes, even when added as strings to the queue. For sender and receiver to work, the messages need to be converted to strings after they are read from the queue. --- ssm/ssm2.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/ssm/ssm2.py b/ssm/ssm2.py index 72099ad8..fd337be7 100644 --- a/ssm/ssm2.py +++ b/ssm/ssm2.py @@ -285,6 +285,10 @@ def _handle_msg(self, text): - verify signature - Return plain-text message, signer's DN and an error/None. """ + + if isinstance(text, bytes): + text = text.decode() + if text is None or text == '': warning = 'Empty text passed to _handle_msg.' log.warning(warning) @@ -380,6 +384,10 @@ def _send_msg_ams(self, text, msgid): encrypted. """ log.info('Sending message: %s', msgid) + + if isinstance(text, bytes): + text = text.decode() + if text is not None: # First we sign the message to_send = crypto.sign(text, self._cert, self._key)