3

可扩展的简单网络嗅探器

 2 years ago
source link: https://allenwind.github.io/blog/2396/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
Mr.Feng Blog

NLP、深度学习、机器学习、Python、Go

可扩展的简单网络嗅探器

实现一个简单的网络嗅探。

我们想写一个网络嗅探器,希望它有很好的通用性,能适应不同的嗅探场景。根据这样的需求,我们可以在功能是做分离。一个嗅探器可以分为两部分:

  • 网络嗅探部分

  • 对嗅探内容进行处理部分。前者需要系统兼容,后者根据不同的系统需求编写不同的处理嗅探内容的插件。

嗅探的原理

我们所熟知的TCP/IP网络协议栈中,数据时同网络接口层出入网络层然后传入运输层,然后根据端口号分流到不同的进程中。这是一个多路复用的过程。但,正是这种层层上传的过程,数据被层层解包:上层接收的数据丢失了底层网络的数据包结构信息。例如,IP层分组进入到TCP层变为报文段丢失了IP地址等。在网络编程中,一种方法叫“混杂模式”的套接字编程,可以绕过上述的过程,直接在TCP层接收网络接口层的信息。我们称其为原生套接字。

关键是在建立套接字时指定SOCK_RAW

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, protocol)

还有要考虑系统的兼容性,不同系统在元素套接字上有不同的处理。详细差异见代码注释。

定义一个嗅探器的通用接口,采用OOP的写法:

class Sniffer:

def __init__(self, raw_sniffer):
self.sniffer = raw_sniffer
self.target_address = None

def recv_data(self, length=65536):
data, address = self.sniffer.recvfrom(length)
self.target_address = address
return address, data

recvfrom = self.recv_data

def get_current_target_address(self):
return self.target_address

考虑到网络嗅探采集数据的通用性,这里采用上下文管理的方式实现该函数。具体如下。

import socket
import os
import contextlib

@contextlib.contextmanager # 上下文管理装饰器
def make_sniffer(host=None, port=None):
if host is None:
host = socket.gethostbyname(socket.gethostname())
if port is None:
port = 0
if os.name == 'nt':
protocol = socket.IPPROTO_IP
else:
protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, protocol)
sniffer.bind((host, port))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

if os.name == 'nt':
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
try:
yield Sniffer(sniffer)
finally:
if os.name == 'nt':
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

这样,当我们要实现具体的场景需求是,只要在这个sniffer上下文下就可以了。例如:

with make_sniffer() as sniffer:
# 调用recv_data接口接收数据和address,
# 根据这些信息实现满足特定场景的需求

基于通用嗅探器的一个例子

我们编写一个统计局域网下各IP的数据流量

import socket
import os
import contextlib

def ip_counter():
with make_sniffer() as sniffer:
import collections
import time
import threading

c = collections.Counter()
def task(c, interval):
# add bit counter
while True:
time.sleep(interval)
os.system("cls")
for ip, count in sorted(c.items(), key=lambda x: x[1], reverse=True):
print(ip, count)
threading.Thread(target=task, args=(c, 1)).start()

while True:
_, address = sniffer.recvfrom(65536)
ip = address[0]
c.update({ip: 1})

if __name__ == '__main__':
ip_counter()
192.168.0.111 17
14.114.244.1 11
220.249.245.146 2
192.168.0.1 1
120.4.90.229 1
64.233.188.188 1
101.27.78.51 1

可以看到局域网下不同主机的数据活跃度。

详细代码见simple-sniffer

转载请包括本文地址:https://allenwind.github.io/blog/2396
更多文章请参考:https://allenwind.github.io/blog/archives/


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK