In [1]:
import numpy as np
import pandas as pd
import typing as T
The following considers attribute selection for the first split on watermelon dataset 3.0.
In [2]:
df = pd.read_csv('data/melon3.0a.csv')
attributes = ['色泽', '根蒂', '敲声', '纹理', '脐部', '触感']
category = '好瓜'
Information gain is based on information entropy: after each split, the proportion-weighted sum of the subsets' entropies decreases, and the information gain is the difference between the original entropy and this post-split entropy.
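In symbols, with $p_k$ the proportion of class $k$ in dataset $D$ and $D^v$ the subset of $D$ taking value $v$ on attribute $a$ (matching the code below):

$$\mathrm{Ent}(D) = -\sum_{k} p_k \log_2 p_k, \qquad \mathrm{Gain}(D, a) = \mathrm{Ent}(D) - \sum_{v=1}^{V} \frac{|D^v|}{|D|}\,\mathrm{Ent}(D^v)$$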
In [3]:
def entropy(df, target_col):
    # Shannon entropy Ent(D) of the label column
    p = df[target_col].value_counts() / len(df)
    return -sum(p * np.log2(p))

def conditional_entropy(df, discrete_col, target_col):
    # Proportion-weighted entropy of the subsets induced by discrete_col
    ret = 0
    for val in df[discrete_col].unique():
        p = len(df[df[discrete_col] == val]) / len(df)  # proportion |D^v| / |D|
        ret += p * entropy(df[df[discrete_col] == val], target_col)
    return ret

def calc_gain(df, attributes, category) -> T.Dict[str, float]:
    # Information gain of each candidate attribute
    entropy_original = entropy(df, category)
    return {
        k: entropy_original - conditional_entropy(df, k, category)
        for k in attributes
    }

gain = pd.DataFrame([calc_gain(df, attributes, category)], index=['信息增益'])
gain
Out[3]:
| | 色泽 | 根蒂 | 敲声 | 纹理 | 脐部 | 触感 |
|---|---|---|---|---|---|---|
| 信息增益 | 0.108125 | 0.142675 | 0.140781 | 0.380592 | 0.289159 | 0.006046 |
Information gain is biased toward attributes with many distinct values; the gain ratio compensates for this bias by dividing the information gain by the attribute's intrinsic value $\mathrm{IV}$.
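In symbols, following the standard definitions:

$$\mathrm{IV}(a) = -\sum_{v=1}^{V} \frac{|D^v|}{|D|} \log_2 \frac{|D^v|}{|D|}, \qquad \mathrm{Gain\_ratio}(D, a) = \frac{\mathrm{Gain}(D, a)}{\mathrm{IV}(a)}$$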
In [4]:
def iv(df, discrete_col):
    # Intrinsic value IV(a): the entropy of the attribute's own value distribution
    p = df[discrete_col].value_counts() / len(df)
    return -sum(p * np.log2(p))

def calc_gain_ratio(df, attributes, category) -> T.Dict[str, float]:
    # Gain ratio = information gain / intrinsic value
    entropy_original = entropy(df, category)
    return {
        k: (entropy_original - conditional_entropy(df, k, category)) / iv(df, k)
        for k in attributes
    }

gain_ratio = pd.DataFrame(
    [calc_gain_ratio(df, attributes, category)], index=['增益率']
)
gain_ratio
Out[4]:
| | 色泽 | 根蒂 | 敲声 | 纹理 | 脐部 | 触感 |
|---|---|---|---|---|---|---|
| 增益率 | 0.06844 | 0.101759 | 0.105627 | 0.263085 | 0.186727 | 0.006918 |
The Gini value measures the probability that two examples drawn at random from the dataset carry different labels; the Gini index of an attribute is the proportion-weighted Gini value of the subsets it induces.
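In symbols:

$$\mathrm{Gini}(D) = 1 - \sum_{k} p_k^2, \qquad \mathrm{Gini\_index}(D, a) = \sum_{v=1}^{V} \frac{|D^v|}{|D|}\,\mathrm{Gini}(D^v)$$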
In [5]:
def gini(df, target_col):
    # Gini value Gini(D) of the label column
    p = df[target_col].value_counts() / len(df)
    return 1 - sum(p ** 2)

def gini_conditional(df, discrete_col, target_col):
    # Proportion-weighted Gini value of the subsets induced by discrete_col,
    # i.e. the Gini index of the attribute
    ret = 0
    for val in df[discrete_col].unique():
        p = len(df[df[discrete_col] == val]) / len(df)
        ret += p * gini(df[df[discrete_col] == val], target_col)
    return ret

def calc_gini_index(df, attributes, category) -> T.Dict[str, float]:
    # gini_conditional already returns the weighted sum, so no further weighting is needed
    return {
        k: gini_conditional(df, k, category)
        for k in attributes
    }

gini_index = pd.DataFrame([calc_gini_index(df, attributes, category)], index=['基尼指数'])
gini_index
Out[5]:
| | 色泽 | 根蒂 | 敲声 | 纹理 | 脐部 | 触感 |
|---|---|---|---|---|---|---|
| 基尼指数 | 0.427451 | 0.422269 | 0.423529 | 0.277124 | 0.344538 | 0.494118 |
Decision Tree Construction

This section implements the decision-tree learning algorithm of Figure 4.2 in the book.
In [6]:
class TreeNode:
    def __init__(self, key):
        self.key = key      # attribute this node splits on
        self.mapping = {}   # attribute value -> child (TreeNode or leaf label)

    def add_child(self, value, child):
        self.mapping[value] = child

    def predict(self, x):
        # Walk down the tree; return None for unseen attribute values
        if self.key not in x or x[self.key] not in self.mapping:
            return None
        child = self.mapping[x[self.key]]
        if isinstance(child, TreeNode):
            return child.predict(x)
        return child

def build_tree(df, attributes, category):
    # All examples share one label: return a leaf
    if len(df[category].unique()) == 1:
        return df[category].unique()[0]
    # No attributes left: return the majority label
    if len(attributes) == 0:
        return df[category].value_counts().index[0]
    # Split on the attribute with the highest information gain
    gain = calc_gain(df, attributes, category)
    best_attr = max(gain, key=gain.get)
    tree = TreeNode(best_attr)
    for value in df[best_attr].unique():
        tree.add_child(value, build_tree(
            df[df[best_attr] == value],
            [attr for attr in attributes if attr != best_attr],
            category
        ))
    return tree

tree = build_tree(df, attributes, category)
Prediction proceeds top-down from the root:
In [7]:
assert isinstance(tree, TreeNode)
tree.predict({
    '色泽': '青绿',
    '根蒂': '蜷缩',
    '敲声': '浊响',
    '纹理': '清晰',
    '脐部': '凹陷',
    '触感': '硬滑'
})
Out[7]:
'是'
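Pre-pruning decides at each node, before splitting, whether the split improves accuracy on a held-out validation set; if it does not, the node is kept as a leaf.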
In [8]:
from sklearn.model_selection import train_test_split

def no_split(train, test, attributes, category):
    # Majority label: the prediction if this node stays a leaf
    return train[category].value_counts().index[0]

def build_pre_prune(train, test, attributes, category, tree_root=None):
    if len(train[category].unique()) == 1:
        return train[category].unique()[0]
    no_split_result = no_split(train, test, attributes, category)
    no_split_metric = np.sum(test[category] == no_split_result)
    if len(attributes) == 0:
        return no_split_result
    # Pick the splitting attribute by information gain
    gain = calc_gain(train, attributes, category)
    best_attr = max(gain, key=gain.get)
    # Create the node, with each branch tentatively a majority-label leaf
    tree = TreeNode(best_attr)
    if tree_root is None:
        tree_root = tree
    for value in train[best_attr].unique():
        tree.add_child(value, no_split(
            train[train[best_attr] == value], test,
            [attr for attr in attributes if attr != best_attr], category
        ))
    # Validation accuracy of the whole tree with this tentative split
    result = []
    for row in test[attributes].itertuples():
        row_dict = row._asdict()
        result.append(tree_root.predict(row_dict))
    split_metric = np.sum([a == b for a, b in zip(result, test[category])])
    # Decide whether to prune (the original print labels were inverted)
    print(f'correct without split: {no_split_metric}, with split: {split_metric}')
    if split_metric < no_split_metric:
        print('prune: keep as leaf')
        return no_split_result
    print('split')
    for value in train[best_attr].unique():
        child = build_pre_prune(
            train[train[best_attr] == value], test,
            [attr for attr in attributes if attr != best_attr], category,
            tree_root
        )
        tree.add_child(value, child)
    return tree

train, test = train_test_split(df, test_size=0.5)
tree = build_pre_prune(train, test, attributes, category)
result = [
    tree.predict(row._asdict()) if isinstance(tree, TreeNode) else tree
    for row in test[attributes].itertuples()
]
split_metric = np.mean([a == b for a, b in zip(result, test[category])])
split_metric
correct without split: 5, with split: 7
split
correct without split: 5, with split: 0
prune: keep as leaf
Out[8]:
0.7777777777777778
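Because train_test_split draws a random split, the printed decisions and the final accuracy above vary from run to run; pass random_state to make them reproducible.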
Post-Pruning

Post-pruning traverses the fully grown decision tree and removes superfluous nodes, replacing subtrees that do not pay off with leaves.
In [9]:
def build_post_prune(train, test, attributes, category):
    tree = build_tree(train, attributes, category)
    # TODO: apply post pruning
    return tree  # returns the unpruned tree for now
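As a minimal sketch of how the TODO could be completed, one illustrative bottom-up scheme (not necessarily the book's exact procedure; the helper name post_prune is ours): prune each child first, then temporarily redirect every branch of a node to its majority-label leaf and keep the leaf whenever validation accuracy does not drop. It reuses TreeNode, build_tree, np, and the train/test split from above.

def post_prune(node, train, test, category, tree_root):
    # Bottom-up: prune the children before considering this node
    for value, child in node.mapping.items():
        if isinstance(child, TreeNode):
            subset = train[train[node.key] == value]
            node.mapping[value] = post_prune(child, subset, test, category, tree_root)

    def accuracy():
        # Validation accuracy of the whole tree in its current state
        preds = [tree_root.predict(row._asdict()) for row in test.itertuples()]
        return np.mean([a == b for a, b in zip(preds, test[category])])

    keep_acc = accuracy()
    # Candidate leaf: majority label of the training rows reaching this node
    leaf = train[category].value_counts().index[0]
    # Simulate pruning by pointing every branch at the leaf, then re-evaluate
    saved = node.mapping
    node.mapping = {value: leaf for value in saved}
    prune_acc = accuracy()
    if prune_acc >= keep_acc:
        return leaf               # parent replaces this subtree with the leaf
    node.mapping = saved          # pruning hurt: restore the subtree
    return node

def build_post_prune(train, test, attributes, category):
    tree = build_tree(train, attributes, category)
    if not isinstance(tree, TreeNode):
        return tree               # the whole tree is already a single leaf
    return post_prune(tree, train, test, category, tree)

# Usage: grow on the training half, prune against the held-out half
pruned = build_post_prune(train, test, attributes, category)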