1

PyTorch中实现Transformer模型 - zh-jp

 7 months ago
source link: https://www.cnblogs.com/zh-jp/p/18001551
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
  1. 关于Transformer原理与论文的介绍:详细了解Transformer:Attention Is All You Need

对于论文给出的模型架构,使用 PyTorch 分别实现各个部分。

3039442-20230925200236923-909086371.png

引入的相关库函数:

import copyimport torchimport mathfrom torch import nnfrom torch.nn.functional import log_softmax # module: 需要深拷贝的模块# n: 拷贝的次数# return: 深拷贝后的模块列表def clones(module, n: int) -> list: return [copy.deepcopy(module) for _ in range(n)]

1. 编码器与解码器堆叠

Encoder 编码器

编码器由 N 个相同的编码层堆叠而成,每个编码层含两个子层:多头注意力层和前馈网络层。每个子层后跟着一层,用于残差连接与标准化。

Add & Norm 残差连接和标准化

对于上一层的结果:SubLayer(x)与输出上一层的变量:x做残差连接并进行标准化:LayerNorm(x+Sublayer(x))。

# 层标准化class LayerNorm(nn.Module): # 设置 features 形状的张量作为可学习的参数,初始化 def __init__(self, features, eps=1e-6): super(LayerNorm, self).__init__() # 初始化两个参数,α为权重,β为偏置 self.a_2 = nn.Parameter(torch.ones(features)) self.b_2 = nn.Parameter(torch.zeros(features)) self.eps = eps def forward(self, x): # 计算最后一个维度的均值、方差 mean = x.mean(-1, keepdim=True) std = x.std(-1, keepdim=True) return self.a_2 * (x - mean) / (std + self.eps) + self.b_2 # 子层残差连接class SublayerConnection(nn.Module): # size: 参数矩阵的shape, # dropout_prob: dropout概率 def __init__(self, size, dropout_prob): super(SublayerConnection, self).__init__() self.norm = LayerNorm(size) self.dropout = nn.Dropout(p=dropout_prob) def forward(self, x, sublayer): return x + self.dropout(sublayer(self.norm(x)))
  • nn.Dropout()初始化参数p表示训练时,以概率 p 将输入张量的一些元素归零,对于没有归零的元素将乘以11−p。
  • 输入为任意形状的张量,输出为与输入张量形状相同并经过处理的张量。[Source]

Multi-Head Attention 多头注意力层

3039442-20230926113755250-1730455332.png

计算点乘注意力:Attention(Q,K,V)=softmax(QKTdk)V

# q, k, v: 表示公式中的 Q, K, V# mask: 当输入存在掩码时,将 mask 对应位置设置为负无穷# dropout: dropout层# return: 注意力层的输出,以及注意力权重def attention(q, k, v, mask=None, dropout=None): d_k = q.size(-1) scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k) if mask is not None: scores = scores.masked_fill(mask == 0, -1e9) p_attn = scores.softmax(dim=-1) if dropout is not None: p_attn = dropout(p_attn) return torch.matmul(p_attn, v), p_attn # 多头注意力class MultiHeadedAttention(nn.Module): # h: 多头注意力的头数 # d_model: 嵌入词的维度 def __init__(self, h, d_model, dropout_prob=0.1): super(MultiHeadedAttention, self).__init__() assert d_model % h == 0 self.d_k = d_model // h self.h = h self.linears = clones(nn.Linear(d_model, d_model), 4) self.attn = None self.dropout = nn.Dropout(p=dropout_prob) def forward(self, q, k, v, mask=None): if mask is not None: mask = mask.unsqueeze(1) # 相同的mask应用于所有的注意力头h batch_size = q.size(0) # 1) 执行线性变换,将 d_model 维度的 x 分割成 h 个 d_k 维度 q, k, v = [ # 通过 view 改变张量形状,并使用 transpose 方法交换张量维度 lin(x).view(batch_size, -1, self.h, self.d_k).transpose(1, 2) for lin, x in zip(self.linears, (q, k, v)) ] # 2) 将 attention 用于每个 batch 的投影向量上 x, self.attn = attention(q, k, v, mask=mask, dropout=self.dropout) # 3) 通过线性层连接多头注意力计算完的向量 x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.h * self.d_k) return self.linears[-1](x)

关于contiguous()transpose()不改变张量物理上的存储顺序,而是改变了查看时逻辑上的顺序,使得在内存上不连续(可以通过is_contiguous()查看张量是否是连续的)。

