dotfiles/bin/os-log-color

460 lines
13 KiB
Python
Executable File

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# forked from: https://github.com/gaelL/openstack-log-colorizer
#
# Author: Gaël Lambert (gaelL) <gael.lambert@netwiki.fr>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Filter and color Openstack log files from stdin
--
It supports the following log files:
- openstack services with default log format (also works with journalctl, stern)
* journalctl: journalctl --no-hostname -o short-iso -u 'devstack@*'
- openvswitch
- rabbitmq
- apache
"""
import sys
import re
import argparse
# Numeric severity for every level name (and abbreviation) accepted on the
# command line or found in a parsed log line. 'notset' ranks above every
# real level.
LOG_LEVEL = {
    alias: severity
    for severity, aliases in (
        (50, ('critical', 'emer')),
        (40, ('error', 'err')),
        (30, ('warning', 'warn', 'wrn')),
        (20, ('info', 'inf')),
        (10, ('debug', 'dbg')),
        (100, ('notset',)),
    )
    for alias in aliases
}
# Command-line interface. The module docstring is used as the description
# and RawDescriptionHelpFormatter keeps its line breaks in --help output.
PARSER = argparse.ArgumentParser(
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description=__doc__,
)

# -l/--level takes exactly one level name.
PARSER.add_argument(
    "-l", "--level",
    type=str,
    choices=LOG_LEVEL.keys(),
    metavar='level',
    help="Set log levels you want display",
)

# -e/--exclude takes one or more level names.
PARSER.add_argument(
    "-e", "--exclude",
    type=str,
    nargs='+',
    choices=LOG_LEVEL.keys(),
    metavar='level',
    help="Set log levels you want exclude",
)

# -i/--include takes one or more level names.
PARSER.add_argument(
    "-i", "--include",
    type=str,
    nargs='+',
    choices=LOG_LEVEL.keys(),
    metavar='level',
    help="Set log levels you only want display",
)

# Parsed once at module load; the filter functions read this global.
ARGS = PARSER.parse_args()
class colors:
    """ANSI escape sequences used to paint terminal output."""

    # Bold foreground colours.
    grey = '\033[1;30m'
    red = '\033[1;31m'
    green = '\033[1;32m'
    yellow = '\033[1;33m'
    blue = '\033[1;34m'
    magenta = '\033[1;35m'
    cyan = '\033[1;36m'

    # Non-bold white.
    white = '\033[0;37m'

    # Sequence emitted after a coloured span.
    end = '\033[1;m'
def grey(text):
    """Return *text* (stringified) between the grey and end sequences."""
    return colors.grey + str(text) + colors.end
def red(text):
    """Return *text* (stringified) between the red and end sequences."""
    return colors.red + str(text) + colors.end
def green(text):
    """Return *text* (stringified) between the green and end sequences."""
    return colors.green + str(text) + colors.end
def yellow(text):
    """Return *text* (stringified) between the yellow and end sequences."""
    return colors.yellow + str(text) + colors.end
def blue(text):
    """Return *text* (stringified) between the blue and end sequences."""
    return colors.blue + str(text) + colors.end
def magenta(text):
    """Return *text* (stringified) between the magenta and end sequences."""
    return colors.magenta + str(text) + colors.end
def cyan(text):
    """Return *text* (stringified) between the cyan and end sequences."""
    return colors.cyan + str(text) + colors.end
def white(text):
    """Return *text* (stringified) between the white and end sequences."""
    return colors.white + str(text) + colors.end
def colored_level(level):
    """Return the upper-cased level name wrapped in its severity colour.

    Level names that belong to no known group are painted white.
    """
    tag = level.upper()
    palette = (
        (('TRACE', 'SEC'), magenta),
        (('DEBUG', 'DBG', 'INFO', 'INF'), green),
        (('WARNING', 'WRN', 'WARN', '???'), yellow),
        (('CRITICAL', 'ERROR', 'ERR', 'EMER'), red),
    )
    for names, paint in palette:
        if tag in names:
            return paint(tag)
    return white(tag)
# Pre-compiled patterns, tried by parse_line() in the order they are defined.
#
# The optional "name" token is written ([^ ]*) instead of ([^ ]+)*: the
# nested quantifier backtracks exponentially when the token after the level
# is not followed by a space (for instance when it ends the line), which
# makes a single log line hang the whole tool. Both forms accept exactly
# the same lines. Likewise the trailing (.+)* is written (.*).

# Line example : Openstack
# 2014-08-14 18:43:58.950 4092 INFO neutron.plugins.openvswitch.agent.ovs_neutron_agent [-]
_OPENSTACK_RE = re.compile(
    r'(.*)'
    r'([0-9]{4}-[0-9]+-[0-9]+) '                # date
    r'([0-9]+:[0-9]+:[0-9]+\.[0-9]+) '          # time
    r'([0-9]+):? '                              # process
    r'([A-Z]+) '                                # level
    r'([^ ]*) '                                 # name
    r"(\[(?!'Traceback)[^\]]+\])*\s*"           # context
    r'(.*)'                                     # text
)

# journalctl --no-hostname -o short-iso -f -u 'devstack@*'
# 2024-03-20T13:27:44+0000 devstack@keystone.service[74735]: INFO neutron.plugins.openvswitch.agent.ovs_neutron_agent [-]
_JOURNAL_RE = re.compile(
    r'(.*)'
    r'([0-9]{4}-[0-9]+-[0-9]+)T'                # date
    r'([0-9]+:[0-9]+:[0-9]+\+[0-9]+) '          # time
    r'([^\[]+)\[([0-9]+)\]: '                   # unit process
    r'([A-Z]+) '                                # level
    r'([^ ]*) '                                 # name
    r"(\[(?!'Traceback)[^\]]+\])*\s*"           # context
    r'(.*)'                                     # text
)

# A bare "['Traceback ..." payload without timestamp or level.
_TRACEBACK_RE = re.compile(
    r'(.*)'
    r"(\['Traceback.*)"
)

# Timestamp without fractional seconds, followed by "key: value" pairs.
_APACHE_RE = re.compile(
    r'(.*)'
    r'([0-9]{4}-[0-9]+-[0-9]+) '                # date
    r'([0-9]+:[0-9]+:[0-9]+) '                  # time
    r'(.+)'
)

# rabbit
# 2023-07-30 12:39:41.230254+00:00 [error]
_RABBIT_RE = re.compile(
    r'(.*)'
    r'([0-9]{4}-[0-9]+-[0-9]+) '                # date
    r'([0-9]+:[0-9]+:[0-9]+\.[0-9]+[^ ]+) '     # time
    r'\[([a-z\?]+)\] '                          # level
    r'(<[^>]+>) '                               # <0.9417.870>
    r'(.*)'
)

# Line example : ovs.log
# 2014-11-21T06:25:09.549Z|00012|vlog|INFO|opened log file /var/log/op...
_OVS_RE = re.compile(
    r'(.*)'
    r'([0-9]{4}-[0-9]+-[0-9]+)T'                # date
    r'([0-9]+:[0-9]+:[0-9]+\.[0-9]+)Z\|'        # time
    r'([0-9]+)\|'                               # serial
    r'([^\|]+)\|'                               # name
    r'([^\|]+)\|'                               # level
    r'(.+)'
)

# Anything else: the first two space-separated tokens become the header.
_GENERIC_RE = re.compile(
    r'([^ ]+ [^ ]+)'
    r'(.*)'
)


def _apache_key_values(text):
    """Return a dict of the ``key: value`` pairs found in *text*.

    A key is any run of non-space characters ending in ``:``. Its value is
    whatever lies between that key and the next one (or the end of the
    text for the last key); the space in front of the next key is kept.
    """
    keys = re.findall(r"[^ ]+:", text)
    key_val = {}
    for key, next_key in zip(keys, keys[1:] + [""]):
        # Keys come straight from the log line and may contain regex
        # metacharacters such as "[" or "(", so they must be escaped
        # before being embedded in a pattern.
        match = re.match(
            rf".*{re.escape(key)} (.*){re.escape(next_key)}.*", text)
        if match:
            key_val[key] = match.group(1)
    return key_val


def parse_line(line):
    """Parse one log line and return a dict describing its elements.

    The line is tested against each known format in turn (openstack,
    journalctl-wrapped openstack, bare traceback, apache, rabbit, ovs) and
    the first match wins. The returned dict always has ``header`` and
    ``_type`` keys; ``_type`` is 'openstack', 'apache', 'rabbit', 'ovs' or
    None for unrecognised lines. The remaining keys depend on the format:

    - openstack: date, time, process, level, name, req, text
    - apache: date, time, key_val (dict of ``key:`` -> value)
    - rabbit: date, time, level, pid, text
    - ovs: date, time, serial, name, level, text
    - None: text

    Optional parts that are absent from the line are returned as ''.
    """
    result = _OPENSTACK_RE.match(line)
    if result is not None:
        return {
            'header': result.group(1) or '',
            'date': result.group(2),
            'time': result.group(3),
            'process': result.group(4),
            'level': result.group(5),
            'name': result.group(6) or '',
            'req': result.group(7) or '',
            'text': result.group(8) or '',
            '_type': 'openstack',
        }

    result = _JOURNAL_RE.match(line)
    if result is not None:
        # Keep whatever precedes the timestamp; when there is nothing,
        # show the systemd unit name instead.
        header = result.group(1) or f"{result.group(4)} "
        return {
            'header': header,
            'date': result.group(2),
            'time': result.group(3),
            'process': result.group(5),
            'level': result.group(6),
            'name': result.group(7) or '',
            'req': result.group(8) or '',
            'text': result.group(9) or '',
            '_type': 'openstack',
        }

    result = _TRACEBACK_RE.match(line)
    if result is not None:
        return {
            'header': result.group(1) or '',
            'date': '',
            'time': '',
            'process': '',
            'level': 'TRACE',
            'name': '',
            'req': '',
            'text': result.group(2) or '',
            '_type': 'openstack',
        }

    result = _APACHE_RE.match(line)
    if result is not None:
        return {
            'header': result.group(1) or '',
            'date': result.group(2),
            'time': result.group(3),
            'key_val': _apache_key_values(result.group(4)),
            '_type': 'apache',
        }

    result = _RABBIT_RE.match(line)
    if result is not None:
        return {
            'header': result.group(1) or '',
            'date': result.group(2),
            'time': result.group(3),
            'level': result.group(4),
            'pid': result.group(5),
            # '' rather than None so an empty message is not rendered as
            # the literal string "None".
            'text': result.group(6) or '',
            '_type': 'rabbit',
        }

    result = _OVS_RE.match(line)
    if result is not None:
        return {
            'header': result.group(1) or '',
            'date': result.group(2),
            'time': result.group(3),
            'serial': result.group(4),
            'name': result.group(5),
            'level': result.group(6),
            'text': result.group(7),
            '_type': 'ovs',
        }

    result = _GENERIC_RE.match(line)
    if result is not None:
        return {
            'header': result.group(1) or '',
            'text': result.group(2) or '',
            '_type': None,
        }

    return {'header': '', 'text': line, '_type': None}
def colorize(line):
    """Return the parsed *line* dict rendered as one coloured string.

    The layout depends on ``line['_type']``. For openstack lines at TRACE
    or ERROR level whose text holds a "['Traceback" payload, the literal
    ``\\n`` sequences in ``line['text']`` are replaced by real newlines
    (the dict is updated in place).
    """
    kind = line.get('_type')

    if kind == 'openstack':
        level = line.get('level')
        if level in ('TRACE', 'ERROR') and "['Traceback" in line.get('text'):
            line['text'] = line['text'].replace('\\n', '\n')
        # TRACE text is dimmed, every other level is shown in white.
        paint_text = grey if level == 'TRACE' else white
        fields = (
            grey(line['date']),
            grey(line['time']),
            grey(line['process']),
            colored_level(line['level']),
            blue(line['name']),
            grey(line['req']),
            paint_text(line['text']),
        )
        # The header is glued to the date without a separating space.
        return magenta(line['header']) + ' '.join(fields)

    if kind == 'ovs':
        fields = (
            grey(line['date']),
            grey(line['time']),
            grey(line['serial']),
            blue(line['name']),
            colored_level(line['level']),
            white(line['text']),
        )
        return magenta(line['header']) + ' '.join(fields)

    if kind == 'rabbit':
        fields = (
            grey(line['date']),
            grey(line['time']),
            colored_level(line['level']),
            blue(line['pid']),
            white(line['text']),
        )
        return magenta(line['header']) + ' '.join(fields)

    if kind == 'apache':
        pieces = [
            magenta(line['header']) + grey(line['date']),
            grey(line['time']),
        ]
        for key, value in line['key_val'].items():
            shown = white(value)
            try:
                if key == "status:":
                    # 5xx in red, 4xx in yellow, everything else white.
                    code = int(value)
                    if code >= 500:
                        shown = red(value)
                    elif code >= 400:
                        shown = yellow(value)
                elif key == "time:" and float(value) >= 1000:
                    shown = red(value)
            except Exception:
                # Values that are not numeric simply stay white.
                pass
            pieces.append(blue(key))
            pieces.append(shown)
        return ' '.join(pieces)

    # Unknown format: only the header is coloured.
    return ' '.join((magenta(line['header']), str(line.get('text'))))
def check_args():
    """Return True when at most one of --level/--exclude/--include is set.

    When several are set, a short message and the parser help are printed
    and False is returned.
    """
    selected = [
        opt for opt in (ARGS.level, ARGS.exclude, ARGS.include) if opt
    ]
    if len(selected) <= 1:
        return True
    print('Args conflicts select just one arg')
    PARSER.print_help()
    return False
def level_filter(line):
    """Return True when the line's level ranks below the --level threshold.

    Lines without a level, or with a level missing from LOG_LEVEL, rank
    as 100.
    """
    threshold = LOG_LEVEL.get(ARGS.level.lower(), 0)
    severity = LOG_LEVEL.get(line.get('level', 'notset').lower(), 100)
    return severity < threshold
def include_filter(line):
    """Return True when the line's level is not among the --include levels.

    Lines without a level are never filtered.
    """
    wanted = {name.lower() for name in ARGS.include}
    line_level = line.get('level', 'notset').lower()
    return line_level != 'notset' and line_level not in wanted
def exclude_filter(line):
    """Return True when the line's level is one of the --exclude levels.

    A line without a level counts as 'notset'.
    """
    unwanted = {name.lower() for name in ARGS.exclude}
    line_level = line.get('level', 'notset').lower()
    return line_level in unwanted
def line_is_filtered(line):
    """Return True when the parsed line must be skipped.

    The first option that is set, in the order --level, --include,
    --exclude, decides; with no option set nothing is skipped.
    """
    dispatch = (
        (ARGS.level, level_filter),
        (ARGS.include, include_filter),
        (ARGS.exclude, exclude_filter),
    )
    for selected, predicate in dispatch:
        if selected:
            return predicate(line)
    return False
if __name__ == '__main__':
    # Local import: only the broken-pipe cleanup below needs it.
    import os

    if not check_args():
        sys.exit(1)
    try:
        # Read each line in stdin until EOF.
        while True:
            line = sys.stdin.readline()
            if not line:
                break
            try:
                # get parsed line
                parsed_line = parse_line(line.rstrip('\n'))
            except Exception:
                # Never drop a log line because parsing failed: print it
                # as an unrecognised line instead.
                parsed_line = {
                    'header': '',
                    'text': line.rstrip('\n'),
                    '_type': None,
                }
            # Skip line if filtered (never skip line without log level)
            if line_is_filtered(parsed_line):
                continue
            # Print parsed and colored line
            print(colorize(parsed_line))
        # Flush inside the try block so a closed pipe is detected here
        # and not during interpreter shutdown.
        sys.stdout.flush()
    except KeyboardInterrupt:
        # Ctrl-C stops the tool quietly, whether it is hit while reading,
        # parsing or printing.
        pass
    except BrokenPipeError:
        # The reader of our output went away (e.g. `| head`, quitting a
        # pager). Point stdout at devnull so the flush done at exit does
        # not raise again, then exit without a traceback.
        devnull = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull, sys.stdout.fileno())
        sys.exit(1)