5

利用RITA检测beacon通信

 2 years ago
source link: https://www.anquanke.com/post/id/266379
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

RITA 是一个用于检测 C2 通信流量的开源项目,由 GO 语言编写

本文主要介绍其分析器的工作原理,并借助 Python 代码逐步还原该算法

为方便阅读,以下涉及到的变量均参照源码搬运而来,读者可跳转至该项目后自行作对比分析

分析器源码地址:https://github.com/activecm/rita/blob/master/pkg/beacon/analyzer.go

该算法对同源-目的IP间的通信,进行分数计算得到 score,其值分布为 0~1 之间,趋近 1 则疑似 beacon 通信行为

具体计算过程大致上可以分为两部分,根据通信间隔数据包大小的特征,分别计算出 tsScoredsScore,最后取均值得到 score

其中,tsScore 和 dsScore 的计算过程相似,下面开始针对各个部分做具体分析

tsScore的计算

tsScore 的计算又细分为三个部分:tsSkewScoretsMadmScoretsConnCountScore

tsSkewScore

该值的计算原理在源码中的注释部分也有所解释,即:理想的 beacon 通信行为应该具有通信间隔和数据大小呈现对称分布的特征

意思也很好理解,以 CobalStrike 默认的 beacon 配置为例,其通信行为如下图所示:

CS 默认回连的间隔时间为 60 秒,即使加上抖动(jitter),理论上仍然会呈现出高度对称的分布特征,其数据包大小亦如是

这里采用Bowley 偏度公式来度量对称性,得到偏度系数 tsSkew 和 dsSkew,后者在第二部分计算 dsScore 时会使用到

所谓的偏度系数,适用于描述曲线形状的对称性,其值在 1 到 -1 之间,分别代表着正偏斜或负偏斜,参照下图

具体计算时需要对数据集按大小顺序作四等分,在三个分位点依次取值:tsLow,tsMid,tsHigh

Python 示例代码:

# 根据时间戳计算通信间隔
http_df['deltas'] = http_df[timestamp].apply(lambda x: pd.Series(x).diff().dt.seconds.dropna().tolist())
# 以通信间隔为数据集,计算各分位点处数值
http_df['tsLow'] = http_df['deltas'].apply(lambda x: np.percentile(np.array(x), 25))
http_df['tsMid'] = http_df['deltas'].apply(lambda x: np.percentile(np.array(x), 50))
http_df['tsHigh'] = http_df['deltas'].apply(lambda x: np.percentile(np.array(x), 75))
# 根据公式:Bowley Skewness = (Q1 + Q3 – 2Q2) / (Q3 – Q1),计算分母和分子
http_df['tsBowleyNum'] = http_df['tsLow'] + http_df['tsHigh'] - 2*http_df['tsMid']
http_df['tsBowleyDen'] = http_df['tsHigh'] - http_df['tsLow']
# 如果分母为 0,Q2 = Q1 或 Q2 = Q3,则计算结果不可靠,tsSkew 取 0
http_df['tsSkew'] = http_df[['tsLow','tsMid','tsHigh','tsBowleyNum','tsBowleyDen']].apply(
    lambda x: x['tsBowleyNum'] / x['tsBowleyDen'] if x['tsBowleyDen'] != 0 and x['tsMid'] != x['tsLow'] and x['tsMid'] != x['tsHigh'] else 0.0, axis=1
    )

最后根据 tsSkew 算出:tsSkewScore = 1 – abs(tsSkew)

tsMADMScore

这一段计算的前提为:理想的 beacon 通信行为在其通信间隔的中位数周围应该具有很低的分散度

该特征可以用 MADM(Median Absolute Deviation about the Median)来度量

具体计算过程也比较简单,在 Python 中我们用一行代码就能搞定

http_df['tsMadm'] = http_df['deltas'].apply(lambda x: np.median(np.absolute(np.array(x) - np.median(np.array(x)))))

得到 tsMadm 和 dsMadm 后,设定 30秒32字节 作为阈值,超出则分数计为 0

因为这里的分散度越低越好,由此可以计算出 tsMadmScore 和 dsMadmScore,以下是 Python 示例代码:

http_df['tsMadmScore'] = 1.0 - http_df['tsMadm'] / 30.0
http_df['tsMadmScore'] = http_df['tsMadmScore'].apply(lambda x: 0 if x < 0 else x)
http_df['dsMadmScore'] = 1.0 - http_df['tsMadm'] / 32.0
http_df['dsMadmScore'] = http_df['dsMadmScore'].apply(lambda x: 0 if x < 0 else x)

tsConnCountScore

跟上面的 SkewScore 和 MadmScore 不同,这一部分在 tsScore 和 dsScore 中的计算有所区别

在行为特征上,beacon 通信时往往具有较高的连接数,tsConnCountScore 正是用来度量这一点的

利用通信的持续时间和连接次数计算得到:tsConnCountScore = ConnectionCount / (tsConnDiv / 10.0)

其中,ConnectionCount 代表通信次数,tsConnDiv 由通信起止时间除以固定的连接频率得来,此处该值取为 10 秒

最后得分如果大于1,则向下舍入取1,因为连接得越频繁,越有可能是 beacon 通信

PS:对于这里的 10 秒,应该还存在优化空间,有研究员指出该值太小可能会影响最终得分从而产生漏报,传送地址

综上,计算 tsScore = (tsSkewScore + tsMADMScore + tsConnCountScore) / 3.0

dsScore的计算

数据大小的分散度(dsSkewScore)和 MADM(dsMADMScore) 的算法不变,前文已经有所解释

主要区别在于,计算 dsMADMScore 时数据大小的 MADM 以 32 字节作为阈值,数据包超出该大小则得分为 0

另外,由于 beacon 通常都是较小的数据包,这里采用 dsSmallnessScore 作为衡量指标

其含义为,如果数据包大小的众数(Mode)超过 65 Kb,则得分为 0

Python 示例代码如下:

# 计算数据包大小差值
http_df['size_deltas'] = http_df[data_size].apply(lambda x: pd.Series(x).diff().dropna().tolist())
# 计算 dsSkew
http_df['dsLow'] = http_df['size_deltas'].apply(lambda x: np.percentile(np.array(x), 25))
http_df['dsMid'] = http_df['size_deltas'].apply(lambda x: np.percentile(np.array(x), 50))
http_df['dsHigh'] = http_df['size_deltas'].apply(lambda x: np.percentile(np.array(x), 75))
http_df['dsBowleyNum'] = http_df['dsLow'] + http_df['dsHigh'] - 2*http_df['dsMid']
http_df['dsBowleyDen'] = http_df['dsHigh'] - http_df['dsLow']
http_df['dsSkew'] = http_df[['dsLow','dsMid','dsHigh','dsBowleyNum','dsBowleyDen']].apply(
    lambda x: x['dsBowleyNum'] / x['dsBowleyDen'] if x['dsBowleyDen'] != 0 and x['dsMid'] != x['dsLow'] and x['dsMid'] != x['dsHigh'] else 0.0, axis=1
    )
# 计算 dsSkewScore
http_df['dsSkewScore'] = 1.0 - abs(http_df['dsSkew'])
# 计算 dsMadmScore
http_df['dsMadm'] = http_df['size_deltas'].apply(lambda x: np.median(np.absolute(np.array(x) - np.median(np.array(x)))))
http_df['dsMadmScore'] = 1.0 - http_df['dsMadm'] / 32.0
http_df['dsMadmScore'] = http_df['dsMadmScore'].apply(lambda x: 0 if x < 0 else x)
# 计算 dsSmallnessScore
http_df['dsSmallnessScore'] = http_df[data_size].apply(lambda x: 1- (np.argmax(np.bincount(x)) / 65535))
http_df['dsSmallnessScore'] = http_df['dsSmallnessScore'].apply(lambda x: 0 if x < 0 else x)

最后得到,dsScore = (dsSkewScore + dsMADMScore + dsSmallnessScore) / 3.0

最终得分:score = (tsScore + tsScore) / 2.0,完整的演示代码放在 这里

为了简单演示下狩猎效果,我用 Covenant 作为 C2 框架,生成 3 组数据,通信间隔在 10-60 秒不等,抖动在 10%-20% 之间

最后得分如下:

当然,这里只是为了验证该算法基本逻辑的正确性,数据样本太小,并不能说明其实际使用效果,切勿直接用作检测告警

它的目的是帮助我们提高威胁狩猎的效率,搭配其它工具和方法一起使用才是正确的姿势

例如可以将其输出作为我们检测规则的输入,或者结合威胁情报食用,进行二次确认

另外,不要忘了对输入数据做好预处理和清洗工作,比如聚焦特定协议判断内外网IP过滤白名单主机等等,这会大幅提高分析效率和检测效果

当然,算法本身还有很大的优化空间,尤其对计算过程中涉及到的一些常量,我也有不少的疑问

例如,Mehmet Ergene 在其文章中提到过可能会产生的漏报:

  1. 通信间隔的 MADM 大于 30 秒时,则 tsMADMScore 分数为 0
  2. 数据大小的 MADM 超过 32 字节时,则 dsMADMScore分数为 0

如果一个 beacon 的通信间隔为 10 分钟,抖动(jitter)为 20%,其 MADM 会超过 30s

这种情况下算出的 tsMadmScore 为 0,进而会降低总得分,导致漏报

最后,感谢 Mehmet Ergene 的分享,这篇文章受其启发而来,如有纰漏之处,欢迎私信交流


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK