python3实现maxent 最大熵模型
使用python实现最大熵模型,在原来作者的基础上重写了predict, 增加了predict_proba()方法原作者:https://blog.csdn.net/slx_share/article/details/80097020最大熵原理: 在学习概率模型时,在所有可能的概率模型分布中,熵最大的模型就是最好的模型。所以用最大熵原理作为约束条件选取的模型就是最大熵模型。最大熵原理通过熵的最大化
·
使用python实现最大熵模型,在原来作者的基础上重写了predict, 增加了predict_proba()方法
原作者:blog.csdn.net/slx_share/article/details/80097020
另外,更正了原本代码种存在的一些bug
最大熵原理: 在学习概率模型时,在所有可能的概率模型分布中,熵最大的模型就是最好的模型。所以用最大熵原理作为约束条件选取的模型就是最大熵模型。最大熵原理通过熵的最大化来表示等可能性。
整体代码如下:
本代码的github地址:https://github.com/yinboliu-git/model/blob/main/ML/maxnet.py
import math
import numpy as np
from collections import defaultdict
class MaxEnt:
def __init__(self, epsilon=1e-3, maxstep=100):
self.epsilon = epsilon
self.maxstep = maxstep
self.w = None # 特征函数的权重
self.labels = None # 标签
self.fea_list = [] # 特征函数
self.px = defaultdict(lambda: 0) # 经验边缘分布概率
self.pxy = defaultdict(lambda: 0) # 经验联合分布概率,由于特征函数为取值为0,1的二值函数,所以等同于特征的经验期望值
self.exp_fea = defaultdict(lambda: 0) # 每个特征在数据集上的期望
self.data_list = [] # 样本集,元素为tuple((X),y)
self.N = None # 样本总量
self.M = None # 某个训练样本包含特征的总数,这里假设每个样本的M值相同,即M为常数。其倒数类似于学习率
self.n_fea = None # 特征函数的总数
def init_param(self, X_data, y_data):
# 根据传入的数据集(数组)初始化模型参数
self.N = X_data.shape[0]
self.labels = np.unique(y_data)
self.fea_func(X_data, y_data)
self.n_fea = len(self.fea_list)
self.w = np.zeros(self.n_fea)
self._exp_fea(X_data, y_data)
return
def fea_func(self, X_data, y_data, rules=None):
# 特征函数
if rules is None: # 若没有特征提取规则,则直接构造特征,此时每个样本没有缺失值的情况下的特征个数相同,等于维度
for X, y in zip(X_data, y_data):
X = tuple(X)
self.px[X] += 1.0 / self.N # X的经验边缘分布
self.pxy[(X, y)] += 1.0 / self.N # X,y的经验联合分布
for dimension, val in enumerate(X):
key = (dimension, val, y)
if not key in self.fea_list:
self.fea_list.append(key) # 特征函数,由 维度+维度下的值+标签 构成的元组
self.M = X_data.shape[1]
else:
self.M = defaultdict(int) # 字典存储每个样本的特征总数
for i in range(self.N):
self.M[i] = X_data.shape[1]
pass # 根据具体规则构建
def _exp_fea(self, X_data, y_data):
# 计算特征的经验期望值
for X, y in zip(X_data, y_data):
for dimension, val in enumerate(X):
fea = (dimension, val, y)
self.exp_fea[fea] += self.pxy[(tuple(X), y)] # 特征存在取值为1,否则为0
return
def _py_X(self, X):
# 当前w下的条件分布概率,输入向量X和y的条件概率
py_X = defaultdict(float)
for y in self.labels:
s = 0
for dimension, val in enumerate(X):
tmp_fea = (dimension, val, y)
if tmp_fea in self.fea_list: # 输入X包含的特征
s += self.w[self.fea_list.index(tmp_fea)]
py_X[y] = math.exp(s)
normalizer = sum(py_X.values())
for key, val in py_X.items():
py_X[key] = val / normalizer
return py_X
def _est_fea(self, X_data, y_data):
# 基于当前模型,获取每个特征估计期望
est_fea = defaultdict(float)
for X, y in zip(X_data, y_data):
py_x = self._py_X(X)[y]
for dimension, val in enumerate(X):
est_fea[(dimension, val, y)] += self.px[tuple(X)] * py_x
return est_fea
def GIS(self,X_data,y_data):
# GIS算法更新delta
est_fea = self._est_fea(X_data, y_data)
delta = np.zeros(self.n_fea)
for j in range(self.n_fea):
try:
delta[j] = 1 / self.M * math.log(self.exp_fea[self.fea_list[j]] / est_fea[self.fea_list[j]])
except:
continue
delta = delta / delta.sum() # 归一化,防止某一个特征权重过大导致,后续计算超过范围
return delta
def IIS(self, delta, X_data, y_data):
# IIS算法更新delta
g = np.zeros(self.n_fea)
g_diff = np.zeros(self.n_fea)
for j in range(self.n_fea):
for k in range(self.N):
g[j] += self.px[tuple(X_data[k])] * self._py_X(X_data[k])[y_data[k]] * math.exp(delta[j] * self.M[k])
g_diff[j] += g[j] * self.M[k]
g[j] -= self.exp_fea[j]
delta[j] -= g[j] / g_diff[j]
return delta
def fit(self, X_data, y_data):
# 训练,迭代更新wi
self.init_param(X_data, y_data)
if isinstance(self.M, int):
i = 0
while i < self.maxstep:
i += 1
delta = self.GIS(X_data,y_data)
# if max(abs(delta)) < self.epsilon: # 所有的delta都小于阈值时,停止迭代
# break
self.w += delta
else:
i = 0
delta = np.random.rand(self.n_fea)
while i < self.maxstep:
i += 1
delta = self.IIS(delta, X_data, y_data)
# if max(abs(delta)) < self.epsilon:
# break
self.w += delta
return
def predict(self, X):
# 输入x(数组),返回条件概率最大的标签
y_p = self.predict_proba(X)
y_p = np.argmax(y_p,axis=1).reshape(-1,1)
return y_p
def predict_proba(self,X):
y_proba = []
for x in X:
py_x = self._py_X(x)
y_proba.append(list(py_x.values()))
return np.array(y_proba)
if __name__ == '__main__':
from sklearn.datasets import load_iris, load_digits
data = load_iris()
X_data = data['data']
y_data = data['target']
ME = MaxEnt(maxstep=2)
ME.fit(X_data,y_data)
y_p = ME.predict(X_data[-10:,:]) # 测试代码
y_proba = ME.predict_proba(X_data[-10:, :])
print(y_p)
print(y_proba)
环境:python3.7
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
已为社区贡献3条内容
所有评论(0)