#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# forked from: https://github.com/gaelL/openstack-log-colorizer
#
# Author: Gaël Lambert (gaelL)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
Filter and color Openstack log files from stdin
--
It support log files:
  - openstack services with default log format (work also using journalctl, stern)
    * journalctl: journalctl --no-hostname -o short-iso -u 'devstack@*'
  - openvswitch
  - rabbitmq
  - apache
"""

import sys
import re
import argparse

# Numeric severity per level name/alias.  'notset' is higher than every real
# level so that lines without a level are never dropped by --level.
LOG_LEVEL = {
    'critical': 50,
    'emer': 50,
    'error': 40,
    'err': 40,
    'warning': 30,
    'warn': 30,
    'wrn': 30,
    'info': 20,
    'inf': 20,
    'debug': 10,
    'dbg': 10,
    'notset': 100,
}

PARSER = argparse.ArgumentParser(
    description=__doc__,
    formatter_class=argparse.RawDescriptionHelpFormatter)
PARSER.add_argument("-l", "--level",
                    help="Set log levels you want display",
                    metavar='level',
                    choices=LOG_LEVEL.keys(),
                    type=str)
PARSER.add_argument("-e", "--exclude",
                    help="Set log levels you want exclude",
                    metavar='level',
                    choices=LOG_LEVEL.keys(),
                    type=str,
                    nargs='+')
PARSER.add_argument("-i", "--include",
                    help="Set log levels you only want display",
                    metavar='level',
                    choices=LOG_LEVEL.keys(),
                    type=str,
                    nargs='+')

# Parser defaults only (level/exclude/include all None).  The real command
# line is parsed in main(), so importing this module never touches sys.argv
# and never exits the importing program.
ARGS = PARSER.parse_args([])


class colors:
    """ANSI escape sequences used to paint the output."""
    grey = '\033[1;30m'
    red = '\033[1;31m'
    green = '\033[1;32m'
    yellow = '\033[1;33m'
    blue = '\033[1;34m'
    magenta = '\033[1;35m'
    cyan = '\033[1;36m'
    white = '\033[0;37m'
    end = '\033[1;m'


def _paint(color, text):
    "Return text wrapped between the given color sequence and colors.end"
    return '%s%s%s' % (color, text, colors.end)


def grey(text):
    "Return text colored in grey"
    return _paint(colors.grey, text)


def red(text):
    "Return text colored in red"
    return _paint(colors.red, text)


def green(text):
    "Return text colored in green"
    return _paint(colors.green, text)


def yellow(text):
    "Return text colored in yellow"
    return _paint(colors.yellow, text)


def blue(text):
    "Return text colored in blue"
    return _paint(colors.blue, text)


def magenta(text):
    "Return text colored in magenta"
    return _paint(colors.magenta, text)


def cyan(text):
    "Return text colored in cyan"
    return _paint(colors.cyan, text)


def white(text):
    "Return text colored in white"
    return _paint(colors.white, text)


# Upper-cased level name -> painter.  Unknown levels are painted white.
_LEVEL_PAINTERS = {
    'TRACE': magenta, 'SEC': magenta,
    'DEBUG': green, 'DBG': green, 'INFO': green, 'INF': green,
    'WARNING': yellow, 'WRN': yellow, 'WARN': yellow, '???': yellow,
    'CRITICAL': red, 'ERROR': red, 'ERR': red, 'EMER': red,
}


def colored_level(level):
    "Return level text (upper-cased) with selected color tag"
    level = level.upper()
    return _LEVEL_PAINTERS.get(level, white)(level)


# All patterns start with '(.*)' so that anything printed before the
# timestamp (e.g. a pod name added by stern) is kept as the 'header'.
#
# The optional groups are written '(...)?' on purpose: the nested form
# '([^ ]+)* ' accepts the same text but backtracks exponentially on a long
# token that is not followed by a space.

# Line example : Openstack
# 2014-08-14 18:43:58.950 4092 INFO neutron.plugins.openvswitch.agent.ovs_neutron_agent [-]
_OPENSTACK_RE = re.compile(
    r'(.*)'
    r'([0-9]{4}-[0-9]+-[0-9]+) '             # date
    r'([0-9]+:[0-9]+:[0-9]+\.[0-9]+) '       # time
    r'([0-9]+):? '                           # process
    r'([A-Z]+) '                             # level
    r'([^ ]+)? '                             # name
    r"(\[(?!'Traceback)[^\]]+\])*\s*"        # context
    r'(.+)?'                                 # text
)

# journalctl --no-hostname -o short-iso -f -u 'devstack@*'
# 2024-03-20T13:27:44+0000 devstack@keystone.service[74735]: INFO neutron.plugins.openvswitch.agent.ovs_neutron_agent [-]
_JOURNAL_RE = re.compile(
    r'(.*)'
    r'([0-9]{4}-[0-9]+-[0-9]+)T'             # date
    r'([0-9]+:[0-9]+:[0-9]+\+[0-9]+) '       # time
    r'([^\[]+)\[([0-9]+)\]: '                # unit process
    r'([A-Z]+) '                             # level
    r'([^ ]+)? '                             # name
    r"(\[(?!'Traceback)[^\]]+\])*\s*"        # context
    r'(.+)?'                                 # text
)

# A line carrying only a serialized traceback: ['Traceback ...
_TRACEBACK_RE = re.compile(
    r'(.*)'
    r"(\['Traceback.*)"
)

# Apache: "<date> <time> key: value key: value ..."
_APACHE_RE = re.compile(
    r'(.*)'
    r'([0-9]{4}-[0-9]+-[0-9]+) '             # date
    r'([0-9]+:[0-9]+:[0-9]+) '               # time
    r'(.+)'
)
_APACHE_KEY_RE = re.compile(r"[^ ]+:")

# rabbit
# 2023-07-30 12:39:41.230254+00:00 [error] <0.9417.870> some text
_RABBIT_RE = re.compile(
    r'(.*)'
    r'([0-9]{4}-[0-9]+-[0-9]+) '             # date
    r'([0-9]+:[0-9]+:[0-9]+\.[0-9]+[^ ]+) '  # time
    r'\[([a-z\?]+)\] '                       # level
    r'(<[^>]+>) '                            # <0.9417.870>
    r'(.+)?'
)

# Line example : ovs.log
# 2014-11-21T06:25:09.549Z|00012|vlog|INFO|opened log file /var/log/op...
_OVS_RE = re.compile(
    r'(.*)'
    r'([0-9]{4}-[0-9]+-[0-9]+)T'             # date
    r'([0-9]+:[0-9]+:[0-9]+\.[0-9]+)Z\|'     # time
    r'([0-9]+)\|'                            # serial
    r'([^\|]+)\|'                            # name
    r'([^\|]+)\|'                            # level
    r'(.+)'
)

# Fallback: first two space separated words are the header, rest is text.
_GENERIC_RE = re.compile(
    r'([^ ]+ [^ ]+)'
    r'(.+)?'
)


def _parse_apache_fields(text):
    """Return a dict mapping each 'key:' token of text to its value

    The value of a key is whatever sits between "key: " and the next key
    (or the end of the text), so it may end with the separating space.
    A key that is not followed by a space is skipped.
    """
    keys = _APACHE_KEY_RE.findall(text)
    key_val = {}
    for key, next_key in zip(keys, keys[1:] + [""]):
        # Keys come straight from the log line: escape them so characters
        # such as '(' or '[' are matched literally instead of being read
        # as regex syntax.
        match = re.match(
            rf".*{re.escape(key)} (.*){re.escape(next_key)}.*", text)
        if match:
            key_val[key] = match.group(1)
    return key_val


def parse_line(line):
    """Parse line and return dict of each elements

    The formats are tried in this order: openstack, journalctl, bare
    traceback, apache, rabbit, ovs, then a generic "two words + text"
    split.  The returned dict always has 'header' and '_type'
    ('openstack', 'apache', 'rabbit', 'ovs' or None); the other keys
    depend on '_type'.
    """
    result = _OPENSTACK_RE.match(line)
    if result is not None:
        return {
            'header': result.group(1) or "",
            'date': result.group(2),
            'time': result.group(3),
            'process': result.group(4),
            'level': result.group(5),
            'name': result.group(6) or "",
            'req': result.group(7) or "",
            'text': result.group(8) or "",
            '_type': 'openstack',
        }

    result = _JOURNAL_RE.match(line)
    if result is not None:
        header = ""
        if result.group(1):
            header = result.group(1)
        elif result.group(4):
            # no prefix before the timestamp: show the unit name instead
            header = f"{result.group(4)} "
        return {
            'header': header,
            'date': result.group(2),
            'time': result.group(3),
            'process': result.group(5),
            'level': result.group(6),
            'name': result.group(7) or "",
            'req': result.group(8) or "",
            'text': result.group(9) or "",
            '_type': 'openstack',
        }

    result = _TRACEBACK_RE.match(line)
    if result is not None:
        return {
            'header': result.group(1) or "",
            'date': "",
            'time': "",
            'process': "",
            'level': "TRACE",
            'name': "",
            'req': "",
            'text': result.group(2) or "",
            '_type': 'openstack',
        }

    result = _APACHE_RE.match(line)
    if result is not None:
        return {
            'header': result.group(1) or "",
            'date': result.group(2),
            'time': result.group(3),
            'key_val': _parse_apache_fields(result.group(4)),
            '_type': 'apache',
        }

    result = _RABBIT_RE.match(line)
    if result is not None:
        return {
            'header': result.group(1) or "",
            'date': result.group(2),
            'time': result.group(3),
            'level': result.group(4),
            'pid': result.group(5),
            # empty message -> "" (None would be printed as "None")
            'text': result.group(6) or "",
            '_type': 'rabbit',
        }

    result = _OVS_RE.match(line)
    if result is not None:
        return {
            'header': result.group(1) or "",
            'date': result.group(2),
            'time': result.group(3),
            'serial': result.group(4),
            'name': result.group(5),
            'level': result.group(6),
            'text': result.group(7),
            '_type': 'ovs',
        }

    result = _GENERIC_RE.match(line)
    if result is not None:
        return {
            'header': result.group(1) or '',
            'text': result.group(2) or '',
            '_type': None,
        }

    return {'header': '', 'text': line, '_type': None}


def colorize(line):
    """Apply color tag on line

    line is a dict as returned by parse_line(); the colored string is
    returned and the dict is left untouched.
    """
    line_type = line.get('_type')

    if line_type == 'openstack':
        level = line.get('level')
        text = line['text']
        if level in ('TRACE', 'ERROR') and "['Traceback" in text:
            # tracebacks are logged on one line with literal "\n" inside:
            # unfold them
            text = text.replace('\\n', '\n')
        colored_text = grey(text) if level == 'TRACE' else white(text)
        return '%s%s %s %s %s %s %s %s' % (
            magenta(line['header']),
            grey(line['date']),
            grey(line['time']),
            grey(line['process']),
            colored_level(line['level']),
            blue(line['name']),
            grey(line['req']),
            colored_text
        )

    if line_type == 'ovs':
        return '%s%s %s %s %s %s %s' % (
            magenta(line['header']),
            grey(line['date']),
            grey(line['time']),
            grey(line['serial']),
            blue(line['name']),
            colored_level(line['level']),
            white(line['text'])
        )

    if line_type == 'rabbit':
        return '%s%s %s %s %s %s' % (
            magenta(line['header']),
            grey(line['date']),
            grey(line['time']),
            colored_level(line['level']),
            blue(line['pid']),
            white(line['text'])
        )

    if line_type == 'apache':
        formatted = '%s%s %s' % (
            magenta(line['header']),
            grey(line['date']),
            grey(line['time']),
        )
        for key, value in line['key_val'].items():
            colored_value = white(value)
            try:
                if key == "status:":
                    status = int(value)
                    if status >= 500:
                        colored_value = red(value)
                    elif status >= 400:
                        colored_value = yellow(value)
                elif key == "time:" and float(value) >= 1000:
                    colored_value = red(value)
            except (TypeError, ValueError):
                # not a number: keep the default color
                pass
            formatted = f"{formatted} {blue(key)} {colored_value}"
        return formatted

    return '%s %s' % (magenta(line['header']), line.get('text'))


def check_args():
    "Return True if at most one of --level/--exclude/--include is set"
    # Just allow one arg
    num_args = sum(1 for i in [ARGS.level, ARGS.exclude, ARGS.include] if i)
    if num_args > 1:
        print('Args conflicts select just one arg')
        PARSER.print_help()
        return False
    return True


def level_filter(line):
    "Return true if line must be filtered. Never filter line without level"
    level = ARGS.level.lower()
    line_level = line.get('level', 'notset').lower()
    # unknown line levels count as 100 (same as 'notset'): never filtered
    return LOG_LEVEL.get(line_level, 100) < LOG_LEVEL.get(level, 0)


def include_filter(line):
    "Return true if line must be filtered. Never filter line without level"
    includes = [i.lower() for i in ARGS.include]
    line_level = line.get('level', 'notset').lower()
    return line_level != 'notset' and line_level not in includes


def exclude_filter(line):
    "Return true if line must be filtered. Never filter line without level"
    excludes = [e.lower() for e in ARGS.exclude]
    line_level = line.get('level', 'notset').lower()
    return line_level in excludes


def line_is_filtered(line):
    "Skip the line ?"
    if ARGS.level:
        return level_filter(line)
    elif ARGS.include:
        return include_filter(line)
    elif ARGS.exclude:
        return exclude_filter(line)
    return False


def main():
    "Read stdin line by line and print each line colored, until EOF or ^C"
    global ARGS
    ARGS = PARSER.parse_args()

    if not check_args():
        sys.exit(1)

    try:
        while True:
            # readline() rather than iterating over stdin: each line is
            # handled as soon as it is read
            line = sys.stdin.readline()
            if not line:
                break
            raw = line.rstrip('\n')
            try:
                # get parsed line
                parsed_line = parse_line(raw)
            except Exception:
                # best effort: a line we cannot parse is printed as is
                parsed_line = {'header': '', 'text': raw, '_type': None}
            # Skip line if filtered (never skip line without log level)
            if line_is_filtered(parsed_line):
                continue
            # Print parsed and colored line
            print(colorize(parsed_line))
    except KeyboardInterrupt:
        pass


if __name__ == '__main__':
    main()