
Tips | Machine Learning Algorithms: KNN


This article is from the blog of AI 研习社 community user @BBuf; visit @BBuf's homepage for more of his posts.


Principle

If the majority of a sample's k most similar samples in feature space (i.e., its k nearest neighbors) belong to one class, then the sample is assigned to that class as well.

KNN can be used not only for classification but also for regression: find a sample's k nearest neighbors and assign the average of those neighbors' target values to the sample to obtain its prediction.
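To make the principle concrete, here is a minimal brute-force sketch (an illustration written for this post, separate from the KD-Tree code below): compute the distance from the query point to every training sample, take the k closest, then vote for classification or average for regression.

    from collections import Counter

    def euclidean(a, b):
        return sum((ai - bi) ** 2 for ai, bi in zip(a, b)) ** 0.5

    def knn_brute_force(X_train, y_train, xi, k=3):
        # Labels/targets of the k training samples closest to xi.
        dists = sorted(zip((euclidean(x, xi) for x in X_train), y_train))
        return [yi for _, yi in dists[:k]]

    def knn_classify(X_train, y_train, xi, k=3):
        # Classification: majority vote among the k nearest labels.
        return Counter(knn_brute_force(X_train, y_train, xi, k)).most_common(1)[0][0]

    def knn_regress(X_train, y_train, xi, k=3):
        # Regression: mean of the k nearest target values.
        top_k = knn_brute_force(X_train, y_train, xi, k)
        return sum(top_k) / len(top_k)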


Implementation

A brute-force implementation of this principle is straightforward, but expensive: for every query we must scan all the samples and compute their distances to the query point. The two previous articles therefore introduced the max-heap and the KD-Tree, which are exactly the data structures that speed up this search. KNN is a top-k problem, and a max-heap solves top-k problems without a full sort; for nearest-neighbor queries over multi-dimensional data, the KD-Tree is likewise a good fit.
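The top-k idea alone can be demonstrated with the standard library's heapq (a sketch for illustration; the real code below uses the series' own MaxHeap class): keep at most k candidates and evict the current farthest in O(log k) whenever a closer one appears, for O(n log k) total instead of the O(n log n) of a full sort.

    import heapq

    def top_k_nearest(dists, k):
        # heapq is a min-heap, so store negated distances: the heap root is
        # then the farthest of the k candidates kept so far.
        heap = []
        for i, d in enumerate(dists):
            if len(heap) < k:
                heapq.heappush(heap, (-d, i))
            elif -d > heap[0][0]:  # d is closer than the current farthest
                heapq.heapreplace(heap, (-d, i))
        # Return (distance, index) pairs, nearest first.
        return sorted((-nd, i) for nd, i in heap)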

More specifically, a single KD-Tree search finds only the nearest neighbor, while we need the k nearest. So when the nearest neighbor is found, the algorithm does not exit the loop: it keeps searching until the root of the max-heap (the farthest of the k candidates kept so far) is closer than every unexplored branch, and only then stops. For example, with k = 3, if the farthest candidate in the heap is at distance 2.1 and the query point is 3.0 away from the splitting hyperplane of an unexplored branch, that branch cannot contain anything closer and can be skipped. What remains in the heap is then exactly the top k.


Code

# coding=utf-8
from kdtree import KDTree
from max_heap import MaxHeap

from random import random, seed
from time import time


# Timing decorator: fn is the function whose running time is measured.
def run_time(fn):
    def fun():
        start = time()
        fn()
        ret = time() - start
        if ret < 1e-6:
            unit = "ns"
            ret *= 1e9
        elif ret < 1e-3:
            unit = "us"
            ret *= 1e6
        elif ret < 1:
            unit = "ms"
            ret *= 1e3
        else:
            unit = "s"
        print("Total run time is %.1f %s\n" % (ret, unit))
    # Return the wrapper so the decorated function runs when it is called.
    return fun


# Load a CSV file where the last column is the label (classification)
# or the target value (regression).
def load_data(path):
    X = []
    y = []
    with open(path) as f:
        for line in f:
            line = line[:-1].split(',')
            xi = [float(s) for s in line[:-1]]
            yi = line[-1]
            # A decimal point means a float target, otherwise an int label.
            yi = float(yi) if '.' in yi else int(yi)
            X.append(xi)
            y.append(yi)
    return X, y


# Breast cancer dataset for the classification demo.
def load_cancer():
    return load_data("boston/breast_cancer.csv")


# Boston housing dataset for the regression demo.
def load_house_data():
    return load_data("boston/housing.csv")


# Split the data into training and test sets: each row goes to the
# training set with probability `prob`, independently.
def train_test_split(X, y, prob=0.7, random_state=None):
    if random_state is not None:
        seed(random_state)
    X_train = []
    X_test = []
    y_train = []
    y_test = []
    for i in range(len(X)):
        if random() < prob:
            X_train.append(X[i])
            y_train.append(y[i])
        else:
            X_test.append(X[i])
            y_test.append(y[i])
    # Re-seed from the system clock so later random calls are not fixed.
    seed()
    return X_train, X_test, y_train, y_test


# Accuracy
def get_acc(y, y_hat):
    return sum(yi == yi_hat for yi, yi_hat in zip(y, y_hat)) / len(y)


# Precision
def get_precision(y, y_hat):
    true_positive = sum(yi and yi_hat for yi, yi_hat in zip(y, y_hat))
    predicted_positive = sum(y_hat)
    return true_positive / predicted_positive


# Recall
def get_recall(y, y_hat):
    true_positive = sum(yi and yi_hat for yi, yi_hat in zip(y, y_hat))
    actual_positive = sum(y)
    return true_positive / actual_positive


# True positive rate (identical to recall)
def get_tpr(y, y_hat):
    true_positive = sum(yi and yi_hat for yi, yi_hat in zip(y, y_hat))
    actual_positive = sum(y)
    return true_positive / actual_positive


# True negative rate
def get_tnr(y, y_hat):
    true_negative = sum(1 - (yi or yi_hat) for yi, yi_hat in zip(y, y_hat))
    actual_negative = len(y) - sum(y)
    return true_negative / actual_negative
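# A quick sanity check (an illustration, not in the original script): with
# y = [1, 1, 0, 0] and y_hat = [1, 0, 1, 0] there is 1 true positive out of
# 2 predicted positives and 2 actual positives, so:
#
#   >>> get_precision([1, 1, 0, 0], [1, 0, 1, 0])
#   0.5
#   >>> get_recall([1, 1, 0, 0], [1, 0, 1, 0])
#   0.5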


# Points on the ROC curve: sweep a threshold over the predicted
# probabilities and record (TPR, FPR) at each threshold.
def get_roc(y, y_hat_prob):
    thresholds = sorted(set(y_hat_prob), reverse=True)
    ret = [[0, 0]]
    for threshold in thresholds:
        y_hat = [int(yi_hat_prob >= threshold) for yi_hat_prob in y_hat_prob]
        ret.append([get_tpr(y, y_hat), 1 - get_tnr(y, y_hat)])
    return ret


# AUC (area under the ROC curve), via the trapezoid rule.
def get_auc(y, y_hat_prob):
    roc = iter(get_roc(y, y_hat_prob))
    tpr_pre, fpr_pre = next(roc)
    auc = 0
    for tpr, fpr in roc:
        auc += (tpr + tpr_pre) * (fpr - fpr_pre) / 2
        tpr_pre = tpr
        fpr_pre = fpr
    return auc
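# A worked example of get_auc (an illustration, not in the original script):
# with y = [1, 0, 1, 1] and y_hat_prob = [0.9, 0.8, 0.4, 0.3], only one of
# the three (positive, negative) pairs is ranked correctly (0.9 > 0.8), so
# the AUC should be 1/3:
#
#   >>> get_auc([1, 0, 1, 1], [0.9, 0.8, 0.4, 0.3])
#   0.3333333333333333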


def model_evaluation(clf, X, y):
    y_hat = clf.predict(X)
    # Note: _predict returns hard 0/1 labels here, so the AUC is computed
    # from binary scores rather than continuous probabilities.
    y_hat_prob = [clf._predict(Xi) for Xi in X]
    ret = dict()
    ret["Accuracy"] = get_acc(y, y_hat)
    ret["Recall"] = get_recall(y, y_hat)
    ret['Precision'] = get_precision(y, y_hat)
    ret['AUC'] = get_auc(y, y_hat_prob)
    for k, v in ret.items():
        print("%s: %.3f" % (k, v))
    print()
    return ret


# Goodness of fit (R^2) of a regression model.
def get_r2(reg, X, y):
    y_hat = reg.predict(X)
    sse = sum((yi - yi_hat) ** 2 for yi, yi_hat in zip(y, y_hat))
    y_avg = sum(y) / len(y)
    sst = sum((yi - y_avg) ** 2 for yi in y)
    r2 = 1 - sse / sst
    print("Test r2 is %.3f!" % r2)
    return r2


# Scale each feature to the [0, 1] range.
def min_max_scale(X):
    m = len(X[0])
    x_max = [-float('inf') for _ in range(m)]
    x_min = [float('inf') for _ in range(m)]
    for row in X:
        x_max = [max(a, b) for a, b in zip(x_max, row)]
        x_min = [min(a, b) for a, b in zip(x_min, row)]
    ret = []
    for row in X:
        tmp = [(x - b) / (a - b) for a, b, x in zip(x_max, x_min, row)]
        ret.append(tmp)
    return ret
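# Example (an illustration, not in the original script): each column is
# scaled by its own min and max.
#
#   >>> min_max_scale([[1, 10], [3, 30]])
#   [[0.0, 0.0], [1.0, 1.0]]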


class KNeighborsBase(object):

    def __init__(self):
        self.k_neighbors = None
        self.tree = None

    def fit(self, X, y, k_neighbors=3):
        self.k_neighbors = k_neighbors
        self.tree = KDTree()
        self.tree.build_tree(X, y)

    # 1. Get the KD-Tree.
    # 2. Build a max-heap of size k.
    # 3. Build a queue of subtrees to search.
    # 4. The outer loop pops a subtree and updates the max-heap.
    # 5. The inner loop walks from a leaf back up to the subtree root.
    # 6. Exit when the heap root is the k-th nearest neighbor.
    def knn_search(self, Xi):
        tree = self.tree
        heap = MaxHeap(self.k_neighbors, lambda x: x.dist)
        # Follow the path from the root down to a leaf, as when searching for Xi.
        nd = tree.search(Xi, tree.root)
        # Initialize the queue with (subtree root, leaf reached).
        que = [(tree.root, nd)]
        while que:
            nd_root, nd_cur = que.pop(0)
            # Distance from Xi to the subtree root.
            nd_root.dist = tree.get_eu_dist(Xi, nd_root)
            heap.add(nd_root)
            while nd_cur is not nd_root:
                # Distance from Xi to the current node.
                nd_cur.dist = tree.get_eu_dist(Xi, nd_cur)
                # Update the best nodes and distances.
                heap.add(nd_cur)
                # If the farthest kept candidate is farther away than the
                # splitting hyperplane, the brother subtree may hold closer
                # points, so it must be searched too.
                if nd_cur.brother and (not heap or heap.items[0].dist >
                                       tree.get_hyper_plane_dist(Xi, nd_cur.father)):
                    _nd = tree.search(Xi, nd_cur.brother)
                    que.append((nd_cur.brother, _nd))
                nd_cur = nd_cur.father
        return heap

    def _predict(self, Xi):
        raise NotImplementedError

    def predict(self, X):
        return [self._predict(Xi) for Xi in X]
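# knn_search above assumes the MaxHeap from the earlier article in this
# series exposes roughly the following interface (a simplified sketch, not
# the real max_heap.py, which uses proper sift operations instead of a sort):
class MaxHeapSketch(object):
    def __init__(self, max_size, fn):
        self.max_size = max_size  # keep at most k candidates
        self.fn = fn              # key function, here: lambda x: x.dist
        self.items = []

    def __len__(self):
        # Lets knn_search write `not heap` to test for emptiness.
        return len(self.items)

    def add(self, item):
        # Keep the k smallest keys; items[0] is the largest (farthest) kept.
        if len(self.items) < self.max_size:
            self.items.append(item)
        elif self.fn(item) < self.fn(self.items[0]):
            self.items[0] = item
        else:
            return
        self.items.sort(key=self.fn, reverse=True)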



class KNeighborsClassifier(KNeighborsBase):

    def __init__(self):
        KNeighborsBase.__init__(self)

    def _predict(self, Xi):
        # Majority vote: nd.split[1] is the 0/1 label stored at the node.
        heap = self.knn_search(Xi)
        n_pos = sum(nd.split[1] for nd in heap.items)
        return int(n_pos * 2 > self.k_neighbors)


class KNeighborsRegressor(KNeighborsBase):

    def __init__(self):
        KNeighborsBase.__init__(self)

    def _predict(self, Xi):
        # Average of the target values of the k nearest neighbors.
        heap = self.knn_search(Xi)
        return sum(nd.split[1] for nd in heap.items) / self.k_neighbors



@run_time
def main1():
    print("Testing the performance of KNN classifier...")
    X, y = load_cancer()
    X = min_max_scale(X)
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=20)
    clf = KNeighborsClassifier()
    clf.fit(X_train, y_train, k_neighbors=21)
    model_evaluation(clf, X_test, y_test)


@run_time
def main2():
    print("Testing the performance of KNN regressor...")
    X, y = load_house_data()
    X = min_max_scale(X)
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=10)
    reg = KNeighborsRegressor()
    reg.fit(X=X_train, y=y_train, k_neighbors=3)
    get_r2(reg, X_test, y_test)


if __name__ == "__main__":
    main1()
    main2()
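As a quick cross-check of the classifier (a sketch assuming scikit-learn is installed; it is not used anywhere in the original code), the same split can be fed to scikit-learn's built-in KNN:

    from sklearn.neighbors import KNeighborsClassifier as SkKNN

    X, y = load_cancer()
    X = min_max_scale(X)
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=20)
    sk = SkKNN(n_neighbors=21)
    sk.fit(X_train, y_train)
    print("sklearn accuracy: %.3f" % sk.score(X_test, y_test))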



Results

[Screenshot in the original post: the console output of main1 and main2, with the evaluation metrics and run times.]


Source code on GitHub

https://github.com/BBuf/machine-learning


*Cover image source: Wikipedia, k-nearest neighbors algorithm (KNN)
