In [30]:
import numpy as np
import pandas as pd
import typing as T
The following considers the choice of splitting attribute for the first split on watermelon dataset 3.0.
In [31]:
df = pd.read_csv('data/melon3.0a.csv')
attributes = ['色泽', '根蒂', '敲声', '纹理', '脐部', '触感']
category = '好瓜'
Information gain is based on information entropy: after a split, the proportion-weighted sum of the subsets' entropies decreases, and the information gain is defined as the difference between the original entropy and this post-split entropy.
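For a dataset $D$ with class proportions $p_k$, and a discrete attribute $a$ whose value $v$ selects the subset $D^v$, the quantities computed by the cell below are

$$
\mathrm{Ent}(D) = -\sum_k p_k \log_2 p_k,
\qquad
\mathrm{Gain}(D, a) = \mathrm{Ent}(D) - \sum_{v} \frac{|D^v|}{|D|}\,\mathrm{Ent}(D^v).
$$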
In [32]:
def entropy(df, target_col):
    # Information entropy of the class distribution in target_col.
    p = df[target_col].value_counts() / len(df)
    return -sum(p * np.log2(p))

def conditional_entropy(df, discrete_col, target_col):
    # Proportion-weighted entropy of the subsets induced by discrete_col.
    ret = 0
    for val in df[discrete_col].unique():
        p = len(df[df[discrete_col] == val]) / len(df)  # proportion of samples with this value
        ret += p * entropy(df[df[discrete_col] == val], target_col)
    return ret

def calc_gain(df, attributes, category) -> T.Dict[str, float]:
    entropy_original = entropy(df, category)
    return {
        k: entropy_original - conditional_entropy(df, k, category)
        for k in attributes
    }

gain = pd.DataFrame([calc_gain(df, attributes, category)], index=['信息增益'])
gain
Out[32]:
| | 色泽 | 根蒂 | 敲声 | 纹理 | 脐部 | 触感 |
|---|---|---|---|---|---|---|
| 信息增益 | 0.108125 | 0.142675 | 0.140781 | 0.380592 | 0.289159 | 0.006046 |
Information gain is biased towards attributes with many distinct values; the gain-ratio criterion balances the weight given to attributes with different numbers of values. The gain ratio is the information gain divided by the attribute's intrinsic value $\mathrm{IV}$.
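In the same notation, the intrinsic value of an attribute and the gain ratio are

$$
\mathrm{IV}(a) = -\sum_{v} \frac{|D^v|}{|D|} \log_2 \frac{|D^v|}{|D|},
\qquad
\mathrm{Gain\_ratio}(D, a) = \frac{\mathrm{Gain}(D, a)}{\mathrm{IV}(a)}.
$$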
In [33]:
def iv(df, discrete_col):
    # Intrinsic value of an attribute: entropy of its value distribution.
    p = df[discrete_col].value_counts() / len(df)
    return -sum(p * np.log2(p))

def calc_gain_ratio(df, attributes, category) -> T.Dict[str, float]:
    entropy_original = entropy(df, category)
    return {
        k: (entropy_original - conditional_entropy(df, k, category)) / iv(df, k)
        for k in attributes
    }

gain_ratio = pd.DataFrame(
    [calc_gain_ratio(df, attributes, category)], index=['增益率']
)
gain_ratio
Out[33]:
| | 色泽 | 根蒂 | 敲声 | 纹理 | 脐部 | 触感 |
|---|---|---|---|---|---|---|
| 增益率 | 0.06844 | 0.101759 | 0.105627 | 0.263085 | 0.186727 | 0.006918 |
The Gini index measures the probability that two samples drawn at random from the dataset carry different labels.
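Concretely, with the class proportions $p_k$ and subsets $D^v$ as before,

$$
\mathrm{Gini}(D) = 1 - \sum_k p_k^2,
\qquad
\mathrm{Gini\_index}(D, a) = \sum_{v} \frac{|D^v|}{|D|}\,\mathrm{Gini}(D^v),
$$

and the attribute with the smallest Gini index is preferred.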
In [34]:
def gini(df, target_col):
    # Gini value: probability that two randomly drawn samples have different labels.
    p = df[target_col].value_counts() / len(df)
    return 1 - sum(p ** 2)

def gini_conditional(df, discrete_col, target_col):
    # Proportion-weighted Gini value of the subsets induced by discrete_col,
    # i.e. the Gini index of the attribute.
    ret = 0
    for val in df[discrete_col].unique():
        p = len(df[df[discrete_col] == val]) / len(df)
        ret += p * gini(df[df[discrete_col] == val], target_col)
    return ret

def calc_gini_index(df, attributes, category) -> T.Dict[str, float]:
    return {
        k: gini_conditional(df, k, category)
        for k in attributes
    }

gini_index = pd.DataFrame([calc_gini_index(df, attributes, category)], index=['基尼指数'])
gini_index
Out[34]:
| | 色泽 | 根蒂 | 敲声 | 纹理 | 脐部 | 触感 |
|---|---|---|---|---|---|---|
| 基尼指数 | 0.427451 | 0.422269 | 0.423529 | 0.277124 | 0.344538 | 0.494118 |
Building the decision tree¶
This section implements the decision-tree learning algorithm of Figure 4.2 in the original book.
In [35]:
class TreeNode:
    def __init__(self, key):
        self.key = key      # attribute this node splits on
        self.mapping = {}   # attribute value -> child node or leaf label

    def add_child(self, value, child):
        self.mapping[value] = child

    def predict(self, x):
        # Walk the tree top-down; unseen attribute values yield None.
        if self.key not in x or x[self.key] not in self.mapping:
            return None
        if isinstance(self.mapping[x[self.key]], TreeNode):
            return self.mapping[x[self.key]].predict(x)
        return self.mapping[x[self.key]]

def build_tree(df, attributes, category):
    # All samples share one label: return it as a leaf.
    if len(df[category].unique()) == 1:
        return df[category].unique()[0]
    # No attributes left to split on: return the majority label.
    if len(attributes) == 0:
        return df[category].value_counts().index[0]
    # Split on the attribute with the largest information gain.
    gain = calc_gain(df, attributes, category)
    best_attr = max(gain, key=gain.get)
    tree = TreeNode(best_attr)
    for value in df[best_attr].unique():
        tree.add_child(
            value,
            build_tree(
                df[df[best_attr] == value],
                [attr for attr in attributes if attr != best_attr],
                category,
            ),
        )
    return tree

tree = build_tree(df, attributes, category)
The decision tree is evaluated top-down via predict, following the branch that matches each attribute value.
In [36]:
assert isinstance(tree, TreeNode)
tree.predict({
    '色泽': '青绿',
    '根蒂': '蜷缩',
    '敲声': '浊响',
    '纹理': '清晰',
    '脐部': '凹陷',
    '触感': '硬滑'
})
Out[36]:
'是'
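Pre-pruning¶
Pre-pruning decides at each node, before expanding it, whether the split improves accuracy on a held-out test split; if it does not, the node is kept as a majority-class leaf.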
In [119]:
from sklearn.model_selection import train_test_split

def no_split(train, test, attributes, category):
    # Prediction for an unsplit node: the majority class of its training samples.
    return train[category].value_counts().index[0]

def build_pre_prune(train, test, attributes, category, tree_root=None):
    if len(train[category].unique()) == 1:
        return train[category].unique()[0]
    no_split_result = no_split(train, test, attributes, category)
    no_split_metric = np.sum(test[category] == no_split_result)
    if len(attributes) == 0:
        return no_split_result
    # Choose the splitting attribute by information gain
    gain = calc_gain(train, attributes, category)
    best_attr = max(gain, key=gain.get)
    # Create the node, with each child temporarily set to a majority-class leaf
    tree = TreeNode(best_attr)
    if tree_root is None:
        tree_root = tree
    for value in train[best_attr].unique():
        tree.add_child(value, no_split(
            train[train[best_attr] == value], test,
            [attr for attr in attributes if attr != best_attr], category
        ))
    # Evaluate the candidate split on the test set
    result = []
    for row in test[attributes].itertuples():
        row_dict = row._asdict()
        result.append(tree_root.predict(row_dict))
    split_metric = np.sum([a == b for a, b in zip(result, test[category])])
    # Pre-pruning decision: split only if it does not reduce test-set accuracy
    print(f'correct on test set - no split: {no_split_metric}, split: {split_metric}')
    if split_metric < no_split_metric:
        print('prune (keep as leaf)')
        return no_split_result
    print('split')
    for value in train[best_attr].unique():
        child = build_pre_prune(
            train[train[best_attr] == value], test,
            [attr for attr in attributes if attr != best_attr], category,
            tree_root
        )
        tree.add_child(value, child)
    return tree

train, test = train_test_split(df, test_size=0.5)
tree = build_pre_prune(train, test, attributes, category)
result = [
    tree.predict(row._asdict()) if isinstance(tree, TreeNode) else tree
    for row in test[attributes].itertuples()
]
split_metric = np.mean([a == b for a, b in zip(result, test[category])])
split_metric
correct on test set - no split: 4, split: 4
split
correct on test set - no split: 5, split: 0
prune (keep as leaf)
Out[119]:
0.4444444444444444
Post-pruning¶
Post-pruning first grows the full decision tree and then traverses it, removing nodes that do not earn their keep; a sketch of one possible implementation follows the stub below.
In [120]:
def build_post_prune(train, test, attributes, category):
    tree = build_tree(train, attributes, category)
    # TODO: apply post-pruning
    return tree
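The cell above only grows the tree; the pruning pass is still a TODO. Below is a minimal sketch of reduced-error post-pruning, filling in that TODO on top of the TreeNode structure and test-set evaluation used earlier. The helper names accuracy_on and _prune are illustrative, not from the original notebook: each subtree is tentatively replaced by the majority-class leaf of the training samples reaching it, and the replacement is kept only if accuracy on the test split does not drop.

def accuracy_on(tree, test, attributes, category):
    # Accuracy of a tree (or a bare leaf label) on the held-out split.
    pred = [
        tree.predict(row._asdict()) if isinstance(tree, TreeNode) else tree
        for row in test[attributes].itertuples()
    ]
    return np.mean([a == b for a, b in zip(pred, test[category])])

def _prune(node, subset, root, test, attributes, category):
    # Bottom-up pass: prune inside each child subtree first, then try to
    # collapse that child into a majority-class leaf.
    for value, child in list(node.mapping.items()):
        if not isinstance(child, TreeNode):
            continue
        child_subset = subset[subset[node.key] == value]
        _prune(child, child_subset, root, test, attributes, category)
        leaf = child_subset[category].value_counts().index[0]
        acc_keep = accuracy_on(root, test, attributes, category)
        node.mapping[value] = leaf     # tentatively prune this subtree
        if accuracy_on(root, test, attributes, category) < acc_keep:
            node.mapping[value] = child    # pruning hurt accuracy: revert

def build_post_prune(train, test, attributes, category):
    tree = build_tree(train, attributes, category)
    if not isinstance(tree, TreeNode):
        return tree
    _prune(tree, train, tree, test, attributes, category)
    # Finally consider collapsing the whole tree into a single majority leaf.
    leaf = train[category].value_counts().index[0]
    if accuracy_on(leaf, test, attributes, category) >= accuracy_on(tree, test, attributes, category):
        return leaf
    return tree

On the same train/test split as the pre-pruning cell, build_post_prune(train, test, attributes, category) would return either a pruned TreeNode or a single class label.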