拨开荷叶行,寻梦已然成。仙女莲花里,翩翩白鹭情。
IMG-LOGO
主页 文章列表 zabbix 线路质量监控自定义python模块(socket+deque版),集成ICMP/TCP/UDP探测,批量监控线路质量并自定义阈值联动mtr保存线路故障日志

zabbix 线路质量监控自定义python模块(socket+deque版),集成ICMP/TCP/UDP探测,批量监控线路质量并自定义阈值联动mtr保存线路故障日志

白鹭 - 2022-02-14 2169 0 0

互联网故障一般表现为丢包和时延增大,持续性故障不难排查,难的是间歇性故障或凌晨故障,后者往往来不及等我们测试就已经恢复正常,拿不到异常时的mtr结果,就无法判断故障点在哪里

因此有了根据丢包率和时延变化联动mtr的需求

前段时间使用MySQL实现了这个功能,缺点是占用太多系统资源,且脚本繁重,优点是数据可复用,可以做多种形式的展示

后续使用socket+deque实现了低能耗与轻量化的版本,也可以通过开放互联网API来做分布式监控,缺点是历史数据不留存,用完即丢

系统环境

  Ubuntu 18.04.5 LTS+Python 3.6.9 

python库

  自带基本库,考虑到系统权限问题没有使用第三方库

 

  1 #!/usr/bin/env python3
  2 #-*-coding:utf-8-*-
  3 from collections import deque
  4 import itertools,time
  5 import queue,json
  6 import argparse,sys,re,os,subprocess
  7 import time,socket,random,string
  8 import threading
  9 from functools import reduce
 10 import logging
 11 ipqli=[]  # module-level list: Ping.slow_ping compares its length with the probe's flag, and Server.__init__ keeps a reference to it
 12 filename = os.path.realpath(sys.argv[0])  # resolved path of the running script
 13 def logger():
 14     dir = os.path.dirname(os.path.realpath(sys.argv[0]))
 15     log_name = dir+'/log'
 16     logger = logging.getLogger()
 17     fh = logging.FileHandler(log_name)
 18     formater = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
 19     fh.setFormatter(formater)
 20     logger.setLevel(logging.DEBUG)
 21     logger.addHandler(fh)
 22     return logger
 23 #ping程序,避免系统权限问题未使用ping3
 24 class Ping:
 25     def __init__(self,ip,flag,inver=1,count=20,udp_length=64):
 26         ip = tuple(ip)
 27         self.sip,self.tip,self.type,self.port=ip
 28         self.type = self.type.lower()
 29         self.port = int(self.port)
 30         self.inver=inver
 31         self.count=count
 32         self.flag=flag
 33         self.udp_length=udp_length
 34         self.log = logger()
 35         restime_name = 'restime_deque'+''.join(ip).replace('.','')
 36         pkloss_name = 'pkloss_deque'+''.join(ip).replace('.','')
 37         locals()[restime_name] = deque(maxlen=60)
 38         locals()[pkloss_name] = deque(maxlen=60)
 39         self.restime_deque = locals()[restime_name]
 40         self.pkloss_deque = locals()[pkloss_name]
 41         self.ret_restime_deque = globals()[restime_name]
 42         self.ret_pkloss_deque = globals()[pkloss_name]
 43         self.compile= r'(?<=time=)\d+\.?\d+(?= ms)'
 44     def _tcp(self):
 45             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 46             s.settimeout(1)
 47             start_time = time.time()
 48             res_count=0
 49             try:
 50                 s.bind((self.sip,0))
 51                 s.connect((self.tip, self.port))
 52                 s.shutdown(socket.SHUT_RD)
 53                 value = https://www.cnblogs.com/darkchen/p/(time.time() - start_time)*1000  
 54                 self.restime_deque.append(value)
 55                 self.pkloss_deque.append(0)
 56                 res_count=1
 57             except socket.timeout:
 58                 self.restime_deque.append(0)
 59                 self.pkloss_deque.append(1)
 60             except OSError as e:
 61                 self.log.debug(e)
 62                 return 0,0
 63             usetime = time.time()-start_time
 64             sleep_time = self.inver - usetime if usetime<self.inver else self.inver
 65             return sleep_time,res_count
 66     def _udp(self):
 67         res_count=0
 68         s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
 69         s.settimeout(1)
 70         start_time = time.time()
 71         data=https://www.cnblogs.com/darkchen/p/''.join(random.choice(string.ascii_letters+ string.digits) for x in range(self.udp_length))
 72         try:
 73             s.sendto(data.encode('utf-8'),(self.tip,self.port))
 74             s.recv(1024)
 75             value = https://www.cnblogs.com/darkchen/p/(time.time() - start_time)*1000
 76             self.restime_deque.append(value)
 77             self.pkloss_deque.append(0)
 78             res_count=1
 79         except socket.timeout:
 80             self.restime_deque.append(0)
 81             self.pkloss_deque.append(1)
 82         except OSError as e:
 83             self.log.debug(e)
 84             return 0,0
 85         usetime = time.time()-start_time
 86         sleep_time = self.inver - usetime if usetime<self.inver else self.inver
 87         return sleep_time,res_count
 88     def _icmp(self):
 89         res_count=0
 90         start_time = time.time()
 91         cmd = 'ping -i %s -c 1 -W 1 -I %s %s'%(self.inver,self.sip,self.tip)
 92         ret = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0].decode('utf8')
 93         try:
 94             value=https://www.cnblogs.com/darkchen/p/re.findall(self.compile, ret,re.S)[0]
 95             self.restime_deque.append(value)
 96             self.pkloss_deque.append(0)
 97             res_count=1
 98         except:
 99             self.pkloss_deque.append(1)
