defmakekey(line:str): start = 0 skip = False for i,c inenumerate(line): ifnot skip and c in'"[': start = i + 1 skip = True elif skip and c in'"]': skip = False yield line[start:i] start = i + 1 continue if skip: continue if c in CHARS: if start == i: start = i + 1 continue yield line[start:i] start = i + 1 else: if start < len(line): yield line[start:] names = ('remote','','','datetime','request','status','length','','useragent')
# 137课开始讲日志分析的项目 import datetime import re from queue import Queue import threading from pathlib import Path from collections import defaultdict from user_agents import parse
# d = {} #for k,v in extract(logline).items(): # k是每个fields的命名 # d[k] = ops.get(k, lambda x:x)(v) # 这里如果可以查询到k的值,就用k值对应的函数处理v;如果没找到k值,就用lambda函数处理v,lambda函 # 数在这里的意思是给什么就返回什么,也就是直接返回v。lambda的地方不能写成v,因为还要将这个默认值当 # 函数去处理v,所以如果写成v,就会变成v(v),这会报错。 # d = {k:ops.get(k,lambda x:x)(v) for k,v in extract(logline).items()} # 上面的三行代码转换为这一行。让上面的extract函数直接返回这个字典解析式更好些
defopenfile(path:str): withopen(str(p)) as f: # open()方法里需要字符串,要用str()函数转一下 for line in f: d = extract(line) if d: yield d else: # TODO 不合格数据有多少 continue
# 下面的调用方法 defload(*path): # 读一批路径。这是一个生成器函数,yield出来的是字典 """文件装载""" for file in path: p = Path(file) ifnot p.exists(): continue if p.is_dir(): # 这里不递归处理子目录,也可以使用**/*log的方法 for x in p.iterdir(): if x.is_file(): # with open(str(p)) as f: # open()方法里需要字符串,要用str()函数转一下 # for line in f: # d = extract(line) # if d: # yield d # else: # # TODO 不合格数据有多少 # continue # gen = openfile(str(x)) # for y in openfile(str(x)): # yield y # 这里不能使用return y,这样会有一个就扔出去了,我们需要的是攒起来一起处理,攒多少个是window函数的事 # openfile(str(x))不是普通的函数调用,它返回的是一个生成器对象,我们需要使用for循环或next拿,那么需 # 要迭代多少个合适呢,所以用yield,这样load就变成一个生成器函数了。有yield的就是生成器函数,返回的是生 # 成器对象。yield from后面是一个可迭代对象就行 yieldfrom openfile(str(x)) # 将上面两行改为一行 elif p.is_file(): # with open(str(p)) as f: # open()方法里需要字符串,要用str()函数转一下 # for line in f: # d = extract(line) # if d: # yield d # else: # # TODO 不合格数据有多少 # continue yieldfrom openfile(str(x)) defwindow(src:Queue, handler, width:int, interval:int): # window函数只做移动窗口拿数据的工作,处理在handler函数中做 # 函数的功能越单一越好。这里的src就是load函数生成的数据,src生成的是字典。load是生成器函数 # current是现在的时间,start是起始时间,width是宽度,表示一次求值的时间窗口宽度,窗口就是下面的 # buffer中的数据,主要是配合Interval计算delta,interval是时间间隔,表示间隔多入处理一次数据 # i = 0 # for x in src # i += 1 # print(x) # if i > 10: # break start = datetime.datetime.strptime('1970/01/01 01:01:01 +0800', '%Y/%m/%d %H:%M:%S %z') current = datetime.datetime.strptime('1970/01/01 01:01:02 +0800', '%Y/%m/%d %H:%M:%S %z') delta = datetime.timedelta(seconds=width - interval) buffer = [] # for x in src: whileTrue: data = src.get() # 这里传进来的src是一个Queue,src.get()就是从Queue队列中拿数据的方法。如果没有数据就会阻塞等待,等下 # 一个数据过来。所以上面也不能再用for循环了,而使用while True循环。这里拿到的是load生成器函数处理后 # yield出来的字典。 if data: buffer.append(data) # 直接把字典放入buffer中,具体的处理工作由handler执行。不要把加入buffer中的值写死,那样的适用性会很差 current = data['datetime'] # 当前时间应该是日志中的时间 if (current - start).total_seconds() >= interval: ret = handler(buffer) print(ret) start = current # 调整开始时间 # buffer 的处理 # tmp = [] # for i in buffer: # if i['datetime'] > current - delta: # 用拿到的时间与current减delta的时间比较,这就是有数据重叠的部分,现在的时间减去delta后的位置就成了处理过的数据要保留部分的起始位置,大于这个位置的数据都要保留,其他的就不要了。可以比对时间窗口的图查看 # tmp.append(i) buffer = [i for i in buffer if i['datetime'] > current - delta] # 留下的数据形成了一个窗口,这是一个新建的buffer,和上面的buffer没关系