开发者社区 > ModelScope模型即服务 > 语音 > 正文

PyTorch Transformer 英中翻译超详细教程

在这篇文章中,我将详细地介绍一个英中翻译 Transformer 的 PyTorch 实现。这篇文章会完整地展示一个深度学习项目的搭建过程,从数据集准备,到模型定义、训练。这篇文章不仅会讲解如何把 Transformer 的论文翻译成代码,还会讲清楚代码实现中的诸多细节。相信初学者能够从这篇文章中学到丰富的知识。

如果你只对 Transformer 感兴趣,而不想涉及过多的 NLP 内容,可以选读模型实现部分。

项目网址: https://github.com/SingleZombie/DL-Demos/pull/21

如果你对 Transformer 的论文不熟,欢迎阅读我之前的文章:Attention Is All You Need (Transformer) 论文精读

任务与数据集准备

在开启一个深度学习项目之初,要把任务定义好。准确来说,我们要明白这个任务是在完成一个怎样的映射,并准备一个用于评测的数据集,定义好评价指标。

英中翻译,这个任务非常明确,就是把英文的句子翻译成中文。英中翻译的数据集应该包含若干个句子对,每个句子对由一句英文和它对应的中文翻译组成。

中英翻译的数据集不是很好找。有几个比较出名的数据集的链接已经失效了,还有些数据集需要注册与申请后才能获取。我在中文NLP语料库仓库(https://github.com/brightmart/nlp_chinese_corpus)找到了中英文平行语料 translation2019zh。该语料库由520万对中英文语料构成,训练集516万对,验证集3.9万对。用作训练和验证中英翻译模型是足够了。

机器翻译的评测指标叫做BLEU Score。如果模型输出的翻译和参考译文有越多相同的单词、连续2个相同单词、连续3个相同单词……,则得分越高。

PyTorch 提供了便捷的API,我们可以用一行代码算完BLEU Score。

>>> from torchtext.data.metrics import bleu_score
>>> candidate_corpus = [['My', 'full', 'pytorch', 'test'], ['Another', 'Sentence']]
>>> references_corpus = [[['My', 'full', 'pytorch', 'test'], ['Completely', 'Different']], [['No', 'Match']]]
>>> bleu_score(candidate_corpus, references_corpus)
    0.8408964276313782

数据集预处理

数据清洗

得到数据集后,下一步要做的是对数据集做处理,把原始数据转化成能够输入神经网络的张量。对于图片,预处理可能是裁剪、缩放,使所有图片都有一样的大小;对于文本,预处理可能是分词、填充。

网盘上下载好 translation2019zh 数据集后,我们来一步一步清洗这个数据集。这个数据集只有两个文件translation2019zh_train.json, translation2019zh_valid.json,它们的结构如下:

{"english": <english>, "chinese": <chinese>}
{"english": <english>, "chinese": <chinese>}
{"english": <english>, "chinese": <chinese>}
...

这些json文件有点不合标准,每对句子由一行json格式的记录组成。english属性是英文句子,chinese属性是中文句子。比如:

{"english": "In Italy ...", "chinese": "在意大利 ..."}

因此,在读取数据时,我们可以用下面的代码提取每对句子。

import json

with open(json_path, 'r') as fp:
    for line in fp:
        line = json.loads(line)
        english, chinese = line['english'], line['chinese']

这个数据集有一点不干净,有一些句子对的中英文句子颠倒过来了。为此,我们要稍微处理一下,把这些句子对翻转过来。如果一个英文句子不全由 ASCII 组成,则它可能是一个被标错的中文句子。

# Correct mislabeled data
    if not english.isascii():
        english, chinese = chinese, english

经过这一步,我们只得到了中英文的字符文本。而在NLP中,大部分处理的最小单位都是符号(token)——对于英文来说,符号是单词、标点;对于中文来说,符号是词语、标点。我们还需要一个符号化的过程。

英文符号化非常方便,torchtext 提供了非常便捷的英文分词 API。

from torchtext.data import get_tokenizer

tokenizer = get_tokenizer('basic_english')
english = tokenizer(english)

而中文分词方面,我使用了jieba库。该库可以直接 pip 安装。

pip install jieba

分词的 API 是 jieba.cut。由于分词的结果中,相邻的词之间有空格,我一股脑地把所有空白符给过滤掉了。

import jieba
chinese = list(jieba.cut(chinese))
chinese = [x for x in chinese if x not in {' ', '\t'}]

经过这些处理后,每句话被转换成了中文词语或英文单词的数组。整个处理代码如下:

def read_file(json_path):
    english_sentences = []
    chinese_sentences = []
    tokenizer = get_tokenizer('basic_english')
    with open(json_path, 'r') as fp:
        for line in fp:
            line = json.loads(line)
            english, chinese = line['english'], line['chinese']
            # Correct mislabeled data
            if not english.isascii():
                english, chinese = chinese, english
            # Tokenize
            english = tokenizer(english)
            chinese = list(jieba.cut(chinese))
            chinese = [x for x in chinese if x not in {' ', '\t'}]
            english_sentences.append(english)
            chinese_sentences.append(chinese)
    return english_sentences, chinese_sentences

词语转序号

为了让计算机更方便地处理单词,我们还要把单词转换成序号。比如令apple为0号,banana为1号,则句子apple banana apple就转换成了0 1 0。

给每一个单词选一个标号,其实就是要建立一个词典。一般来说,我们可以利用他人的统计结果,挑选最常用的一些英文单词和中文词语构成词典。不过,现在我们已经有了一个庞大的中英语料库了,我们可以直接从这个语料库中挑选出最常见的词构成词典。

根据上一步处理得到的句子数组sentences,我们可以用下面的 Python 代码统计出最常见的一些词语,把它们和4个特殊字符, , , (句子开始字符、句子结束字符、频率太少没有被加入词典的词语、填充字符)一起构成词典。统计字符出现次数是通过 Python 的 Counter 类实现的。

from collections import Counter

def create_vocab(sentences, max_element=None):
    """Note that max_element includes special characters"""

    default_list = ['<sos>', '<eos>', '<unk>', '<pad>']

    char_set = Counter()
    for sentence in sentences:
        c_set = Counter(sentence)
        char_set.update(c_set)

    if max_element is None:
        return default_list + list(char_set.keys())
    else:
        max_element -= 4
        words_freq = char_set.most_common(max_element)
        # pair array to double array
        words, freq = zip(*words_freq)
        return default_list + list(words)

准备好了词典后,我还编写了两个工具函数sentence_to_tensor,tensor_to_sentence,它们可以用于字符串数组与序号数组的互相转换。测试这些代码的脚本及其输出如下:

# Dataset.py

def main():
    en_sens, zh_sens = read_file(
        'data/translation2019zh/translation2019zh_valid.json')
    print(*en_sens[0:3])
    print(*zh_sens[0:3])
    en_vocab = create_vocab(en_sens, 10000)
    zh_vocab = create_vocab(zh_sens, 30000)
    print(list(en_vocab)[0:10])
    print(list(zh_vocab)[0:10])

    en_tensors = sentence_to_tensor(en_sens, en_vocab)
    zh_tensors = sentence_to_tensor(zh_sens, zh_vocab)

    print(tensor_to_sentence(en_tensors[0], en_vocab, True))
    print(tensor_to_sentence(zh_tensors[0], zh_vocab))

['slowly', 'and', 'not', 'without', 'struggle', ',', 'america', 'began', 'to', 'listen', '.'] ...]
['美国', '缓慢', '地', '开始', '倾听', ',', '但', '并非', '没有', '艰难曲折', '。'] ...]
['<sos>', '<eos>', '<unk>', '<pad>', 'the', '.', ',', 'of', 'and', 'to']
['<sos>', '<eos>', '<unk>', '<pad>', '的', ',', '。', '在', '了', '和']
slowly and not without struggle , america began to listen .
美国缓慢地开始倾听,但并非没有<unk>。

在这一步中,有一个重要的参数:词典的大小。显然,词典越大,能处理的词语越多,但训练速度也会越慢。由于这个项目只是一个用于学习的demo,我设置了比较小的词典大小。想提升整个模型的性能的话,调大词典大小是一个最快的方法。

生成 Dataloader

都说程序员是新时代的农民工,这非常有道理。因为,作为程序员,你免不了要写一些繁重、无聊的数据处理脚本。还好,写完这些无聊的预处理代码后,总算可以使用 PyTorch 的 API 写一些有趣的代码了。

把词语数组转换成序号句子数组后,我们要考虑怎么把序号句子数组输入给模型了。文本数据通常长短不一,为了一次性处理一个 batch 的数据,要把短的句子填充,使得一批句子长度相等。写 Dataloader 时最主要的工作就是填充并对齐句子。

先看一下Dataset的写法。上一步得到的序号句子数组可以塞进Dataset里。注意,每个句子的前后要加上表示句子开始和结束的特殊符号。

SOS_ID = 0
EOS_ID = 1
UNK_ID = 2
PAD_ID = 3

class TranslationDataset(Dataset):

    def __init__(self, en_tensor: np.ndarray, zh_tensor: np.ndarray):
        super().__init__()
        assert len(en_tensor) == len(zh_tensor)
        self.length = len(en_tensor)
        self.en_tensor = en_tensor
        self.zh_tensor = zh_tensor

    def __len__(self):
        return self.length

    def __getitem__(self, index):
        x = np.concatenate(([SOS_ID], self.en_tensor[index], [EOS_ID]))
        x = torch.from_numpy(x)
        y = np.concatenate(([SOS_ID], self.zh_tensor[index], [EOS_ID]))
        y = torch.from_numpy(y)
        return x, y

接下来看一下 DataLoader 的写法。在创建 Dataloader 时,最重要的是 collate_fn 的编写,这个函数决定了怎么把多条数据合成一个等长的 batch。

def get_dataloader(en_tensor: np.ndarray,
                   zh_tensor: np.ndarray,
                   batch_size=16):

    def collate_fn(batch):
        ...

    dataset = TranslationDataset(en_tensor, zh_tensor)
    dataloader = DataLoader(dataset,
                            batch_size=batch_size,
                            shuffle=True,
                            collate_fn=collate_fn)

    return dataloader

collate_fn 的输入是多个 dataset __getitem__ 的返回结果构成的数组。对于我们的 dataset 来说,collate_fn 的输入是 [(x1, y1), (x2, y2), ...] 。我们可以用 zip(*batch) 把二元组数组拆成两个数组 x, y 。

collate_fn 的输出就是将来 dataloader 的输出。PyTorch 提供了 pad_sequence 函数用来把一批数据填充至等长。

from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    x, y = zip(*batch)
    x_pad = pad_sequence(x, batch_first=True, padding_value=PAD_ID)
    y_pad = pad_sequence(y, batch_first=True, padding_value=PAD_ID)

    return x_pad, y_pad

Transformer 模型

准备好 DataLoader 后,接下来就要进入这个项目最重要的部分——Transformer 模型实现了。我将按照代码的执行顺序,从前往后,自底向上地介绍 Transformer 的各个模块:Positional Encoding, MultiHeadAttention, Encoder & Decoder, 最后介绍如何把各个模块拼到一起。在这个过程中,我还会着重介绍一个论文里没有提及,但是代码实现时非常重要的一个细节——字符的处理。

说实话,用 PyTorch 实现 Transformer 没有什么有变数的地方,大家的代码写得都差不多,我也是参考着别人的教程写的。但是,Transformer 的代码实现中有很多坑。大部分人只会云淡风轻地介绍一下最终的代码成品,不会去讲他们 debug 耗费了多少时间,哪些地方容易出错。而我会着重讲一下代码中的一些细节,以及我碰到过的问题。

Positional Encoding

image

模型一开始是一个 Embedding 层加一个 Positional Encoding。Embedding 在 PyTorch 里已经有实现了,且不是文章的创新点,我们就直接来看 Positional Encoding 的写法。

求 Positional Encoding,其实就是求一个二元函数的许多函数值构成的矩阵。对于二元函数��(���,�),我们要求出���∈[0,������−1],�∈[0,������−1]时所有的函数值,其中,������是该序列的长度,������是每一个词向量的长度。

理论上来说,每个句子的序列长度是不固定的,因此,每次都要用到不同的 Positional Encoding。但是,Positional Encoding 的值都是可以提前计算好的。我们可以设置一个很大的最大序列长度,预处理好这个超大的Positional Encoding矩阵。每次有句子输入进来,根据这个句子的序列长度,去预处理好的矩阵里取一小段出来即可。

这样,整个类的实现应该如下所示:

class PositionalEncoding(nn.Module):

    def __init__(self, d_model: int, max_seq_len: int):
        super().__init__()

        # Assume d_model is an even number for convenience
        assert d_model % 2 == 0

        i_seq = torch.linspace(1, max_seq_len, max_seq_len)
        j_seq = torch.linspace(2, d_model, d_model // 2)
        pos, two_i = torch.meshgrid(i_seq, j_seq)
        pe_2i = torch.sin(pos / 10000**(two_i / d_model))
        pe_2i_1 = torch.cos(pos / 10000**(two_i / d_model))
        pe = torch.stack((pe_2i, pe_2i_1), 2).reshape(1, max_seq_len, d_model)

        self.register_buffer('pe', pe, False)

    def forward(self, x: torch.Tensor):
        n, seq_len, d_model = x.shape
        pe: torch.Tensor = self.pe
        assert seq_len <= pe.shape[1]
        assert d_model == pe.shape[2]
        x *= d_model**0.5
        return x + pe[:, 0:seq_len, :]

代码中有不少需要讲解的部分。首先,先看一下预处理好的矩阵pe是怎么在__init__中算出来的。pe可以很直接地用两层循环算出来。由于这段预处理代码只会执行一次,相对于冗长的训练时间,哪怕生成pe的代码性能差一点也没事。然而,作为一个编程高手,我必须用高性能的并行编程把这段逻辑实现。

为了并行地求pe,我们要初始化一个二维网格,表示自变量���,�。生成网格可以用下面的代码实现。(由于�要分奇偶讨论,�的个数是������2)

i_seq = torch.linspace(1, max_seq_len, max_seq_len)
j_seq = torch.linspace(2, d_model, d_model // 2)
pos, two_i = torch.meshgrid(i_seq, j_seq)

torch.meshgrid用于生成网格。比如torch.meshgrid([0, 1], [0, 1])就可以生成[[(0, 0), (0, 1)], [(1, 0), (1, 1)]]这四个坐标构成的网格。不过,这个函数会把坐标的两个分量分别返回。比如:

i, j = torch.meshgrid([0, 1], [0, 1])
# i: [[0, 0], [1, 1]]
# j: [[0, 1], [0, 1]]

利用这个函数的返回结果,我们可以并行地分别算出奇偶位置的 PE 值。

pe_2i = torch.sin((pos / 10000)**(two_i / d_model))
pe_2i_1 = torch.cos((pos / 10000)**(two_i / d_model))

有了奇偶处的值,现在的问题是怎么把它们优雅地拼到同一个维度上。我这里先把它们堆成了形状为seq_len, d_model/2, 2的一个张量,再把最后一维展平,就得到了最后的pe矩阵。

pe = torch.stack((pe_2i, pe_2i_1), 2).reshape(1, max_seq_len, d_model)

最后,要注意一点。只用 self.pe = pe 记录这个量是不够好的。我们最好用 self.register_buffer('pe', pe, False) 把这个量登记成 torch.nn.Module 的一个存储区,同时也会完成self.pe = pe。这里涉及到 PyTorch 的一些知识了。

PyTorch 的 Module 会记录两类参数,一类是 parameter 可学习参数;另一类是 buffer 不可学习的参数。把变量登记成 buffer 的最大好处是,在使用 model.to(device) 把一个模型搬到另一个设备上时,所有 parameter 和 buffer 都会自动被搬过去。另外,buffer 和 parameter 一样,也可以被记录到 state_dict 中,并保存到文件里。register_buffer 的第三个参数决定了是否将变量加入 state_dict。由于 pe 可以直接计算,不需要记录,可以把这个参数设成 False。

预处理好 pe 后,用起来就很方便了。每次读取输入的序列长度,从中取一段出来即可。

另外,ransformer 给嵌入层乘了个系数������。为了方便起见,我把这个系数放到了 PositionalEncoding 类里面。

def forward(self, x: torch.Tensor):
    n, seq_len, d_model = x.shape
    pe: torch.Tensor = self.pe
    assert seq_len <= pe.shape[1]
    assert d_model == pe.shape[2]
    x *= d_model**0.5
    return x + pe[:, 0:seq_len, :]

Scaled Dot-Product Attention

image

下一步是多头注意力层。为了实现多头注意力,我们先要实现 Transformer 里经典的注意力计算。而在讲注意力计算之前,我还要补充一下 Transformer 中有关 mask 的一些知识。

Transformer 里的 mask

Transformer 最大的特点就是其并行性。给定翻译好的第1~n个词语,它默认会并行地预测第2~(n+1)个下一个词语。为了模拟串行输出的情况,第�个词语不应该看到第�+1个词语之后的信息。

输入信息输出
(y1, --, --, --)y2
(y1, y2, --, --)y3
(y1, y2, y3, --)y4
(y1, y2, y3, y4)y5

为了实现这一功能,Transformer 使用了掩码。掩码取1表示这个地方的数是有效的,取0表示这个地方的数是无效的。Decoder 里的这种掩码应该是一个上三角全1矩阵。

掩码是在注意力计算中生效的。对于掩码取0的区域,其softmax前的���值取负无穷。这是因为,对于softmax

�������=�����1+��2+...���,

令��=−∞可以让它在 softmax 的分母里不产生任何贡献。

这是论文里提到的 mask,它用来模拟 decoder 的串行推理。而在代码实现中,还有其他地方会产生 mask。之前,我们在生成一个 batch 的数据时,给很多句子加入了 。这个特殊字符也没有实际意义,不应该对计算产生任何贡献。因此,有  的地方的 mask 也应该为0。之后讲 Transformer 模型类时,我会介绍所有的 mask 该怎么生成,这里我们仅关注注意力计算是怎么用到 mask 的。

注意力计算

补充完了背景知识,我们来看注意力计算的实现代码。由于注意力计算没有任何的状态,因此它应该写成一个函数,而不是一个类。我们可以轻松地用 PyTorch 代码翻译注意力计算的公式。(注意,我这里的 mask 表示哪些地方要填负无穷,而不是像之前讲的表示哪些地方有效)

���������(�,�,�)=�������(�����)�

MY_INF = 1e12

def attention(q: torch.Tensor,
              k: torch.Tensor,
              v: torch.Tensor,
              mask: Optional[torch.Tensor] = None):
    '''
    # Note: The dtype of mask must be bool
    '''
    # q shape: [n, heads, q_len, d_k]
    # k v shape: [n, heads, k_len, d_v]
    assert q.shape[-1] == k.shape[-1]
    d_k = k.shape[-1]
    # tmp shape: [n, heads, q_len, k_len]
    tmp = torch.matmul(q, k.transpose(-2, -1)) / d_k**0.5
    if mask is not None:
        tmp.masked_fill_(mask, -MY_INF)
    tmp = F.softmax(tmp, -1)
    # tmp shape: [n, heads, q_len, d_v]
    tmp = torch.matmul(tmp, v)
    return tmp

这里有一个很坑的地方。引入了  带来的 mask 后,会产生一个新的问题:可能一整行数据都是失效的,softmax 用到的所有 �� 可能都是负无穷。

�������=�−∞�−∞+�−∞+...�−∞

这个数据是没有意义的。如果用torch.inf来表示无穷大,就会令exp(torch.inf)=0,最后 softmax 结果会出现 NaN。

按道理讲,若一行全是失效的数据,则它们的 softmax 结果应该全填零。所以,可以直接判断一下计算结果中有 NaN 的地方,把它们赋值成0。

但是,大多数 PyTorch Transformer 教程压根就没提这一点,而他们的代码又还是能够跑通。拿放大镜仔细对比了代码后,我发现,他们的无穷大用的不是 torch.inf,而是自己随手设的一个极大值。这样,exp(MY_INF)得到的不再是0,而是一个极小值。softmax 的结果就会等于分母的项数,而不是 NaN。讲道理,这些 softmax 结果是要填 0 的,填一个常数问题倒也不是很大。出于随大流的考虑,我也使用了这种懒散的实现方式,用了一个假的无穷大。

Multi-Head Attention

image

有了注意力计算,就可以实现多头注意力层了。多头注意力层是有学习参数的,它应该写成一个类。

class MultiHeadAttention(nn.Module):

    def __init__(self, heads: int, d_model: int, dropout: float = 0.1):
        super().__init__()

        assert d_model % heads == 0
        self.d_k = d_model // heads
        self.heads = heads
        self.d_model = d_model
        self.q = nn.Linear(d_model, d_model)
        self.k = nn.Linear(d_model, d_model)
        self.v = nn.Linear(d_model, d_model)
        self.out = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self,
                q: torch.Tensor,
                k: torch.Tensor,
                v: torch.Tensor,
                mask: Optional[torch.Tensor] = None):
        # batch should be same
        assert q.shape[0] == k.shape[0]
        assert q.shape[0] == v.shape[0]
        # the sequence length of k and v should be aligned
        assert k.shape[1] == v.shape[1]

        n, q_len = q.shape[0:2]
        n, k_len = k.shape[0:2]
        q = self.q(q).reshape(n, q_len, self.heads, self.d_k).transpose(1, 2)
        k = self.k(k).reshape(n, k_len, self.heads, self.d_k).transpose(1, 2)
        v = self.v(v).reshape(n, k_len, self.heads, self.d_k).transpose(1, 2)

        attention_res = attention(q, k, v, mask)
        concat_res = attention_res.transpose(1, 2).reshape(
            n, q_len, self.d_model)
        concat_res = self.dropout(concat_res)

        output = self.out(concat_res)
        return output

这段代码一处很灵性的地方。在 Transformer 的论文中,多头注意力是先把每个词的表示拆成ℎ个头,再对每份做投影、注意力,最后拼接起来,再投影一次。其实,拆开与拼接操作是多余的。我们可以一开始就让所有头的数据经过同一个线性层,再在做注意力之前把头和序列数这两维转置一下。这两步操作和拆开来做投影、注意力是等价的。做完了注意力操作之后,再把两个维度转置回来,这和拼接操作是等价的。

def forward(self,
                q: torch.Tensor,
                k: torch.Tensor,
                v: torch.Tensor,
                mask: Optional[torch.Tensor] = None):
        # batch should be same
        assert q.shape[0] == k.shape[0]
        assert q.shape[0] == v.shape[0]
        # the sequence length of k and v should be aligned
        assert k.shape[1] == v.shape[1]

        n, q_len = q.shape[0:2]
        n, k_len = k.shape[0:2]
        q = self.q(q).reshape(n, q_len, self.heads, self.d_k).transpose(1, 2)
        k = self.k(k).reshape(n, k_len, self.heads, self.d_k).transpose(1, 2)
        v = self.v(v).reshape(n, k_len, self.heads, self.d_k).transpose(1, 2)

        attention_res = attention(q, k, v, mask)
        concat_res = attention_res.transpose(1, 2).reshape(
            n, q_len, self.d_model)
        concat_res = self.dropout(concat_res)

        output = self.out(concat_res)
        return output

前馈网络

image

前馈网络太简单了,两个线性层,没什么好说的。注意内部那个隐藏层的维度大小���会更大一点。

class FeedForward(nn.Module):

    def __init__(self, d_model: int, d_ff: int, dropout: float = 0.1):
        super().__init__()
        self.layer1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.layer2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        x = self.layer1(x)
        x = self.dropout(F.relu(x))
        x = self.layer2(x)
        return x

Encoder & Decoder

准备好一切组件后,就可以把模型一层一层搭起来了。先搭好每个 Encoder 层和 Decoder 层,再拼成 Encoder 和 Decoder。

Encoder 层和 Decoder 层的结构与论文中的描述一致,且每个子层后面都有一个 dropout,和上一层之间使用了残差连接。归一化的方法是 LayerNorm。顺带一提,不仅是这些层,前面很多子层的计算中都加入了 dropout。

再补充说明一下 mask。由于 encoder 和 decoder 的输入不同,它们的填充情况不同,产生的 mask 也不同。后文会展示这些 mask 的生成方法。

class EncoderLayer(nn.Module):

    def __init__(self,
                 heads: int,
                 d_model: int,
                 d_ff: int,
                 dropout: float = 0.1):
        super().__init__()
        self.self_attention = MultiHeadAttention(heads, d_model, dropout)
        self.ffn = FeedForward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, src_mask: Optional[torch.Tensor] = None):
        x = self.norm1(x +
                       self.dropout1(self.self_attention(x, x, x, src_mask)))
        x = self.norm2(x + self.dropout2(self.ffn(x)))
        return x

class DecoderLayer(nn.Module):

    def __init__(self,
                 heads: int,
                 d_model: int,
                 d_ff: int,
                 dropout: float = 0.1):
        super().__init__()
        self.self_attention = MultiHeadAttention(heads, d_model, dropout)
        self.attention = MultiHeadAttention(heads, d_model, dropout)
        self.ffn = FeedForward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self,
                x,
                encoder_kv: torch.Tensor,
                dst_mask: Optional[torch.Tensor] = None,
                src_dst_mask: Optional[torch.Tensor] = None):
        x = self.norm1(x +
                       self.dropout1(self.self_attention(x, x, x, dst_mask)))
        x = self.norm2(x + self.dropout2(
            self.attention(x, encoder_kv, encoder_kv, src_dst_mask)))
        x = self.norm3(x + self.dropout2(self.ffn(x)))
        return x

Encoder 和 Decoder 就是再前面加了一个嵌入层、一个位置编码,再把多个层堆起来了而已,其他输入输出照搬。

class Encoder(nn.Module):

    def __init__(self,
                 vocab_size: int,
                 d_model: int,
                 d_ff: int,
                 n_layers: int,
                 heads: int,
                 dropout: float = 0.1,
                 max_seq_len: int = 120):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pe = PositionalEncoding(d_model, max_seq_len)
        self.layers = []
        for i in range(n_layers):
            self.layers.append(EncoderLayer(heads, d_model, d_ff, dropout))
        self.layers = nn.ModuleList(self.layers)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, src_mask: Optional[torch.Tensor] = None):
        x = self.embedding(x)
        x = self.dropout(x)
        x = self.pe(x)
        x = self.dropout(x)
        for layer in self.layers:
            x = layer(x, src_mask)
        return x


class Decoder(nn.Module):

    def __init__(self,
                 vocab_size: int,
                 d_model: int,
                 d_ff: int,
                 n_layers: int,
                 heads: int,
                 dropout: float = 0.1,
                 max_seq_len: int = 120):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pe = PositionalEncoding(d_model, max_seq_len)
        self.layers = []
        for i in range(n_layers):
            self.layers.append(DecoderLayer(heads, d_model, d_ff, dropout))
        self.layers = nn.Sequential(*self.layers)
        self.dropout = nn.Dropout(dropout)

    def forward(self,
                x,
                encoder_kv,
                dst_mask: Optional[torch.Tensor] = None,
                src_dst_mask: Optional[torch.Tensor] = None):
        x = self.embedding(x)
        x = self.dropout(x)
        x = self.pe(x)
        x = self.dropout(x)
        for layer in self.layers:
            x = layer(x, encoder_kv, dst_mask, src_dst_mask)
        return x

Transformer 类

image

终于,激动人心的时候到来了。我们要把各个子模块组成变形金刚(Transformer)了。先过一遍所有的代码。

class Transformer(nn.Module):

    def __init__(self,
                 src_vocab_size: int,
                 dst_vocab_size: int,
                 d_model: int,
                 d_ff: int,
                 n_layers: int,
                 heads: int,
                 dropout: float = 0.1,
                 max_seq_len: int = 200):
        super().__init__()
        self.encoder = Encoder(src_vocab_size, d_model, d_ff, n_layers, heads,
                               dropout, max_seq_len)
        self.decoder = Decoder(dst_vocab_size, d_model, d_ff, n_layers, heads,
                               dropout, max_seq_len)
        self.heads = heads

    def init_weights(self):
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

    def generate_mask(self,
                      q_pad: torch.Tensor,
                      k_pad: torch.Tensor,
                      with_left_mask: bool = False):
        # q_pad shape: [n, q_len]
        # k_pad shape: [n, k_len]
        # q_pad k_pad dtype: bool
        assert q_pad.device == k_pad.device
        n, q_len = q_pad.shape
        n, k_len = k_pad.shape

        mask_shape = (n, self.heads, q_len, k_len)
        if with_left_mask:
            mask = 1 - torch.tril(torch.ones(mask_shape))
        else:
            mask = torch.zeros(mask_shape)
        mask = mask.to(q_pad.device)
        for i in range(n):
            mask[i, :, q_pad[i], :] = 1
            mask[i, :, :, k_pad[i]] = 1
        mask = mask.to(torch.bool)
        return mask

    def forward(self,
                x,
                y,
                src_pad_mask: Optional[torch.Tensor] = None,
                dst_pad_mask: Optional[torch.Tensor] = None):
        src_mask = self.generate_mask(src_pad_mask, src_pad_mask, False)
        dst_mask = self.generate_mask(dst_pad_mask, dst_pad_mask, True)
        src_dst_mask = self.generate_mask(dst_pad_mask, src_pad_mask, False)
        encoder_kv = self.encoder(x, src_mask)
        res = self.decoder(y, encoder_kv, dst_mask, src_dst_mask)
        embedding_reverse = self.decoder.embedding.weight.transpose(0, 1)
        res = torch.matmul(res, embedding_reverse)
        return res

我们一点一点来看。先看初始化函数。初始化函数的输入其实就是 Transformer 模型的超参数。总结一下,Transformer 应该有这些超参数:

  • d_model 模型中大多数词向量表示的维度大小

  • d_ff 前馈网络隐藏层维度大小

  • n_layers 堆叠的 Encoder & Decoder 层数

  • head 多头注意力的头数

  • dropout Dropout 的几率

另外,为了构件嵌入层,要知道源语言、目标语言的词典大小。为了预处理位置编码,需要提前知道一个最大序列长度。

照着子模块的初始化参数表,把参数归纳到__init__的参数表里即可。

def __init__(self,
            src_vocab_size: int,
            dst_vocab_size: int,
            d_model: int,
            d_ff: int,
            n_layers: int,
            heads: int,
            dropout: float = 0.1,
            max_seq_len: int = 200):
    super().__init__()
    self.encoder = Encoder(src_vocab_size, d_model, d_ff, n_layers, heads,
                            dropout, max_seq_len)
    self.decoder = Decoder(dst_vocab_size, d_model, d_ff, n_layers, heads,
                            dropout, max_seq_len)
    self.heads = heads

再看一下 forward 函数。forward先预处理好了所有的 mask,再逐步执行 Transformer 的计算:先是通过 Encoder 获得源语言的中间表示,再把它和目标语言的输入一起传入 Decoder,最后经过线性层输出结果。由于 PyTorch 的交叉熵损失函数自带了 softmax 操作,这里不需要多此一举。

当然,你去对着上面这段描述看下面的代码,会感到疑惑:输出前哪有线性层?embedding_reverse 又是什么东西?Transformer 论文提到,softmax 前的那个线性层可以和嵌入层共享权重,也就是说它们分别完成了词序号到词嵌入的正反映射。因此,我尊重原论文,用嵌入层的权重的转置完成了输出前的线性运算。不知道为什么网上大多数的实现都重新定义了一个线性层,可能是二者的区别不大吧。

def forward(self,
            x,
            y,
            src_pad_mask: Optional[torch.Tensor] = None,
            dst_pad_mask: Optional[torch.Tensor] = None):
    src_mask = self.generate_mask(src_pad_mask, src_pad_mask, False)
    dst_mask = self.generate_mask(dst_pad_mask, dst_pad_mask, True)
    src_dst_mask = self.generate_mask(dst_pad_mask, src_pad_mask, False)
    encoder_kv = self.encoder(x, src_mask)
    res = self.decoder(y, encoder_kv, dst_mask, src_dst_mask)
    embedding_reverse = self.decoder.embedding.weight.transpose(0, 1)
    res = torch.matmul(res, embedding_reverse)
    return res

等了很久,现在可以来仔细看一看 mask 的生成方法了。回忆一下,表示该字符是否有效的 mask 有两个来源。第一个是论文里提到的,用于模拟串行推理的 mask;另一个是填充操作的空白字符引入的 mask。generate_mask 用于生成这些 mask。

generate_mask 的输入有 query 句子和 key 句子的 pad mask q_pad, k_pad,它们的形状为[n, seq_len]。若某处为 True,则表示这个地方的字符是。对于自注意力,query 和 key 都是一样的;而在 Decoder 的第二个多头注意力层中,query 来自目标语言,key 来自源语言。with_left_mask 表示是不是要加入 Decoder 里面的那种 mask。

def generate_mask(self,
                q_pad: torch.Tensor,
                k_pad: torch.Tensor,
                with_left_mask: bool = False):

一开始,先取好维度信息,定好张量的形状。在注意力操作中,softmax 前的那个量的形状是 [n, heads, q_len, k_len],表示每一批每一个头的每一个query对每个key之间的相似度。mask 的形状应和它一样。

再新建一个表示最终 mask 的张量。如果不用 Decoder 的那种 mask,就生成一个全零的张量;否则,生成一个上三角为0,其余地方为1的张量。注意,在我的代码中,mask 为 True 或1就表示这个地方需要填负无穷。

最后,把有  的地方也标记一下。从形状[n, heads, q_len, k_len]可以知道,q_pad 表示哪些行是无效的,k_pad 表示哪些列是无效的。把对应的行或列赋值成1即可。

def generate_mask(self,
                    q_pad: torch.Tensor,
                    k_pad: torch.Tensor,
                    with_left_mask: bool = False):
    # q_pad shape: [n, q_len]
    # k_pad shape: [n, k_len]
    # q_pad k_pad dtype: bool
    assert q_pad.device == k_pad.device
    n, q_len = q_pad.shape
    n, k_len = k_pad.shape

    mask_shape = (n, self.heads, q_len, k_len)
    if with_left_mask:
        mask = 1 - torch.tril(torch.ones(mask_shape))
    else:
        mask = torch.zeros(mask_shape)
    mask = mask.to(q_pad.device)
    for i in range(n):
        mask[i, :, q_pad[i], :] = 1
        mask[i, :, :, k_pad[i]] = 1
    mask = mask.to(torch.bool)
    return mask

在 Transformer 中,有三类多头注意力层,它们的 mask 也不同。Encoder 的多头注意力层的 query 和 key 都来自源语言;Decoder 的第一个多头注意力层的 query 和 key 都来自目标语言;Decoder 的第二个多头注意力层的 query 来自目标语言, key 来自源语言。另外,Decoder 的第一个多头注意力层要加串行推理的那个 mask。

def forward(self,
            x,
            y,
            src_pad_mask: Optional[torch.Tensor] = None,
            dst_pad_mask: Optional[torch.Tensor] = None):
    src_mask = self.generate_mask(src_pad_mask, src_pad_mask, False)
    dst_mask = self.generate_mask(dst_pad_mask, dst_pad_mask, True)
    src_dst_mask = self.generate_mask(dst_pad_mask, src_pad_mask, False)

到此,Transfomer 模型总算编写完成了。噢,别忘了,模型参数的初始化方法也可以写进类里面。

def init_weights(self):
    for p in self.parameters():
        if p.dim() > 1:
            nn.init.xavier_uniform_(p)

训练

准备好了模型、数据集后,剩下的工作非常惬意,只要随便调用一下就行了。训练的代码如下:

# Config
batch_size = 64
lr = 0.0001
d_model = 256
d_ff = 1024
n_layers = 6
heads = 8


def main():
    en_vocab, zh_vocab = load_vocab()

    en_train, zh_train, en_valid, zh_valid = load_sentences()
    dataloader_train = get_dataloader(en_train, zh_train, batch_size)

    device = 'cuda:0'

    print_interval = 1000

    model = Transformer(len(en_vocab), len(zh_vocab), d_model, d_ff, n_layers,
                        heads)
    model.to(device)

    model.init_weights()

    optimizer = torch.optim.Adam(model.parameters(), lr)
    citerion = nn.CrossEntropyLoss(ignore_index=PAD_ID)
    tic = time.time()
    cnter = 0
    dataset_len = len(dataloader_train.dataset)
    print('Dataset size:', dataset_len)
    for epoch in range(10):
        loss_sum = 0

        for x, y in dataloader_train:
            x, y = x.to(device), y.to(device)
            x_mask = x == PAD_ID
            y_mask = y == PAD_ID
            y_input = y[:, :-1]
            y_label = y[:, 1:]
            y_mask = y_mask[:, :-1]
            y_hat = model(x, y_input, x_mask, y_mask)
            n, seq_len = y_label.shape
            y_hat = torch.reshape(y_hat, (n * seq_len, -1))
            y_label = torch.reshape(y_label, (n * seq_len, ))
            loss = citerion(y_hat, y_label)

            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
            optimizer.step()

            loss_sum += loss.item()

            toc = time.time()
            interval = toc - tic
            minutes = int(interval // 60)
            seconds = int(interval % 60)
            if cnter % print_interval == 0:
                print(f'{cnter:08d} {minutes:02d}:{seconds:02d}'
                      f' loss: {loss.item()}')
            cnter += 1

        print(f'Epoch {epoch}. loss: {loss_sum / dataset_len}')

    torch.save(model.state_dict(), 'dldemos/Transformer/model.pth')
    print('Done.')

准备好超参数,并统计好词典的大小,就可以初始化模型结构及权重了。

# Config
batch_size = 64
lr = 0.0001
d_model = 256
d_ff = 1024
n_layers = 6
heads = 8

model = Transformer(len(en_vocab), len(zh_vocab), d_model, d_ff, n_layers,
                    heads)
model.to(device)

model.init_weights()

优化器用 Adam,非常普通。损失函数是交叉熵函数。这里需要注意一下,不仅模型计算中需要忽略掉,算损失函数时也需要扣除它们的贡献。CrossEntropyLoss的ignore_index表示标签中的哪一类需要被忽略。这里我们传入字符在词典中的序号即可。

optimizer = torch.optim.Adam(model.parameters(), lr)
citerion = nn.CrossEntropyLoss(ignore_index=PAD_ID)

模型训练的部分也比较简单。先预处理出输入里的 mask 和模型的输入输出。如前所述,decoder的输入是第一个到倒数第二个字符y[:, :-1],期待的输出是第二到最后一个字符y[:, 1:]。

再来就是模型前向传播与误差计算了。注意,由于数据带了序列长度那一维,在传入交叉熵之前要把那一维和 batch 维合并。

最后梯度下降的操作也很常规。注意截断一下梯度,防止梯度爆炸即可。

for x, y in dataloader_train:
    x, y = x.to(device), y.to(device)
    x_mask = x == PAD_ID
    y_mask = y == PAD_ID
    y_input = y[:, :-1]
    y_label = y[:, 1:]
    y_mask = y_mask[:, :-1]
    y_hat = model(x, y_input, x_mask, y_mask)
    n, seq_len = y_label.shape
    y_hat = torch.reshape(y_hat, (n * seq_len, -1))
    y_label = torch.reshape(y_label, (n * seq_len, ))
    loss = citerion(y_hat, y_label)

    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
    optimizer.step()

由于 Transformer 训练得较慢,得过很久才能看到训练结果。在调试时,我们只要观察到 loss 在不断下降,就可以大胆宣告胜利了。

实验

待更新

总结

只要读懂了 Transfomer 的论文,用 PyTorch 实现一遍 Transformer 是很轻松的。但是,代码实现中有非常多论文不会提及的细节,预处理数据集也算是一个很烦人的工作。在这篇文章里,我完整地介绍了一个英中翻译 Transformer 的 PyTorch 实现,相信读者能够跟随这篇文章实现自己的 Transformer,并在代码实现的过程中加深对论文的理解。

再稍微总结一下代码实现中的一些值得注意的地方。代码中最大的难点是 mask 的实现。mask 的处理稍有闪失,就可能会让计算结果中遍布 NaN。一定要想清楚各个模块的 mask 是从哪来的,它们在注意力计算里是怎么被用上的。

另外,有两处地方的实现比较灵活。一处是位置编码的实现,一处是多头注意力中怎么描述“多头”。其他模块的实现都大差不差,千篇一律。

至此,我的《吴恩达深度学习专项代码实战》总算是完结了。过几天来写总结。

参考资料

https://github.com/hyunwoongko/transformer

https://towardsdatascience.com/

原文:https://zhuanlan.zhihu.com/p/581334630

展开
收起
游客tqsxlcu6ie37g 2023-04-14 18:03:49 937 0
0 条回答
写回答
取消 提交回答

包括语音识别、语音合成、语音唤醒、声学设计及信号处理、声纹识别、音频事件检测等多个领域

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载