HOME/Articles/

1.数据准备

Article Outline

<center><b><font color=#A52A2A size=5 >公众号:数据挖掘与机器学习笔记</font></b></center>

word2vec

1.数据准备

import os
import jieba
import random
import numpy as np
from collections import Counter
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

# 参数设置
word_size = 64  # 词向量维度
window = 5  # 窗口大小
nb_negative = 25  # 随机负采样的样本数
min_count = 10  # 频数少于min_count的词会将被抛弃,低频词类似于噪声,可以抛弃掉
file_num = 10000 #只取file_num个文件进行训练


# 数据预处理
def get_all_apths(dirname):
    paths = []  # 将所有的txt文件路径存放在这个list中
    for maindir, subdir, file_name_list in os.walk(dirname):
        for filename in file_name_list:
            apath = os.path.join(maindir, filename)  # 合并成一个完整路径
            paths.append(apath)
    return paths


def get_corpus(file_path):
    words = []
    corpus = []
    i = 0
    for file in file_path:
        if ".txt" in file:
            i += 1
            try:
                with open(file, encoding="utf-8") as fr:
                    for line in fr:
                        words += jieba.lcut(line)
                        corpus.append(jieba.lcut(line))
            except Exception as e:
                print(e)
        if i == file_num:
            break

    words = dict(Counter(words))
    total = sum(words.values())
    words = {i: j for i, j in words.items() if j >= min_count}  # 去掉低频词
    id2word = {i + 2: j for i, j in enumerate(words)}
    id2word[0] = "PAD"
    id2word[1] = "UNK"
    word2id = {j: i for i, j in id2word.items()}
    return words, corpus, id2word, word2id


def get_negative_sample(x, word_range, neg_num):
    """
    负采样
    :param x:
    :param word_range:
    :param neg_num:
    :return:
    """
    negs = []
    while True:
        rand = random.randrange(0, word_range)
        if rand not in negs and rand != x:
            negs.append(rand)
        if len(negs) == neg_num:
            return negs


def data_generator(corpus, word2id, id2word):
    """
    生成训练数据
    :return:
    """
    x, y = [], []
    for sentence in corpus:
        sentence = [0] * window + [word2id[w] for w in sentence if w in word2id] + [0] * window
        # 上面这句代码的意思是,因为我们是通过滑窗的方式来获取训练数据的,那么每一句语料的第一个词和最后一个词
        # 如何出现在中心位置呢?答案就是给它padding一下,例如“我/喜欢/足球”,两边分别补窗口大小个pad,得到“pad pad 我 喜欢 足球 pad pad”
        # 那么第一条训练数据的背景词就是['pad', 'pad','喜欢', '足球'],中心词就是'我'
        for i in range(window, len(sentence) - window):
            x.append(sentence[i - window:i] + sentence[i + 1:window + i + 1])
            y.append([sentence[i]] + get_negative_sample(sentence[i], len(id2word), nb_negative))
    x, y = np.array(x), np.array(y)
    z = np.zeros((len(x), nb_negative + 1))
    z[:, 0] = 1
    return x, y, z


def get_train_test_data(x, y, z):
    X_train, X_test, y_train, y_test, z_train, z_test = train_test_split([x, y, z], test_size=0.2, random_state=42, shuffle=True)
    return X_train, X_test, y_train, y_test, z_train, z_test


#准备成pytorch的DataLoader格式,方便训练
class DatasetTorch(Dataset):
    def __init__(self, x, y, z):
        self.x = x
        self.y = y
        self.z = z[:, 1]  # torch使用交叉熵损失时,target不需要使用onehot

    def __len__(self):
        return self.x.shape[0]

    def __getitem__(self, index):
        return self.x[index], self.y[index], self.z[index]

#划分训练和测试数据
def get_train_test_dataloader(x, y, z, batch_size):
    """
    生成训练和测试数据的DataLoader
    :param x:
    :param y:
    :param z:
    :param batch_size:
    :return:
    """
    x_train, x_test, y_train, y_test, z_train, z_test = train_test_split(x, y, z, test_size=0.2, random_state=42, shuffle=True)
    train_dataset = DatasetTorch(x_train, y_train, z_train)
    test_dataset = DatasetTorch(x_test, y_test, z_test)
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size)
    return train_dataloader, test_dataloader

2.keras实现CBOW模型

import tensorflow as tf
from data_helper import get_all_apths, get_corpus, data_generator
from data_helper import window, word_size, nb_negative

nb_epoch = 10  # 迭代次数

from tensorflow import keras


def build_model():
    """
        模型网络构建
    :return:
    """
    input_words = keras.layers.Input(shape=(window * 2,), dtype="int32")  # shape=(,window*2)
    input_vecs = keras.layers.Embedding(len(id2word), word_size, name="word2vec")(input_words)  # shape=(,window*2,word_size)
    input_vecs_sum = keras.layers.Lambda(lambda x: tf.reduce_sum(x, axis=1))(input_vecs)  # CBOW模型直接将上下文词向量求和 shape=(,word_size)

    # 第二个输入,中心词和负样本词
    samples = keras.layers.Input(shape=(nb_negative + 1,), dtype="int32")  # shape=(,nb_negative + 1)
    softmax_weights = keras.layers.Embedding(len(id2word), word_size, name="W")(samples)  # shape=(,nb_negative + 1,word_size)
    softmax_biases = keras.layers.Embedding(len(id2word), 1, name="b")(samples)  # shape=(,nb_negative + 1,1)

    # 将加和得到的词向量与中心词和负样本的词向量分别进行点乘
    input_vecs_sum_dot = keras.layers.Lambda(lambda x: tf.matmul(x[0], tf.expand_dims(x[1], 2)))([softmax_weights, input_vecs_sum])  # shape=(,nb_negative + 1,1)

    add_biases = keras.layers.Lambda(lambda x: tf.reshape(x[0] + x[1], shape=(-1, nb_negative + 1)))([input_vecs_sum_dot, softmax_biases])
    softmax = keras.layers.Lambda(lambda x: tf.nn.softmax(x))(add_biases)

    # 模型编译
    model = keras.layers.Model(inputs=[input_words, samples], outputs=softmax)
    model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
    print(model.summary())
    return model


