'''
# System --> Windows & Python3.10.0
# File ----> main.py
# Author --> Illusionna
# Create --> 2024/07/10 19:32:15
'''
# -*- Encoding: UTF-8 -*-
"""
Step 1: Check the basic information of the current GPU
>>> nvidia-smi
CUDA Version: 12.1
Step 2: Set up the environment that main.py depends on
>>> conda create -n NLP python==3.10.0
>>> conda activate NLP
>>> pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
Step 3: Activate and test the environment
>>> conda activate NLP
>>> python
>>> import torch
>>> print(torch.__version__)
2.3.1+cu121
>>> print(torch.cuda.is_available())
True
>>> exit(0)
Step 4: Run the simple Transformer inference demo.
>>> python main.py
"""
"""
"Attention is All You Need", 代码 pytorch 推荐两篇:
1. 哈佛大学 NLP 研究组
旧版: http://nlp.seas.harvard.edu/2018/04/03/attention.html
新版: https://nlp.seas.harvard.edu/annotated-transformer
2. 台湾小哥通俗代码
https://github.com/jadore801120/attention-is-all-you-need-pytorch
"""
import math
import copy
import torch
def debug(**kwargs) -> None:
"""
    Debug-print helper; its libraries are imported locally, only inside this function.
>>> debug(kwargs='2.718281828')
"""
# <S>---------------------------------------------------------------
import os
import sys
import random
import inspect
# ---------------------------------------------------------------<E>
    # Calling os.system('') on Windows enables ANSI escape sequences in the console, which the colored output below relies on.
    os.system('')
line = inspect.getframeinfo(sys._getframe(1))
file = os.path.relpath(line.filename, os.getcwd())
print('<S>---------------------------------------------------------------')
print(f'\033[3{random.randint(1, 6)}m[+debug] "{file}", line {line.lineno}')
for key, value in kwargs.items():
print('')
print(key, '=')
print(value, end='\n')
print('\033[0m', end='')
print('---------------------------------------------------------------<E>')
def clone(module: object, N: int) -> object:
"""
    Clone N identical copies of a module.
"""
return torch.nn.ModuleList([copy.deepcopy(module) for _ in range(0, N, 1)])
def subsequentMask(size: int) -> torch.Tensor:
"""
    Mask matrix illustration: https://s21.ax1x.com/2024/07/10/pkf52UH.png
    Build a mask that hides subsequent positions so the decoder's self-attention cannot
    "see" future tokens, preserving the causal order of sequence generation. Since
    self-attention is computed in parallel, this mask lets the decoder handle long
    sequences efficiently during training while keeping the correct generation order.
>>> subsequentMask(4)
>>> tensor(
[
[
[ True, False, False, False],
[ True, True, False, False],
[ True, True, True, False],
[ True, True, True, True]
]
]
)
"""
attention_shape = (1, size, size)
    # torch.triu keeps the upper-triangular part of a matrix, starting from the diagonal with offset 1.
subsequent_mask = torch.triu(torch.ones(attention_shape), diagonal=1).type(torch.uint8)
return subsequent_mask == 0
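# A hedged usage sketch (not part of the original code): during training, the causal mask
# above is usually combined with a target padding mask; here 0 is assumed to be the pad id.
# >>> tgt = torch.tensor([[1, 2, 3, 0]])
# >>> pad_mask = (tgt != 0).unsqueeze(-2)                 # shape (1, 1, 4)
# >>> (pad_mask & subsequentMask(tgt.size(-1))).shape     # broadcasts to (1, 4, 4)
# torch.Size([1, 4, 4])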
def scaledDotProductAttention(
Q: torch.Tensor,
K: torch.Tensor,
V: torch.Tensor,
attention_mask: torch.Tensor,
dropout: object
) -> tuple[torch.Tensor, torch.Tensor]:
"""
    Compute attention; return the weighted values and the attention weights. Formula: https://s21.ax1x.com/2024/07/10/pkfThG9.png
>>> scores = [
[0.1, 0.2, 0.3, 0.4, 0.5],
[0.6, 0.7, 0.8, 0.9, 1.0],
[1.1, 1.2, 1.3, 1.4, 1.5],
[1.6, 1.7, 1.8, 1.9, 2.0],
[2.1, 2.2, 2.3, 2.4, 2.5]
]
    # The mask may also be a boolean matrix; it behaves the same as a 0-1 matrix.
>>> mask = [
[1, 1, 1, 0, 0],
[1, 1, 1, 0, 0],
[1, 1, 1, 0, 0],
[1, 1, 1, 0, 0],
[1, 1, 1, 0, 0]
]
>>> scores.masked_fill --> [
[0.1, 0.2, 0.3, -INFINITY, -INFINITY],
[0.6, 0.7, 0.8, -INFINITY, -INFINITY],
[1.1, 1.2, 1.3, -INFINITY, -INFINITY],
[1.6, 1.7, 1.8, -INFINITY, -INFINITY],
[2.1, 2.2, 2.3, -INFINITY, -INFINITY]
]
"""
INFINITY = 1e9
d_k = Q.size(dim=-1)
    # Dividing by sqrt(d_k) is the "scale" step: it keeps softmax away from regions with
    # vanishing gradients, which would hurt training.
    scores = torch.matmul(Q, K.transpose(dim0=-1, dim1=-2)) / math.sqrt(d_k)
    if attention_mask is not None:
        # Masked positions are set to minus infinity so the softmax below maps them to (numerically) zero.
        scores = scores.masked_fill(attention_mask == 0, -INFINITY)
    # Attention weights as probabilities: how similar/important each K element is to each Q element (positively correlated).
    probability_attention = scores.softmax(dim=-1)
    # Drop a fraction of the attention weights.
    if dropout is not None:
        probability_attention = dropout(probability_attention)
    # Besides Scaled Dot-Product Attention there is also Additive Attention, but, as the paper notes,
    # dot-product attention is faster and more space-efficient in practice because it can use the
    # highly optimized matrix multiplication torch.matmul().
return (torch.matmul(probability_attention, V), probability_attention)
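# A minimal shape check for scaledDotProductAttention (a sketch; the tensor sizes below
# are illustrative assumptions, not from the original):
# >>> Q = K = V = torch.rand(1, 8, 10, 64)    # (batch, heads, tokens, d_k)
# >>> out, attn = scaledDotProductAttention(Q=Q, K=K, V=V, attention_mask=None, dropout=None)
# >>> out.shape, attn.shape
# (torch.Size([1, 8, 10, 64]), torch.Size([1, 8, 10, 10]))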
class MultiHeadAttention(torch.nn.Module):
"""
    Multi-head (cross-)attention; architecture: https://s21.ax1x.com/2024/07/10/pkhVUXt.png
"""
def __init__(self, *args, h: int, d_model: int, dropout: float=0.1, **kwargs) -> None:
super().__init__(*args, **kwargs)
        # Continue only if d_model is divisible by h; otherwise raise AssertionError.
assert d_model % h == 0
self.d_k = d_model // h
self.n_heads = h
        # Clone 4 linear layers: three for the Q/K/V projections and one for the final output projection.
self.linears = clone(torch.nn.Linear(d_model, d_model), 4)
self.dropout = torch.nn.Dropout(dropout)
def forward(
self, Q: torch.Tensor, K: torch.Tensor, V: torch.Tensor,
mask: torch.Tensor | None = None
) -> torch.Tensor:
if mask is not None:
            # Apply the same mask to all h heads.
mask = mask.unsqueeze(dim=1)
# RESIDUAL = Q
batch_size = Q.size(dim=0)
# <S>---------------------------------------------------------------
        # Loop three times: linear is drawn from self.linears and x from the list [Q, K, V].
[Q, K, V] = [
            # view() reshapes: batch_size is the batch dimension; -1 lets the sequence length be inferred automatically.
linear(x).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
for linear, x in zip(self.linears, [Q, K, V])
]
        '''
        # The list comprehension above is equivalent to:
        Q = self.linears[0](Q).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        K = self.linears[1](K).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        V = self.linears[2](V).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        '''
# ---------------------------------------------------------------<E>
(x, _) = scaledDotProductAttention(
Q = Q, K = K, V = V,
attention_mask = mask, dropout = self.dropout
)
x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.n_heads * self.d_k)
del Q; del K; del V
# <S>---------------------------------------------------------------
        # The last linear layer (index 3) produces the final output projection.
        '''
        # The Taiwanese implementation adds a residual connection at the return and then normalizes:
        norm = LayerNorm(features=x.size(dim=-1))
        return norm(self.linears[-1](x) + RESIDUAL)
        '''
return self.linears[-1](x)
# ---------------------------------------------------------------<E>
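# A quick shape sketch for MultiHeadAttention (sizes are assumptions for illustration):
# >>> mha = MultiHeadAttention(h=8, d_model=512)
# >>> x = torch.rand(2, 10, 512)              # (batch, tokens, d_model)
# >>> mha(x, x, x).shape                      # self-attention: Q = K = V = x
# torch.Size([2, 10, 512])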
class PositionWiseFeedForward(torch.nn.Module):
"""
    Position-wise fully connected feed-forward network; formula: https://s21.ax1x.com/2024/07/10/pkfvZr9.png
"""
def __init__(
self,
*args,
d_model: int,
d_ff: int,
dropout: float = 0.1,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.W1 = torch.nn.Linear(d_model, d_ff, bias=True)
self.W2 = torch.nn.Linear(d_ff, d_model, bias=True)
self.dropout = torch.nn.Dropout(dropout)
def forward(self, x: torch.Tensor) -> torch.Tensor:
        # torch.Tensor also has a relu() method that can be called directly, so the line below is equivalent:
# return self.W2(self.dropout(self.W1(x).relu()))
return self.W2(self.dropout(torch.relu(self.W1(x))))
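# The feed-forward network is applied to every position independently and preserves the
# (batch, tokens, d_model) shape; only the hidden layer widens to d_ff. A small sketch
# with assumed sizes:
# >>> ffn = PositionWiseFeedForward(d_model=512, d_ff=2048)
# >>> ffn(torch.rand(2, 10, 512)).shape
# torch.Size([2, 10, 512])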
class PositionalEncoding(torch.nn.Module):
"""
    Positional encoding; see this blog post for the derivation: https://wmathor.com/index.php/archives/1453
"""
def __init__(
self,
*args,
d_model: int,
dropout: float = 0.1,
max_tokens: int = 5000,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.dropout = torch.nn.Dropout(dropout)
pe = torch.zeros(max_tokens, d_model)
        # Build a max_tokens x 1 column vector of positions.
position = torch.arange(0, max_tokens, 1).unsqueeze(dim=1)
        '''
        Take every row, and within each row every second column starting at index 0.
        >>> matrix = torch.Tensor(
            [
                [1, 2, 3, 4, 5, 6, 7, 8, 9],
                [2, 2, 3, 4, 5, 6, 7, 8, 9],
                [3, 2, 3, 4, 5, 6, 7, 8, 9]
            ]
        )
        >>> matrix[:, 0::2] = torch.Tensor(
            [
                [1, 3, 5, 7, 9],
                [2, 3, 5, 7, 9],
                [3, 3, 5, 7, 9]
            ]
        )
        >>> matrix[:, 0:-1:2] = torch.Tensor(
            [
                [1, 3, 5, 7],
                [2, 3, 5, 7],
                [3, 3, 5, 7]
            ]
        )
        '''
divisor = torch.exp(torch.arange(0, d_model, 2) * -(math.log(1e4) / d_model))
pe[:, 0::2] = torch.sin(position * divisor)
pe[:, 1::2] = torch.cos(position * divisor)
'''
>>> pe = torch.Tensor(
[
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
]
)
>>> pe.unsqueeze(0) = torch.Tensor(
[
[
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
]
]
)
        # unsqueeze adds a size-1 batch dimension at index 0.
'''
pe = pe.unsqueeze(dim=0)
        # Register pe as a model buffer so it can be accessed in forward().
self.register_buffer('pe', pe)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
        1. Buffers are part of the model;
        2. The positional encoding is fixed and never updated during training, so registering pe as a buffer is appropriate;
        3. requires_grad is set to False, so backpropagation will not update the pe matrix;
        4. pe is saved and loaded as part of the model's state.
        """
        # Dropout regularization to reduce overfitting.
return self.dropout(x + self.pe[:, :x.size(1)].requires_grad_(requires_grad=False))
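# The registered buffer pe has shape (1, max_tokens, d_model) and is simply added (with
# broadcasting) to the token embeddings. A small sketch with assumed sizes:
# >>> pos = PositionalEncoding(d_model=512)
# >>> pos.pe.shape
# torch.Size([1, 5000, 512])
# >>> pos(torch.zeros(2, 10, 512)).shape
# torch.Size([2, 10, 512])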
class Embedding(torch.nn.Module):
"""
    Embedding layer: d_model is the output vector dimension, vocabulary is the vocabulary size (the range of discrete input ids).
"""
def __init__(self, *args, d_model: int, vocabulary: int, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.d_model = d_model
self.embedding = torch.nn.Embedding(vocabulary, d_model)
def forward(self, x: torch.Tensor) -> torch.Tensor:
        # Scale the embedding output so its numerical range stays stable and matches the positional encoding.
return self.embedding(x) * math.sqrt(self.d_model)
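# Embedding maps integer token ids to d_model-dimensional vectors scaled by sqrt(d_model).
# Sketch with assumed sizes:
# >>> emb = Embedding(d_model=512, vocabulary=11)
# >>> emb(torch.tensor([[1, 2, 3]])).shape
# torch.Size([1, 3, 512])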
class LayerNorm(torch.nn.Module):
"""
    Layer normalization (LN); formula: https://s21.ax1x.com/2024/07/09/pkfrUN8.png
"""
def __init__(self, *args, features: int, eps: float=1e-6, **kwargs) -> None:
        # features is the size of the normalized feature dimension.
        super().__init__(*args, **kwargs)
        # Learnable parameter (updated by backpropagation during training); gamma starts at 1, i.e. no scaling.
        self.gamma = torch.nn.Parameter(torch.ones(features))
        # Learnable parameter; beta starts at 0, i.e. no shift.
self.beta = torch.nn.Parameter(torch.zeros(features))
self.epsilon = eps
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
        Compute y = LN(x).
        """
        # keepdim=True keeps the reduced dimension so the result broadcasts against x.
        average = x.mean(dim=-1, keepdim=True)
        standard = x.std(dim=-1, keepdim=True)
        return self.gamma * (x - average) / (standard + self.epsilon) + self.beta
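# After LayerNorm (with the initial gamma=1, beta=0), each position's features have
# roughly zero mean and unit standard deviation along the last dimension. A shape sketch
# with assumed sizes:
# >>> ln = LayerNorm(features=512)
# >>> ln(torch.rand(2, 10, 512)).shape
# torch.Size([2, 10, 512])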
class SublayerConnection(torch.nn.Module):
"""
    Sublayer block: residual connection; diagram: https://s21.ax1x.com/2024/07/09/pkfc0I0.png
    The residual connection keeps gradients from vanishing during backpropagation, even through long chains of multiplied derivatives.
"""
def __init__(self, *args, size: int, dropout: float, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.norm = LayerNorm(features=size)
        # Dropout (randomly dropping units) illustration: https://s21.ax1x.com/2024/07/09/pkfc2L9.png
self.dropout = torch.nn.Dropout(dropout)
    def forward(self, x: torch.Tensor, sublayer: object) -> torch.Tensor:
        """
        Add the unchanged input x to the output of a sublayer function: this is the core idea of
        residual networks and avoids vanishing gradients (the derivative gains a constant term).
        """
return x + self.dropout(sublayer(self.norm(x)))
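# Note that the norm is applied to the sublayer input (pre-norm), a simplification also used
# by the Annotated Transformer; the original paper normalizes after the residual sum instead.
# A tiny sketch (the lambda stands in for any sublayer; sizes are assumptions):
# >>> sub = SublayerConnection(size=512, dropout=0.1)
# >>> sub(torch.rand(2, 10, 512), lambda t: t * 2).shape
# torch.Size([2, 10, 512])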
class EncoderLayer(torch.nn.Module):
"""
    Encoder layer: built from self-attention and the position-wise feed-forward network; several layers are stacked into the encoder.
"""
def __init__(
self,
*args,
size: int,
self_attention: object,
feed_forward_network: object,
dropout: float,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.self_attention = self_attention
self.feed_forward_network = feed_forward_network
        # Clone 2 sublayer connections: each encoder layer contains two sublayers.
self.sublayer = clone(SublayerConnection(size=size, dropout=dropout), 2)
self.size = size
def forward(self, x: torch.Tensor, mask: torch.Tensor) -> torch.Tensor:
        # The same x is fed in as query, key and value for self-attention.
        # The mask marks padded elements so they are ignored when computing self-attention weights.
        x = self.sublayer[0](x, lambda x: self.self_attention(x, x, x, mask))
        # Feed the output of the first sublayer into the second sublayer for the FFN computation.
return self.sublayer[1](x, self.feed_forward_network)
class DecoderLayer(torch.nn.Module):
"""
    Decoder layer: built from self-attention, (cross-)attention and a feed-forward network; several layers are stacked into the decoder.
    1. Compared with an encoder layer, a decoder layer has one extra (cross-)attention sublayer, whose extra input is memory, the output of the encoder stack
    2. Self-attention and cross-attention use the same mechanism; only their query, key and value inputs differ
    3. The cross-attention query comes from the layer below, i.e. the output of the self-attention sublayer
    4. Its key and value come from memory, the output of the encoder stack's last layer
    5. Self-attention takes query, key and value all from the layer below
"""
def __init__(
self,
*args,
size: int,
self_attention: object,
source_attention: object,
feed_forward_network: object,
dropout: float,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.size = size
self.self_attention = self_attention
self.source_attention = source_attention
self.feed_forward_network = feed_forward_network
        # Clone 3 sublayer connections.
self.sublayer = clone(SublayerConnection(size=size, dropout=dropout), 3)
def forward(
self,
x: torch.Tensor,
memory: torch.Tensor,
source_mask: object,
target_mask: object
) -> torch.Tensor:
        # Self-attention.
        x = self.sublayer[0](x, lambda x: self.self_attention(x, x, x, target_mask))
        # (Cross-)attention; memory here comes from the encoder output.
        x = self.sublayer[1](x, lambda x: self.source_attention(x, memory, memory, source_mask))
        # Feed-forward network.
return self.sublayer[2](x, self.feed_forward_network)
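# A shape sketch for one decoder layer (all sizes below are assumptions): the target
# sequence and the encoder memory may have different lengths; cross-attention queries
# come from the target side, while its keys and values come from memory.
# >>> attn = MultiHeadAttention(h=8, d_model=512)
# >>> layer = DecoderLayer(
# ...     size=512,
# ...     self_attention=copy.deepcopy(attn),
# ...     source_attention=copy.deepcopy(attn),
# ...     feed_forward_network=PositionWiseFeedForward(d_model=512, d_ff=2048),
# ...     dropout=0.1
# ... )
# >>> x, memory = torch.rand(1, 5, 512), torch.rand(1, 7, 512)
# >>> layer(x, memory, None, subsequentMask(5)).shape
# torch.Size([1, 5, 512])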
class Encoder(torch.nn.Module):
"""
    Encoder stack: stacked encoder layers; "Attention Is All You Need" uses six.
"""
def __init__(self, *args, layer: object, N: int, **kwargs) -> None:
super().__init__(*args, **kwargs)
        # Clone N encoder layers and stack them.
self.layers = clone(layer, N)
self.norm = LayerNorm(features=layer.size)
def forward(self, x: torch.Tensor, mask: torch.Tensor) -> torch.Tensor:
"""
        Pass the input (and mask) through each layer of the stack in turn.
        """
        # Every layer of the stack processes the input in sequence.
        for layer in self.layers:
            x = layer(x, mask)
        # Apply one final LN layer before returning the output.
return self.norm(x)
class Decoder(torch.nn.Module):
"""
    Decoder stack: stacked decoder layers; it turns memory, the encoder stack's representation of the input sequence, into the final output representation.
"""
def __init__(self, *args, layer: object, N: int, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.layers = clone(layer, N)
self.norm = LayerNorm(features=layer.size)
def forward(
self,
x: torch.Tensor,
memory: torch.Tensor,
source_mask: object,
target_mask: object
) -> torch.Tensor:
"""
        Pass the input through each layer with both the source and target masks.
"""
for layer in self.layers:
x = layer(x, memory, source_mask, target_mask)
return self.norm(x)
class Generator(torch.nn.Module):
"""
    Generator: the standard Linear + (log-)Softmax generation step.
"""
def __init__(self, *args, d_model: int, vocabulary: int, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.projection = torch.nn.Linear(d_model, vocabulary)
def forward(self, x: torch.Tensor) -> torch.Tensor:
return torch.log_softmax(self.projection(x), dim=-1)
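# Generator projects a d_model vector to log-probabilities over the target vocabulary.
# Sketch with assumed sizes:
# >>> gen = Generator(d_model=512, vocabulary=11)
# >>> gen(torch.rand(1, 512)).shape
# torch.Size([1, 11])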
class EncoderDecoder(torch.nn.Module):
"""
    A standard encoder-decoder architecture, the basis for future modified variants.
"""
def __init__(
self,
*args,
encoder: object,
decoder: object,
source_embedding: object,
target_embedding: object,
generator: object,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.encoder = encoder
self.decoder = decoder
self.src_embed = source_embedding
self.tgt_embed = target_embedding
self.generator = generator
def encode(self, src: torch.Tensor, src_mask: torch.Tensor) -> torch.Tensor:
return self.encoder(self.src_embed(src), src_mask)
def decode(
self,
memory: torch.Tensor,
src_mask: torch.Tensor,
tgt: torch.Tensor,
tgt_mask: torch.Tensor
) -> torch.Tensor:
return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)
def forward(
self,
src: torch.Tensor,
tgt: torch.Tensor,
src_mask: torch.Tensor,
tgt_mask: torch.Tensor
) -> torch.Tensor:
return self.decode(self.encode(src, src_mask), src_mask, tgt, tgt_mask)
class Transformer:
"""
    The complete Transformer model; architecture: https://s21.ax1x.com/2024/07/10/pkhVMm6.png
    The class acts as a factory: __new__ assembles and returns an EncoderDecoder instance.
"""
def __new__(
cls,
source_vocabulary: int,
target_vocabulary: int,
N: int = 6,
d_model: int = 512,
d_ff: int = 512,
n_heads: int = 8,
dropout: float = 0.1
) -> object:
attn = MultiHeadAttention(
h = n_heads,
d_model = d_model,
dropout = dropout
)
        ffn = PositionWiseFeedForward(
d_model = d_model,
d_ff = d_ff,
dropout = dropout
)
pos = PositionalEncoding(
d_model = d_model,
dropout = dropout,
max_tokens = 5000
)
model = EncoderDecoder(
encoder = Encoder(
layer = EncoderLayer(
size = d_model,
self_attention = copy.deepcopy(attn),
feed_forward_network = copy.deepcopy(ffn),
dropout = dropout
),
N = N
),
decoder = Decoder(
layer = DecoderLayer(
size = d_model,
self_attention = copy.deepcopy(attn),
source_attention = copy.deepcopy(attn),
feed_forward_network = copy.deepcopy(ffn),
dropout = dropout
),
N = N
),
source_embedding = torch.nn.Sequential(
Embedding(d_model=d_model, vocabulary=source_vocabulary),
copy.deepcopy(pos)
),
target_embedding = torch.nn.Sequential(
Embedding(d_model=d_model, vocabulary=target_vocabulary),
copy.deepcopy(pos)
),
generator = Generator(d_model=d_model, vocabulary=target_vocabulary)
)
        # Initialize parameters with Glorot / fan_avg (Xavier uniform).
for param in model.parameters():
if param.dim() > 1:
torch.nn.init.xavier_uniform_(param)
return model
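# A small construction sketch (vocabulary sizes and sequence length are assumptions):
# >>> model = Transformer(source_vocabulary=11, target_vocabulary=11, N=2)
# >>> src = torch.tensor([[1, 2, 3]])
# >>> src_mask = torch.ones(1, 1, 3)
# >>> model.encode(src, src_mask).shape
# torch.Size([1, 3, 512])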
def demoInference(epoch: int) -> None:
"""
    A simple inference demo with an untrained model.
"""
model = Transformer(source_vocabulary=11, target_vocabulary=11, N=1)
model.eval()
src = torch.tensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]], dtype=torch.long)
src_mask = torch.ones(1, 1, 10)
memory = model.encode(src, src_mask)
prediction = torch.zeros(1, 1).type_as(src)
for _ in range(0, 9, 1):
output = model.decode(
memory = memory,
src_mask = src_mask,
tgt = prediction,
tgt_mask = subsequentMask(prediction.size(1)).type_as(src.data)
)
probability = model.generator.forward(output[:, -1])
_, next_word = torch.max(probability, dim=1)
next_word = next_word.data[0]
prediction = torch.cat(
tensors = [prediction, torch.empty(1, 1).type_as(src.data).fill_(next_word)],
dim = 1
)
debug(epoch=epoch, ExampleUntrainedModel=prediction)
if __name__ == '__main__':
for idx in range(0, 10, 1):
demoInference(epoch = idx + 1)
# GNU GPLv3 project by Illusionna: orzzz.net