4

使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警

 2 years ago
source link: https://blog.51cto.com/jasonhuang/5376798
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警

推荐 原创

如果大家有在使用混合部署完全本地化邮件服务器(也包含 Linux 等其他平台的邮件服务器),或许很多人会经常遇到和我一样的问题:邮件服务器的公网 IP 被列入Spamhaus和类似的反垃圾邮件联盟的黑名单 导致业务部门发出去的邮件被对方邮件网关拦截或直接拒收。


假设你有运维这样的一个场景,这里以Exchange平台为例(相信很多企业都是使用该场景):

使用的是混合部署环境,并且:
发信邮件流本地 ExchangeO365 Exchange Online;
收信邮件流本地邮件网关本地 Exchange 服务器O365用户邮箱

引用微软工程师的邮件内容: O365的EOP会参考Spamhaus,connection filter中的 Allow list不能跳过spamhaus的SBL和XBL的检测 ,目前列入的PBL,可以考虑加入连接筛选器的白名单。
Spamhaus是一个公平/权威的三方平台,为组织和企业提供实时的IP阻止列表。Office 365的EOP会实时参照Spamhaus的阻止列表来确定是否需要拒绝从某些地方发进来的邮件。
Spamhaus的阻止列表有好几种,如SBL,XBL.PBL,DBL。SBL列入的是发垃圾邮件的IP地址列表,XBL是因发一些恶意软件或者病毒而被列入的阻止列表,PBL是因为某些组织要求的阻止列表,DBL是基于域名的阻止列表。具体的这些阻止列表信息,您可以访问 The Spamhaus Project 查看原因。
因此,当您收到IP被放入Spamhaus的阻止列表,到Spamhaus查询的时候,可以看到这个IP被列在哪个阻止列表中。从而可以知道这个IP为什么会被加到阻止列表。
由于Spamhaus是三方的组织平台,因此,微软没有办法监测或者有提醒功能。而且,考虑到来自阻止IP列表邮件的危险性,为了我们的客户考虑,EOP会在连接层面就断掉来自该IP的连接。邮件并没有进入EOP被O365收下来,因此,您在Exchange online管理中心里设置黑白名单不会生效。

因此,如果管理员不能及时发现 IP 被列入黑名单,你的业务、销售部门此时还在不停的外发邮件。一般在 12-24小时左右,你的邮件公网 IP 会被微软列入黑名单,这个时候即便你在Exchange Online上将本地的公网 IP 添加到连接器的白名单,你的用户收发邮件仍然会收到影响。


一、监控 Exchange 传输日志

可以通过ELKSplunk这类的工具监控 Exchange 的传输日志,当出现关键词*550 5.7.1 Message rejected as spam by Content Filtering*550 5.7.1 Service unavailable;Client host[IP] blocked using Spamhaus 就发出告警通知邮件管理员

上述方法有个缺点,即当你通过该日志发现了问题,说明你的邮件服务器公网 IP 已经被列入黑名单有一段时间了,并且已经影响了你的生产环境。

二、通过Python的第三方Pydnsbl模块进行实时监控

优点 当你的 IP 被列入 Spamhaus 黑名单后,会立刻被查询到,及时通知管理员并进行相应处理,就不会出现长时间未处理,导致微软同步 Spamhaus策略后整个公司的邮件都受到收发影响

 官方文档

使用方法:
通过官方介绍,其使用方法非常简单,只要Python版本大于等于3.5版本即可。接下来我会基于这个方法做个详细的使用介绍,便于后期参考,也方便和我一样有类似需求的人查阅。

使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警_Spamhaus 监控

有了这款工具和使用方法,接下来就要结合使用场景,来实现自动检查、自动触发告警。

由于我本人对 Python 不是很熟悉,只能根据自己的需求编写符合需求的脚本,如有不妥或可以优化、改进的地方,可留言赐教,谢谢!

#!/usr/bin/python3.7
# encoding: utf-8

import urllib.request
import json
import pydnsbl
import ssl
ssl._create_default_https_context = ssl._create_unverified_context


TOKEN_URL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
# 企业的id,在管理端->"我的企业" 可以看到
# CORP_ID = "CORP_ID"
CORP_ID = "CORP_ID****"
# 某个自建应用的id及secret, 在管理端 -> 企业应用 -> 自建应用, 点进相应应用可以看到
APP_ID = "APP_ID*****"
CORP_SECRET = "**********"