if __name__ == '__main__':
    file_dir = "F:\\data\\machine_learning\\THUCNews\\THUCNews"
    paths = get_all_apths(file_dir)
    print(len(paths), paths[0:10])

    words, corpus, id2word, word2id = get_corpus(paths)

    # print(words)
    # print(id2word)
    x, y, z = data_generator(corpus, word2id, id2word)
    print(x.shape, y.shape, z.shape)

    model = build_model()
    model.fit([x, y], z, epochs=nb_epoch, batch_size=512)

3. pytorch实现CBOW模型

import torch
from torch import nn
from torch.nn import Module, CrossEntropyLoss
from torch.optim import SGD
from data_helper import get_all_apths, get_corpus, data_generator, get_train_test_dataloader
from data_helper import window, word_size, nb_negative

nb_epoch = 10  # 迭代次数


class Word2VecCBOW(Module):
    def __init__(self, window, id2word, nb_negative, embedding_dim):
        """
            CBOW模型
        :param window:窗口大小
        :param id2word:
        :param nb_negative:负采样数量
        :param embedding_dim:词向量维度
        """
        super(Word2VecCBOW, self).__init__()
        self.embedding = nn.Embedding(num_embeddings=len(id2word), embedding_dim=embedding_dim)
        self.window = window
        self.id2word = id2word
        self.nb_negative = nb_negative
        self.embedding_dim = embedding_dim

    def forward(self, input_words, negative_samples):
        """

        :param input_words: 上下文单词
        :param negative_samples:中心词和负采样单词
        :return:
        """
        input_vecs = self.embedding(input_words)  # shape=(,window*2,word_size)
        input_vecs_sum = torch.sum(input_vecs, dim=1)  # CBOW模型直接对上下文单词的嵌入进行求和操作 shape=(,word_size)

        negative_sample_vecs = self.embedding(negative_samples)  # shape=(,nb_negative + 1,word_size)

        out = torch.matmul(negative_sample_vecs, torch.unsqueeze(input_vecs_sum, dim=2))
        out = torch.squeeze(out)
        out = torch.softmax(out, dim=-1)
        return out


def train(model, train_dataloader, device, optimizer, crossEntropyLoss):
    model.train()
    train_loss = 0.0
    for i, data in enumerate(train_dataloader):
        x_train, y_train, z_train = data
        x_train, y_train, z_train = x_train.to(torch.long).to(device), y_train.to(torch.long).to(device), z_train.to(torch.long).to(device)
        optimizer.zero_grad()  # 梯度清零
        z_predict = model(x_train, y_train)  # (batch_size,51)
        loss = crossEntropyLoss(z_predict, z_train)
        loss.backward()  # 梯度反向传播
        optimizer.step()  # 梯度更新
        train_loss += loss.item()
        # if i % 10 == 0:
        #     print(loss.item())
    return train_loss / i


def test(model, test_dataloader, device, crossEntropyLoss):
    model.eval()
    test_loss = 0.0
    for i, data in enumerate(test_dataloader):
        x_test, y_test, z_test = data
        x_test, y_test, z_test = x_test.to(torch.long).to(device), y_test.to(torch.long).to(device), z_test.to(torch.long).to(device)
        z_predict = model(x_test, y_test)  # (batch_size,51)
        loss = crossEntropyLoss(z_predict, z_test)
        test_loss += loss.item()

    return test_loss / i


def train_test(epochs, batch_size):
    file_dir = "F:\\data\\machine_learning\\THUCNews\\THUCNews"
    paths = get_all_apths(file_dir)
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    words, corpus, id2word, word2id = get_corpus(paths)

    x, y, z = data_generator(corpus, word2id, id2word)
    print(x.shape, y.shape, z.shape)
    train_dataloader, test_dataloader = get_train_test_dataloader(x, y, z, batch_size=batch_size)
    loss_fun = CrossEntropyLoss()
    cbow = Word2VecCBOW(window, id2word, nb_negative, word_size)
    cbow.to(device)
    optimizer = SGD(cbow.parameters(), lr=0.01)

    print("------开始训练------:", device)
    for epoch in range(1, epochs + 1):
        train_loss = train(cbow, train_dataloader, device, optimizer, loss_fun)
        test_loss = test(cbow, test_dataloader, device, loss_fun)
        print("epoch %d, train loss: %.2f, test loss:%.2f" % (epoch, train_loss, test_loss))

    torch.save(cbow, "../models/cbow_w2v.pkl")


if __name__ == '__main__':
    # train_test(nb_epoch, 32) #训练、测试
    cbow = torch.load("../models/cbow_w2v.pkl")  # 加载模型
    print(cbow.embedding.weight.shape)  # 提取训练好的Embedding

在这里插入图片描述