1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
import datetime
import random
import re
import threading
import time
from pathlib import Path
from queue import Queue
from typing import Optional
# Regular expression for one access-log line.  Named groups, in order:
# remote address, bracketed timestamp, quoted request (method, url, protocol),
# three-digit status, response size, a quoted field that is skipped, and the
# quoted user agent.
# Written as adjacent raw strings so the regex escapes (\d, \s, \[ ...) reach
# the regex engine unchanged and no stray characters are introduced where the
# pattern is split across source lines.
PATTERN = (
    r'(?P<remote>[\d\.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s'
    r'"(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s'
    r'(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)'
)
regex = re.compile(PATTERN)

# Per-field converters applied to the captured strings; fields without an
# entry here are kept as plain strings.
OPS = {
    'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'size': int,
}
def extract(line: str) -> Optional[dict]:
    """Parse one log line into a dict of typed fields.

    The line is matched against the module-level ``regex``.  Each named group
    is passed through its converter in ``OPS``; groups without a converter are
    kept as the captured string.

    :param line: one raw log line
    :return: mapping of group name to converted value, or ``None`` when the
        line does not match the pattern
    """
    matcher = regex.match(line)
    if matcher is None:
        return None

    def keep(text):
        # Fallback converter for fields that have no entry in OPS.
        return text

    return {
        name: OPS.get(name, keep)(data)
        for name, data in matcher.groupdict().items()
    }
def openfile(path:str):
    """Yield the parsed fields of every line in *path* that ``extract`` accepts.

    Lines for which ``extract`` returns a falsy result are skipped.

    :param path: path of the text file to read
    """
    with open(path) as stream:
        for raw in stream:
            record = extract(raw)
            if record:
                yield record


def load(*paths):
    """Yield parsed records from every given file or directory.

    Paths that do not exist are skipped.  For a directory, only the regular
    files directly inside it are read (no recursion).

    :param paths: file or directory paths
    """
    for entry in map(Path, paths):
        if not entry.exists():
            continue
        if entry.is_dir():
            targets = (child for child in entry.iterdir() if child.is_file())
        elif entry.is_file():
            targets = (entry,)
        else:
            targets = ()
        for target in targets:
            yield from openfile(str(target))
def source(second=1):
    """Generate an endless stream of sample records.

    Each record is a dict with the current time in UTC+8 under ``'datetime'``
    and a random integer from 1 to 100 under ``'value'``.  After a record has
    been consumed the generator sleeps before producing the next one.

    :param second: pause between records, in seconds
    """
    tz = datetime.timezone(datetime.timedelta(hours=8))
    while True:
        now = datetime.datetime.now(tz)
        number = random.randint(1, 100)
        yield {'datetime': now, 'value': number}
        time.sleep(second)
def window(src:Queue, handler, width:int, interval:int):
    """Run *handler* over a sliding time window of records read from *src*.

    Records are dicts carrying a timezone-aware ``'datetime'``.  Each record is
    appended to a buffer.  Once a record's timestamp is at least *interval*
    seconds after the previous evaluation, ``handler(buffer)`` is called and
    its result printed.  The buffer is then trimmed to the records newer than
    ``current - (width - interval)`` seconds, so consecutive windows overlap.

    Falsy items are ignored, so the handler is never called because of them
    (in particular, never on an empty buffer before the first real record).
    ``None`` is treated as an end-of-stream marker and makes the function
    return; without it the loop blocks on the queue forever.

    :param src: queue the records are read from
    :param handler: callable receiving the list of buffered records
    :param width: width of the time window, in seconds
    :param interval: time between two evaluations, in seconds
    """
    # Fixed instant in the past: the first record dated at least `interval`
    # seconds after it triggers an evaluation immediately.
    start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')
    buffer = []
    # Span of records carried over from one window into the next.
    delta = datetime.timedelta(seconds=width - interval)

    while True:
        data = src.get()
        if data is None:
            # End-of-stream marker: stop so the calling thread can finish.
            return
        if not data:
            continue

        buffer.append(data)
        current = data['datetime']
        if (current - start).total_seconds() < interval:
            continue

        ret = handler(buffer)
        print('{}'.format(ret))
        start = current
        threshold = current - delta
        buffer = [x for x in buffer if x['datetime'] > threshold]
def handler(iterable):
    """Return the arithmetic mean of the ``'value'`` field of the records.

    :param iterable: sized collection of dicts with a numeric ``'value'`` key
    :raises ZeroDivisionError: if the collection is empty
    """
    total = 0
    for record in iterable:
        total += record['value']
    return total / len(iterable)
def donothing_handler(iterable):
    """Return the given records unchanged (pass-through handler)."""
    return iterable
def status_handler(iterable):
    """Return the share of records per ``'status'`` value.

    :param iterable: sized collection of dicts with a ``'status'`` key
    :return: dict mapping each status to its fraction of all records, in
        order of first appearance; empty dict for an empty collection
    """
    counts = {}
    for record in iterable:
        code = record['status']
        if code in counts:
            counts[code] += 1
        else:
            counts[code] = 1
    size = len(iterable)
    return {code: hits / size for code, hits in counts.items()}
def dispatcher(src):
    """Build a fan-out dispatcher that feeds every record of *src* to all
    registered window workers.

    :param src: iterable of records
    :return: ``(reg, run)`` - ``reg`` registers a handler, ``run`` starts the
        worker threads and distributes the records
    """
    workers = []
    channels = []

    def reg(handler, width:int, interval:int):
        """Register a window handler.

        Creates a dedicated queue and a (not yet started) thread running
        ``window`` on it.

        :param handler: data-processing function to register
        :param width: width of the time window
        :param interval: time between evaluations
        """
        channel = Queue()
        channels.append(channel)
        worker = threading.Thread(
            target=window,
            args=(channel, handler, width, interval),
        )
        workers.append(worker)

    def run():
        """Start all registered threads, then put each record of the source
        on every registered queue."""
        for worker in workers:
            worker.start()
        for item in src:
            for channel in channels:
                channel.put(item)

    return reg, run
if __name__ == "__main__":
    import sys

    # Log file or directory to analyse: first command-line argument if given,
    # otherwise the default 'test.log'.
    path = sys.argv[1] if len(sys.argv) > 1 else 'test.log'

    reg, run = dispatcher(load(path))
    # Report the status distribution over a 10-second window every 5 seconds.
    reg(status_handler, 10, 5)
    run()
|