class Wechat(object):
    "send monitor message by wechat"

    def __init__(self):
        self.CORP_ID = CORP_ID
        self.CORP_SECRET = CORP_SECRET
        self.APP_ID = APP_ID
        self.BASEURL = 'https://qyapi.weixin.qq.com/cgi-bin/'
        self.TOKEN_URL = 'gettoken?corpid={0}&corpsecret={1}'.format(
            self.CORP_ID, self.CORP_SECRET)

    # 获取认证 token
    def Get_Token(self):
        try:
            response = urllib.request.urlopen(
                '{0}{1}'.format(self.BASEURL, self.TOKEN_URL))
            access_token = json.loads(
                response.read().decode('utf-8'))['access_token']
            with open('token', 'w') as f:
                f.write(access_token)
        except KeyError:
            raise KeyError
        return access_token

    def checker(self):
        Result = []
        ip_checker = pydnsbl.DNSBLIpChecker()
        Result1 = ip_checker.check('68.128.212.240')  # Exchange Server 01
        Result2 = ip_checker.check('68.128.212.241')  # Exchange Server 02
        Result3 = ip_checker.check('68.128.212.242')  # Exchange Server 03
        Result.append(Result1)
        Result.append(Result2)
        Result.append(Result3)
        # domain_checker = pydnsbl.DNSBLDomainChecker()
        # Result4 = domain_checker.check('luxiu2.com')  # Domain Name 01
        # Result5 = domain_checker.check('videour.com')  # Domain Name 02
        # Result6 = domain_checker.check('jesdoit.com')  # Domain Name 03
        # Result.append(Result4)
        # Result.append(Result5)
        # Result.append(Result6)
        return Result

    # 本地 token
    def Local_Token(self):
        try:
            with open('token', 'r') as f:
                token = f.readline().strip()
                if token == '':
                    token = self.Get_Token()
                    return token
                else:
                    return token
        except IOError:
            token = self.Get_Token()
            return token

    # 获取报警人员名单
    def Get_User(self, dep_id=1, fchild=1):
        #token = self.Get_Token()
        token = self.Local_Token()
        send_url = '{0}user/list?access_token={1}&department_id={2}&fetch_child{3}'.format(
            self.BASEURL, token, dep_id, fchild,)
        respone = urllib.request.urlopen(url=send_url).read()
        stat = json.loads(respone)['userlist']
        user = ''
        for k in stat:
            user += '{0} '.format(k['mobile'])
        mobile = ','.join(user.split())
        with open('user.txt', 'w') as f:
            f.write(mobile)

    # 发送报警信息
    def Send_Message(self, content):
        self.content = {
            # "touser":  "User01|User02|User03",   # 成员, @all及所有人  "UserID1|UserID2|UserID3",//企业微信的唯一userid,非必输
            "touser": 'jasonhuang',
            # "toparty": '1',                          # 部门,@all 及所有部门  "PartyID1|PartyID2",//部门id,非必输,如果输入了就只给指定部门发送消息
            "msgtype": 'text',                       # 消息类型,文本,图片
            "agentid": self.APP_ID,                  # 企业应用 id
            "safe": "0",                             #
            "text": {
                    "content": content                   # 报警内容
            }
        }
        token = self.Local_Token()
        # 构建告警信息,必须是 json 格式
        msg = messages_content = json.dumps(self.content)
        send_url = '{0}message/send?access_token={1}'.format(
            self.BASEURL, token)
        respone = urllib.request.urlopen(
            url=send_url, data=msg.encode("utf-8")).read()
        stat = json.loads(respone.decode())['errcode']
        if stat == 0:
            print('Succesfully Send To Wechat')
        else:
            token = self.Get_Token()
            send_url = '{0}message/send?access_token={1}'.format(
                self.BASEURL, token)
            respone = urllib.request.urlopen(url=send_url, data=msg).read()
            return respone


if __name__ == '__main__':
    msgs = []
    msg = Wechat().checker()
    for i in msg:
        j = str(i)
        if j.find('BLACKLISTED') != -1:
            msgs.append(j)
    if msgs:
        msgsend = str(msgs)
        wechat = Wechat()
        wechat.Send_Message('邮件服务器公网IP状态告警:'+msgsend)
        print('Error'+msgsend)
    else:
        print('All Exchange Server Internet IP Status are Normal')

我在脚本第46-48行中监控了 3 台服务器的公网 IP 在Spamhaus的状态([font color=“#B22222”]为测试效果,这些 IP 为网上找的垃圾邮件服务器公网 IP[/font]

使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警_Python运维脚本_02

这里我们可以看到这 3 个 IP 均被提示异常,接下来我们去Spamhaus官网手动查询下结果看是否一致。

访问 Lookup - Reputation Checker - Spamhaus并输入68.128.212.240进行查询,结果如下:

使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警_Python运维脚本_03

其他两个 IP 也是一样的结果,所以可以判断其运行正常。

最后,通过上述脚本,制作个计划任务,每半小时或每小时执行一次,这样能很好的监控你的邮件服务器公网 IP 的运行状态了。 也可以启用脚本的 52-56行,对一个或多个域名进行监控,按需操作即可。

Enjoy~~


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK