滑动窗口
数据载入 对于本项目来说,数据就是日志的一行行记录,载入数据就是文件 IO的读取。将获取数据的方法封装成函数。
1 2 3 4 5 6 7 8 9 def load (path ): """装载日志文件""" with open (path) as f: for line in f: fields = extract(line) if fields: yield fields else : continue
时间窗口分析 概念 很多数据,例如日志,都是和时间相关的,都是按照时间顺序产生的。
产生的数据分析的时候,要按照时间求值
interval 表示每一次求值的时间间隔
width 时间窗口宽度,指的一次求值的时间窗口宽度
当width > interval
数据求值时会有重叠。当走一个interval间隔后,用当前位置减去width宽度秒就是要采集的数据的起始位置。c2=c1-delta,delta=width-interval。当delta=0时,width=interval
当width = interval
数据求值没有重叠
当width < interval 一般不采纳这种方案,会有数据缺失
时序数据 运维环境中,日志、监控等产生的数据都是与时间相关的数据,按照时间先后产生并记录下来的数据,所以一般按照时间对数据进行分析。
数据分析基本程序结构 无限的生成随机数函数,产生时间相关的数据,返回时间和随机数的字典
每次取3个数据,求平均值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import randomimport datetimeimport timedef source (): while True : yield {'value' :random.randint(1 ,100 ),'datetime' :datetime.datetime.now()} time.sleep(1 ) s = source() items = [next (s) for _ in range (3 )] def handler (iterable ): return sum (map (lambda item:item['value' ],iterable)) / len (iterable) print (items)print ("{:.2f}" .format (handler(items)))
上面代码模拟了,一段时间内产生了数据,等了一段固定的时间取数据来计算平均值。
窗口函数实现 将上面的获取数据的程序扩展为window函数。使用重叠的方案
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 import randomimport datetimeimport timedef source (second=1 ): """生成数据""" while True : yield { 'datetime' :datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8 ))), 'value' :random.randint(1 ,100 ) } time.sleep(second) def window (iterator, handler, width:int , interval:int ): """ 窗口函数 :param iterator:数据源,生成器,用来拿数据 :param handler:数据处理函数 :param width:时间窗口宽度,秒 :param interval:处理时间间隔,秒 """ start = datetime.datetime.strptime('20170101 000000 +0800' , '%Y%m%d %H%M%S %z' ) current = datetime.datetime.strptime('20170101 010000 +0800' , '%Y%m%d %H%M%S %z' ) buffer = [] delta = datetime.timedelta(seconds=width-interval) while True : data = next (iterator) if data: buffer.append(data) current = data['datetime' ] if (current - start).total_seconds() >= interval: ret = handler(buffer) print ('{:.2f}' .format (ret)) start = current buffer = [x for x in buffer if x['datetime' ] > current - delta] def handler (iterable ): return sum (map (lambda x:x['value' ],iterable)) / len (iterable) window(source(), handler, 10 , 5 )
时间的计算
实例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 import randomimport datetime def source (): while True : yield datetime.datetime.now(),random.randint(1 ,100 ) i = 0 for x in source(): print (x) i += 1 if i>10 : break src = source() i = 0 lst = [next (src) for _ in range (3 )] def handler (iterable ): return sum (iterable) // len (iterable) ======================================================================================= import randomimport datetime def source (): while True : yield datetime.datetime.now(),random.randint(1 ,100 ) src = source() i = 0 lst = [next (src) for _ in range (3 )] def window (src, handler, width:int , interval:int ): i = 0 for x in src i += 1 print (x) if i > 10 : break window(load('test.log' ),None ,0.0 )