如果不是连续的,可以通过contiguous()方法返回内存上连续、数值上相同的张量。view()方法改变张量的形状需要张量是连续的。[Source]

Feed Forward 前馈网络层

由两个线性层组成,中间使用 ReLU 激活函数:FFN(x)=max(0,xW1+b1)W2+b2

# 基于位置的前馈网络class PositionwiseFeedForward(nn.Module): # d_model: 嵌入词的维度 # d_ff: 前馈网络中间层的维度 def __init__(self, d_model, d_ff, dropout_prob=0.1): super(PositionwiseFeedForward, self).__init__() self.w_1 = nn.Linear(d_model, d_ff) self.w_2 = nn.Linear(d_ff, d_model) self.dropout = nn.Dropout(p=dropout_prob) def forward(self, x): return self.w_2(self.dropout(self.w_1(x).relu()))

每个编码层,含一个多头注意力层,一个前馈网络层,以及两个用于残差连接与标准化层分别跟在两个子层后面。N 个编码层组成编码器,每层的编码层的输出作为下一层的输入。

# 编码层class EncoderLayer(nn.Module): # size: 参数矩阵的shape, # self_attn: 多头注意力层 # feed_forward: 前馈网络层 # dropout_prob: dropout概率 def __init__(self, size, self_attn, feed_forward, dropout_prob): super(EncoderLayer, self).__init__() self.self_attn = self_attn self.feed_forward = feed_forward self.sublayer = clones(SublayerConnection(size, dropout_prob), 2) self.size = size def forward(self, x, mask): x = self.sublayer[0](x, lambda i: self.self_attn(i, i, i, mask)) return self.sublayer[1](x, self.feed_forward) # 编码器:由 N 个相同的层组成class Encoder(nn.Module): def __init__(self, layer, n): super(Encoder, self).__init__() self.layers = clones(layer, n) self.norm = LayerNorm(layer.size) def forward(self, x, mask): for layer in self.layers: x = layer(x, mask) return self.norm(x)

EncoderLayerforward()内的x = self.sublayer[0](x, lambda i: self.self_attn(i, i, i, mask)),虽然此处输入的 q,k,v 均为 i 但在注意力层内,它们将分别与对应的 Q,K,V 矩阵(由线性层Linear实现)相乘,得到用于计算注意力的 q,k,v 。

Decoder 解码器

解码器由 N 层解码层组成。结构与编码层相似,由三个子层组成:带掩码的多头注意力层,多头注意力层和前馈网络层。每个子层后跟着一层,用于残差连接与标准化。

对于第二个子层,输入每一解码层的 K,V 为Encoder(第 N 层的编码层)的输出。为了区别输入Encoder和Decoder的嵌入词,分别用 src(Source,源) 和 tgt(Target,目标) 表示。

# 解码层:由多头注意力层、源-目标注意力层和前馈神经网络组成class DecoderLayer(nn.Module): # size: 参数矩阵的shape, # self_attn: 多头注意力层 # src_attn: 源-目标注意力层 # feed_forward: 前馈网络层 # dropout_prob: dropout概率 def __init__(self, size, self_attn, src_attn, feed_forward, dropout_prob): super(DecoderLayer, self).__init__() self.size = size self.self_attn = self_attn self.src_attn = src_attn self.feed_forward = feed_forward self.sublayer = clones(SublayerConnection(size, dropout_prob), 3) # x: 解码曾输入 # memory: 编码器的输出 # src_mask: 源嵌入词掩码 # tgt_mask: 目标嵌入词掩码 # return: 解码层的输出 def forward(self, x, memory, src_mask, tgt_mask): m = memory x = self.sublayer[0](x, lambda i: self.self_attn(i, i, i, tgt_mask)) x = self.sublayer[1](x, lambda i: self.src_attn(i, m, m, src_mask)) return self.sublayer[2](x, self.feed_forward) # 解码器:由 N 个相同的层组成class Decoder(nn.Module): def __init__(self, layer, n): super(Decoder, self).__init__() self.layers = clones(layer, n) self.norm = LayerNorm(layer.size) def forward(self, x, memory, src_mask, tgt_mask): for layer in self.layers: x = layer(x, memory, src_mask, tgt_mask) return self.norm(x)

2. Generator 生成器

生成器将解码器的输出映射到词汇表上,由一个线性层和一个 softmax 层组成,用于预测下一个token的概率。

# 生成器:线性层和 softmax 层class Generator(nn.Module): # d_model: 解码器输出的(嵌入词)向量维度 # vocab: 词汇表的维度大小 def __init__(self, d_model, vocab): super(Generator, self).__init__() self.proj = nn.Linear(d_model, vocab) def forward(self, x): return log_softmax(self.proj(x), dim=-1) # 对最后一个维度进行 softmax

3. Embedding 嵌入层

使用nn.Embedding构建查找表(Look-Up Table, LUT)。[Source]

  • 初始化时,num_embedding表示嵌入字典大小;embedding_dim表示每个嵌入词向量的维度大小。
  • forward()中使用时,输入维度为d的张量,返回维度为 d×embedding_dim 的张量。

文中,作者还将嵌入层返回的张量乘以dmodel。

class Embeddings(nn.Module): def __init__(self, d_model, vocab): super(Embeddings, self).__init__() self.lut = nn.Embedding(num_embeddings=vocab, embedding_dim=d_model) self.d_model = d_model def forward(self, x): return self.lut(x) * math.sqrt(self.d_model)

4. Positional Encoding 位置编码

为了使模型学习文本的顺序信息,需要引入位置编码:

{PE(pos,2i)=sin⁡(pos/100002i/dmodel)PE(pos,2i+1)=cos⁡(pos/100002i/dmodel)
class PositionalEncoding(nn.Module): def __init__(self, d_model, dropout_prob, max_len=5000): super(PositionalEncoding, self).__init__() self.dropout = nn.Dropout(p=dropout_prob) # 计算位置编码 pe = torch.zeros(max_len, d_model) # Shape: max_len x d_model position = torch.arange(0, max_len).unsqueeze(1) # Shape: max_len x 1 div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000) / d_model)) res = position * div_term # Shape: max_len x d_model/2 pe[:, 0::2] = torch.sin(res) pe[:, 1::2] = torch.cos(res) pe = pe.unsqueeze(0) # Shape: 1 x max_len x d_model self.register_buffer('pe', pe) def forward(self, x): x = x + self.pe[:, :x.size(1)].requires_grad_(False) return self.dropout(x)

self.register_buffer()用于将模型训练参数之外的变量注册加缓存,通过register_buffer()登记过的张量,会自动成为模型中的参数,随着模型移动(gpu/cpu)而移动,但是不会随着梯度进行更新。

在PyTorch中,对于梯度更新的需求,有着不同的张量定义方式[2]。

5. 整体架构

class EncoderDecoder(nn.Module): # encoder: 编码器 # decoder: 解码器 # src_embed: 源嵌入层 # tgt_embed: 目标嵌入层 # generator: 生成器 def __init__(self, encoder, decoder, src_embed, tgt_embed, generator): super(EncoderDecoder, self).__init__() self.encoder = encoder self.decoder = decoder self.src_embed = src_embed self.tgt_embed = tgt_embed self.generator = generator # src: 源语言句子 # src_mask: 源语言句子掩码 def encode(self, src, src_mask): return self.encoder(self.src_embed(src), src_mask) # 编码器 # memory: 编码器的输出 # src_mask: 源语言句子掩码 # tgt: 目标语言句子 # tgt_mask: 目标语言句子掩码 def decode(self, memory, src_mask, tgt, tgt_mask): return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask) def forward(self, src, tgt, src_mask, tgt_mask): memory = self.encode(src, src_mask) res_dec = self.decode(memory, src_mask, tgt, tgt_mask) return self.generator(res_dec) # src_vocab: 源语言词典大小# tgt_vocab: 目标语言词典大小# n: 编码器和解码器的层数# d_model: 嵌入词的维度# d_ff: 前馈网络中间层的维度# h: 多头注意力的头数# dropout_prb: dropout概率# return: Transformer 模型def make_model(src_vocab, tgt_vocab, n=6, d_model=512, d_ff=2048, h=8, dropout_prb=0.1): c = copy.deepcopy attn = MultiHeadedAttention(h, d_model) ff = PositionwiseFeedForward(d_model, d_ff, dropout_prb) position = PositionalEncoding(d_model, dropout_prb) model = EncoderDecoder( Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout_prb), n), Decoder(DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout_prb), n), nn.Sequential(Embeddings(d_model, src_vocab), c(position)), nn.Sequential(Embeddings(d_model, tgt_vocab), c(position)), Generator(d_model, tgt_vocab), ) # 初始化参数 for p in model.parameters(): if p.dim() > 1: nn.init.xavier_uniform_(p) return model

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK