
The red portion of the figure above is the Transformer Encoder block. It consists of Multi-Head Attention, Add & Norm, Feed Forward, and another Add & Norm. Having already walked through the Multi-Head Attention computation, we now look at the Add & Norm and Feed Forward parts.
The Add & Norm layer is made up of two parts, Add and Norm, and is computed as:

LayerNorm(X + MultiHeadAttention(X))
LayerNorm(X + FeedForward(X))

where X is the input to the Multi-Head Attention or Feed Forward sub-layer, and MultiHeadAttention(X) and FeedForward(X) denote its output (the output has the same dimensions as the input X, so the two can be added).
Add refers to X + MultiHeadAttention(X), a residual connection. Residual connections are commonly used to ease the training of deep networks, letting each layer focus only on the difference from its input; they are used extensively in ResNet. Norm refers to Layer Normalization, which normalizes the activations of each layer to speed up convergence.
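To make the Add & Norm step concrete, here is a minimal PyTorch sketch of a residual-plus-LayerNorm sub-layer. The class name AddNorm, the dropout placement, and the example dimensions are illustrative choices, not something prescribed by the text above:

import torch
import torch.nn as nn

class AddNorm(nn.Module):
    """Residual connection followed by Layer Normalization: LayerNorm(x + sublayer(x))."""
    def __init__(self, d_model, dropout=0.1):
        super().__init__()
        self.norm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer_out):
        # x and sublayer_out have the same shape, so they can be added element-wise
        return self.norm(x + self.dropout(sublayer_out))

# Example: wrap a multi-head self-attention output with Add & Norm
x = torch.randn(2, 10, 512)                      # (batch, seq_len, d_model)
attn = nn.MultiheadAttention(512, 8, batch_first=True)
attn_out, _ = attn(x, x, x)                      # self-attention output matches x's shape
add_norm = AddNorm(512)
out = add_norm(x, attn_out)                      # LayerNorm(x + MultiHeadAttention(x))
print(out.shape)                                 # torch.Size([2, 10, 512])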

The Feed Forward layer is fairly simple: a two-layer fully connected network in which the first layer uses a ReLU activation and the second layer uses no activation. The corresponding formula is:

FFN(X) = max(0, X·W1 + b1)·W2 + b2

where X is the input; the output matrix of the Feed Forward layer has the same dimensions as X.
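As a quick illustration, this formula maps directly onto two nn.Linear layers with a ReLU in between. The dimensions below (d_model = 512, d_ff = 2048) are only example values, borrowed from the original Transformer paper:

import torch
import torch.nn as nn

class FeedForward(nn.Module):
    """Position-wise feed-forward network: max(0, X·W1 + b1)·W2 + b2."""
    def __init__(self, d_model=512, d_ff=2048):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)   # first layer, followed by ReLU
        self.fc2 = nn.Linear(d_ff, d_model)   # second layer, no activation

    def forward(self, x):
        return self.fc2(torch.relu(self.fc1(x)))

ffn = FeedForward()
x = torch.randn(2, 10, 512)      # (batch, seq_len, d_model)
print(ffn(x).shape)              # torch.Size([2, 10, 512]) -- same shape as the input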
With the Multi-Head Attention, Feed Forward, and Add & Norm layers described above, we can build one Encoder block. An Encoder block takes an input matrix X (n×d) and outputs a matrix O (n×d); stacking multiple Encoder blocks forms the Encoder.
The input to the first Encoder block is the matrix of word representation vectors for the sentence; each subsequent Encoder block takes the output of the previous one as its input. The output matrix of the last Encoder block is the encoding information matrix C, which is later used by the Decoder.
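For comparison, PyTorch's built-in nn.TransformerEncoderLayer and nn.TransformerEncoder implement exactly this kind of stacking. The sketch below (with arbitrary example hyperparameters) simply confirms that the output keeps the same (n, d) shape as the input:

import torch
import torch.nn as nn

d_model, n_heads, num_layers = 512, 8, 6
layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=n_heads, batch_first=True)
encoder = nn.TransformerEncoder(layer, num_layers=num_layers)

x = torch.randn(1, 20, d_model)   # word-representation matrix X: (batch, n, d)
c = encoder(x)                    # encoding matrix C, later consumed by the Decoder
print(c.shape)                    # torch.Size([1, 20, 512]) -- same (n, d) as the input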

Below is a complete PyTorch code example of a Transformer encoder, including the training and testing process, and converting the model's output back into a string sequence.
import torch
import torch.nn as nn
import torch.optim as optim
import math
# Positional encoding layer
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)  # even dimensions use sine
        pe[:, 1::2] = torch.cos(position * div_term)  # odd dimensions use cosine
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (batch, seq_len, d_model); add the first seq_len rows of the encoding table
        x = x + self.pe[:x.size(1), :]
        return x
# Multi-head self-attention
class MultiHeadAttention(nn.Module):
    def __init__(self, embed_size, heads):
        super(MultiHeadAttention, self).__init__()
        self.embed_size = embed_size
        self.heads = heads
        self.head_dim = embed_size // heads
        assert (
            self.head_dim * heads == embed_size
        ), "Embedding size needs to be divisible by heads"
        self.values = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.keys = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.queries = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.fc_out = nn.Linear(heads * self.head_dim, embed_size)

    def forward(self, values, keys, query, mask):
        N = query.shape[0]
        value_len, key_len, query_len = values.shape[1], keys.shape[1], query.shape[1]
        # Split the embedding into `heads` pieces
        values = values.reshape(N, value_len, self.heads, self.head_dim)
        keys = keys.reshape(N, key_len, self.heads, self.head_dim)
        query = query.reshape(N, query_len, self.heads, self.head_dim)
        values = self.values(values)
        keys = self.keys(keys)
        queries = self.queries(query)
        # Attention scores: (N, heads, query_len, key_len)
        energy = torch.einsum("nqhd,nkhd->nhqk", [queries, keys])
        if mask is not None:
            energy = energy.masked_fill(mask == 0, float("-1e20"))
        attention = torch.softmax(energy / (self.embed_size ** (1 / 2)), dim=3)
        # Weighted sum of the values, then merge the heads back together
        out = torch.einsum("nhql,nlhd->nqhd", [attention, values]).reshape(
            N, query_len, self.heads * self.head_dim
        )
        out = self.fc_out(out)
        return out
# Transformer encoder block: Multi-Head Attention + Add & Norm + Feed Forward + Add & Norm
class TransformerBlock(nn.Module):
    def __init__(self, embed_size, heads, dropout, forward_expansion):
        super(TransformerBlock, self).__init__()
        self.attention = MultiHeadAttention(embed_size, heads)
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)
        self.feed_forward = nn.Sequential(
            nn.Linear(embed_size, forward_expansion * embed_size),
            nn.ReLU(),
            nn.Linear(forward_expansion * embed_size, embed_size),
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, value, key, query, mask):
        attention = self.attention(value, key, query, mask)
        # Add & Norm around the attention sub-layer
        x = self.dropout(self.norm1(attention + query))
        forward = self.feed_forward(x)
        # Add & Norm around the feed-forward sub-layer
        out = self.dropout(self.norm2(forward + x))
        return out
# Transformer encoder: word embedding + positional encoding + stacked encoder blocks
class Encoder(nn.Module):
    def __init__(
        self,
        src_vocab_size,
        embed_size,
        num_layers,
        heads,
        device,
        forward_expansion,
        dropout,
        max_length,
    ):
        super(Encoder, self).__init__()
        self.embed_size = embed_size
        self.device = device
        self.word_embedding = nn.Embedding(src_vocab_size, embed_size)
        self.position_embedding = PositionalEncoding(embed_size, max_length)
        self.layers = nn.ModuleList(
            [
                TransformerBlock(
                    embed_size,
                    heads,
                    dropout=dropout,
                    forward_expansion=forward_expansion,
                )
                for _ in range(num_layers)
            ]
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        N, seq_length = x.shape
        embeddings = self.word_embedding(x)
        embeddings = self.dropout(embeddings)
        out = self.position_embedding(embeddings)
        # Each block performs self-attention, so value, key and query are all `out`
        for layer in self.layers:
            out = layer(out, out, out, mask)
        return out
# A simple Transformer model: encoder + linear projection to the vocabulary
class TransformerModel(nn.Module):
    def __init__(
        self,
        src_vocab_size,
        embed_size,
        num_layers,
        heads,
        device,
        forward_expansion,
        dropout,
        max_length,
    ):
        super(TransformerModel, self).__init__()
        self.encoder = Encoder(
            src_vocab_size,
            embed_size,
            num_layers,
            heads,
            device,
            forward_expansion,
            dropout,
            max_length,
        )
        self.fc_out = nn.Linear(embed_size, src_vocab_size)

    def forward(self, src, src_mask):
        enc_src = self.encoder(src, src_mask)
        out = self.fc_out(enc_src)
        return out
# Hyperparameters
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
src_vocab_size = 8  # vocabulary size (the toy vocabulary contains "我", "有", "一", "只", "猫", "爱", "吃", "苹果")
embed_size = 256
num_layers = 3
heads = 8
forward_expansion = 4
dropout = 0.1
max_length = 100
num_epochs = 100
# Build the model
model = TransformerModel(
    src_vocab_size,
    embed_size,
    num_layers,
    heads,
    device,
    forward_expansion,
    dropout,
    max_length
).to(device)
# Optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
# Training data
# Vocabulary:
# 0: "我", 1: "有", 2: "一", 3: "只", 4: "猫", 5: "爱", 6: "吃", 7: "苹果"
train_data = [
    {
        'src': torch.tensor([[0, 1, 2, 3]]).to(device),  # "我 有 一 只"
        'tgt': torch.tensor([[1, 2, 3, 4]]).to(device),  # "有 一 只 猫"
        'src_mask': torch.ones((1, 1, 4)).to(device)     # all-ones mask (nothing is masked out)
    },
    {
        'src': torch.tensor([[0, 5, 6]]).to(device),     # "我 爱 吃"
        'tgt': torch.tensor([[5, 6, 7]]).to(device),     # "爱 吃 苹果"
        'src_mask': torch.ones((1, 1, 3)).to(device)     # all-ones mask (nothing is masked out)
    }
]
# Training loop
def train_model(model, optimizer, criterion, train_data, num_epochs):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0
        for batch in train_data:
            src = batch['src']
            tgt = batch['tgt']
            src_mask = batch['src_mask']
            optimizer.zero_grad()
            output = model(src, src_mask)
            # Flatten (batch, seq_len, vocab) and (batch, seq_len) for cross-entropy
            loss = criterion(output.view(-1, output.shape[-1]), tgt.view(-1))
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch + 1}, Loss: {total_loss / len(train_data)}")

# Train the model
train_model(model, optimizer, criterion, train_data, num_epochs)
# Test function: convert the model's output back into a token sequence
def test_model(model, test_data, idx_to_word):
    model.eval()
    with torch.no_grad():
        for batch in test_data:
            src = batch['src'].to(device)
            src_mask = batch['src_mask'].to(device)
            output = model(src, src_mask)
            # Take the most likely word index at each position
            _, predicted_indices = torch.max(output, dim=-1)
            # Map indices back to words
            predicted_sentence = ' '.join([idx_to_word[idx.item()] for idx in predicted_indices[0]])
            print(f"Input sequence: {batch['input_sentence']}")
            print(f"Prediction: {predicted_sentence}\n")
# Test data
idx_to_word = {
    0: "我",
    1: "有",
    2: "一",
    3: "只",
    4: "猫",
    5: "爱",
    6: "吃",
    7: "苹果"
}
test_data = [
    {
        'src': torch.tensor([[0, 1, 2, 3]]).to(device),  # "我 有 一 只"
        'src_mask': torch.ones((1, 1, 4)).to(device),
        'input_sentence': "我有一只"
    },
    {
        'src': torch.tensor([[0, 5, 6]]).to(device),     # "我 爱 吃"
        'src_mask': torch.ones((1, 1, 3)).to(device),
        'input_sentence': "我爱吃"
    }
]
# Run the test
test_model(model, test_data, idx_to_word)
Sample output:
Epoch 1, Loss: 2.9269840717315674
...
Epoch 100, Loss: 0.0005955279484624043
Input sequence: 我有一只
Prediction: 有 一 只 猫
Input sequence: 我爱吃
Prediction: 爱 吃 苹果
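One caveat about the masks in this example: they are all ones, so nothing is ever masked out. If sequences of different lengths were batched together, a padding mask could be built roughly as sketched below; the pad_idx value of 8 is a hypothetical padding index that is not part of the toy vocabulary above.

import torch

def make_src_mask(src, pad_idx):
    # 1 for real tokens, 0 for padding; shape (batch, 1, 1, seq_len) so it
    # broadcasts against attention energies of shape (batch, heads, q_len, k_len)
    return (src != pad_idx).unsqueeze(1).unsqueeze(2)

# Hypothetical batch padded to length 4 with pad_idx = 8
pad_idx = 8
src = torch.tensor([
    [0, 1, 2, 3],       # "我 有 一 只"
    [0, 5, 6, pad_idx]  # "我 爱 吃" plus one padding token
])
print(make_src_mask(src, pad_idx).shape)  # torch.Size([2, 1, 1, 4])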