100             self.restime_deque.append(0)
101         usetime = time.time()-start_time
102         sleep_time = self.inver - usetime if usetime<self.inver else self.inver
103         return sleep_time,res_count
104     def fastping(self):
105         getattr(self, '_'+self.type)()
106     def slow_ping(self):
107         index = 0
108         res_count=0
109         while index<self.count:
110             sleep_time,count=getattr(self, '_'+self.type)()
111             index+=1
112             res_count+=count
113             if not self.flag == len(ipqli) or len(self.ret_restime_deque)<2 or len(self.ret_pkloss_deque)<2 :
114                 break
115             time.sleep(sleep_time)
116         return index,res_count
117     def ping_value(self):
118         start_time = time.time()
119         count = self.count
120         rescount = self.count
121         if len(self.ret_restime_deque)<2 or len(self.ret_pkloss_deque)<2:
122             fastli=[]
123             for x in range(self.count):
124                 t = threading.Thread(target=self.fastping)
125                 t.start()
126                 fastli.append(t)
127             for th in fastli:
128                 th.join()
129         else:
130             count,rescount = self.slow_ping()
131             rescount=count if rescount==0 else rescount
132         use_time = round(time.time()-start_time,4)
133         li = [self.restime_deque.pop() for x in range(count)]
134         pkli = [self.pkloss_deque.pop() for x in range(count)]
135         try:
136             restime = reduce(lambda x ,y :round(float(x)+float(y),2), li)/rescount if len(li) >1 else round(float(li[0]),2)
137             pkloss= reduce(lambda x ,y :int(x)+int(y), pkli)/count*100
138             return (round(restime,2),round(pkloss,2),use_time)   
139         except Exception as e:
140             self.log.debug(e)
141             return 0,0,0
142 #server端代码
143 class Server():
144     def __init__(self,sock):
145         global ipqli
146         self.ipqli=ipqli
147         self.thli=[]
148         self.sock=sock
149         self.basedir = os.path.dirname(os.path.realpath(sys.argv[0]))
150         self.log = logger()
151     @classmethod
152     def start(cls):
153         s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
154         address = ('127.0.0.1',6589)
155         s.bind(address)
156         s.listen(100)
157         obj = cls(s)
158         ping_server=threading.Thread(target=obj.server)
159         ping_server.start()
160         obj.thli.append(ping_server)
161         create_t = threading.Thread(target=obj.create)
162         create_t.start()
163         obj.thli.append(create_t)
164         for t in obj.thli:
165             t.join()
166     def server(self):
167         while True:
168             conn,addr = self.sock.accept() 
169             data=https://www.cnblogs.com/darkchen/p/conn.recv(1024) 
170             data = https://www.cnblogs.com/darkchen/p/data.decode('utf-8')
171             data =https://www.cnblogs.com/darkchen/p/ json.loads(data)
172             ip,item = data
173             restime_ipq = 'restime_deque'+''.join(ip).r
							
标签:

0 评论

发表评论

您的电子邮件地址不会被公开。 必填的字段已做标记 *