diff --git a/.gitignore b/.gitignore index 0d20b648..b664ab4e 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ *.pyc +venv/ \ No newline at end of file diff --git a/bin/preprocessor.py b/bin/preprocessor.py new file mode 100644 index 00000000..cd7d17f8 --- /dev/null +++ b/bin/preprocessor.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python + +# Copyright (C) 2012 STFC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Script to run a sending SSM.""" + +from __future__ import print_function + +import ssm.agents +from ssm import __version__, LOG_BREAK +import os +import logging +from optparse import OptionParser +from dirq.QueueSimple import QueueSimple +from ssm.message_directory import MessageDirectory +import re + +try: + import ConfigParser +except ImportError: + import configparser as ConfigParser + + +def _get_path_to_outq(cp): + try: + qpath = cp.get('messaging', 'path') + print(qpath) + except: + raise ValueError('Cannot retrieve path to outq.') + return qpath + + +def _get_path_type(cp, log): + try: + path_type = cp.get('messaging', 'path_type') + except ConfigParser.NoOptionError: + log.info('No path type defined, assuming dirq.') + print('No path type defined') + path_type = 'dirq' + return path_type + + +def _get_queue(qpath, path_type): + + for dirpath, dirnames, files in os.walk(qpath): + dirs_at_path = dirnames + files_at_path = files + path_examined = qpath + break + + if path_type == 'dirq': + if QueueSimple is None: + raise ImportError('Dirq path_type requested but' + 'dirq module not found.') + + if (len(dirs_at_path) == 0 or + (len(dirs_at_path) == 1 and dirs_at_path[0] == 'combined_queue')): + raise ValueError("Provided path_type was dirq but no " + "directory found at path. Should " + "path_type be 'directory'?") + + outq = QueueSimple(qpath) + + elif path_type == 'directory': + if len(dirs_at_path) > 0: + raise ValueError("Provided path_type was directory but an " + "unexpected directory is present at path, " + "as well as files. Should path_type be 'dirq'?") + + outq = MessageDirectory(qpath) + + else: + raise ValueError('Unsupported path_type variable.') + + return outq + + +def _header_matches_regex(header): + regex_expr_header = re.compile(r'^APEL(?:-[a-z]+)+-message: v[0-9].[0-9]$') + return regex_expr_header.match(header) + + +def _first_time_executing_code(previous_header): + if previous_header == None: + return True + else: + return False + + +def _add_to_queue(msg, queue_combined_msgs, originally_a_string): + if originally_a_string: + queue_combined_msgs.add(msg) + else: + msg_in_bytes = str.encode(msg) + queue_combined_msgs.add(msg_in_bytes) + return + + + +def _create_new_queue(new_path, path_type): + + if path_type == 'dirq': + newq = QueueSimple(new_path) + + elif path_type == 'directory': + if not os.path.exists(new_path): + os.makedirs(new_path) + newq = MessageDirectory(new_path) + + else: + raise ValueError('Unsupported path_type variable.') + + return newq + + +def _determine_what_to_iterate(outq, path_type): + + if path_type == 'dirq': + structure_to_iterate = outq + elif path_type == 'directory': + structure_to_iterate = outq._get_messages() + else: + raise ValueError('Unsupported path_type variable.') + + return structure_to_iterate + + + + + +def create_queue_combined_msgs(cp, log): + + previous_header = None + n_msg_combined = 0 + n_max_msg_combined = 500 + qpath = _get_path_to_outq(cp) + combined_queue_path = os.path.join(qpath, 'combined_queue') + path_type = _get_path_type(cp, log) + outq = _get_queue(qpath, path_type) + structure_to_iterate = _determine_what_to_iterate(outq, path_type) + queue_combined_msgs = _create_new_queue(combined_queue_path, path_type) + + for msgid in structure_to_iterate: + if not outq.lock(msgid): + log.warning('Message was locked. %s will not be read.', msgid) + continue + + text = outq.get(msgid) + originally_a_string = True + try: + text = text.decode() + originally_a_string = False + except (UnicodeDecodeError, AttributeError): + pass + + splitted_content = text.split('\n') + header = splitted_content[0] + contents_minus_header = splitted_content[1:] + + if _header_matches_regex(header): + if header == previous_header and n_msg_combined < n_max_msg_combined: + combined_msgs = combined_msgs + '\n' + '\n'.join(contents_minus_header) + n_msg_combined += 1 + else: + if not _first_time_executing_code(previous_header): + _add_to_queue(combined_msgs, queue_combined_msgs, originally_a_string) + + combined_msgs = text + previous_header = header + n_msg_combined = 1 + + outq.remove(msgid) + + + _add_to_queue(combined_msgs, queue_combined_msgs, originally_a_string) + + try: + outq.purge() + except OSError as e: + log.warning('OSError raised while purging message queue: %s', e) + + return combined_queue_path + diff --git a/bin/sender.py b/bin/sender.py index f6d08e98..e883f359 100644 --- a/bin/sender.py +++ b/bin/sender.py @@ -30,6 +30,8 @@ except ImportError: import configparser as ConfigParser +from bin.preprocessor import create_queue_combined_msgs + def main(): """Set up connection, send all messages and quit.""" @@ -72,6 +74,12 @@ def main(): brokers, project, token = ssm.agents.get_ssm_args(protocol, cp, log) + # Creating queue of combined messages + combined_queue_path = create_queue_combined_msgs(cp, log) + + # Updating path in cp so that it points to the 'combined_msgs' queue + cp.set('messaging', 'path', combined_queue_path) + ssm.agents.run_sender(protocol, brokers, project, token, cp, log) diff --git a/ssm/ssm2.py b/ssm/ssm2.py index 72099ad8..fd337be7 100644 --- a/ssm/ssm2.py +++ b/ssm/ssm2.py @@ -285,6 +285,10 @@ def _handle_msg(self, text): - verify signature - Return plain-text message, signer's DN and an error/None. """ + + if isinstance(text, bytes): + text = text.decode() + if text is None or text == '': warning = 'Empty text passed to _handle_msg.' log.warning(warning) @@ -380,6 +384,10 @@ def _send_msg_ams(self, text, msgid): encrypted. """ log.info('Sending message: %s', msgid) + + if isinstance(text, bytes): + text = text.decode() + if text is not None: # First we sign the message to_send = crypto.sign(text, self._cert, self._key)