数据集的介绍
花分类数据集:
百度云链接下载: https://pan.baidu.com/s/1QLCTA4sXnQAw_yvxPj9szg
提取码:58p0
这篇博客的完整代码:
下载好之后,解压到flower_data文件夹下,此时flower_data\flower_photos下就是放的我们的数据
集,我们看一下原始的数据是什么样子的:
分类类别:共包含 5 类花卉,对应 5 个文件夹: daisy(雏菊) dandelion(蒲公英) roses(玫
瑰) sunflowers(向日葵) tulips(郁金香)
跑过一些项目的应该都有印象,比如YOLO等,他们的数据集的放置是有要求的一般情况下都是分
成两个,一个是train文件夹,train文件夹下是各种分类的文件夹(每个文件夹的名字是类报名)。
另外一个是val文件夹,val文件夹下是各种分类的文件夹(每个文件夹的名字是类报名)。一般是
按照8:2的比例去分这两个数据集的。这里的话可以用AI写代码整理,但是别忘记了检查一下。
整理好之后:
训练集的路径:D:\vscode\shenduxvexishizhan\CNN\flower_data\train
验证集的路径是:D:\vscode\shenduxvexishizhan\CNN\flower_data\val
小白代码书写
import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader from torchvision import datasets, transforms import os import sys # 引入 tqdm 用于进度条显示 from tqdm import tqdm # 设置设备 device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') print(f"使用设备: {device}") # 数据路径(假设路径变量已正确设置) train_path = r"flower_data/train" val_path = r"flower_data/val" # ========================================================= # 保持用户原有的 Data Transform 设定 # ========================================================= data_transform = { "train": transforms.Compose([ transforms.RandomResizedCrop(224), transforms.RandomHorizontalFlip(), transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) ]), "val": transforms.Compose([ transforms.Resize((224, 224)), transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) ]) } # 加载数据 train_dataset = datasets.ImageFolder(train_path, transform=data_transform["train"]) val_dataset = datasets.ImageFolder(val_path, transform=data_transform["val"]) train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True) val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False) print(f"训练集大小: {len(train_dataset)}") print(f"验证集大小: {len(val_dataset)}") print(f"类别: {train_dataset.classes}") # 定义AlexNet模型 (结构与用户提供的一致) class AlexNet(nn.Module): def __init__(self, num_classes=5): super(AlexNet, self).__init__() self.features = nn.Sequential( nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=3, stride=2), nn.Conv2d(64, 192, kernel_size=5, padding=2), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=3, stride=2), nn.Conv2d(192, 384, kernel_size=3, padding=1), nn.ReLU(inplace=True), nn.Conv2d(384, 256, kernel_size=3, padding=1), nn.ReLU(inplace=True), nn.Conv2d(256, 256, kernel_size=3, padding=1), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=3, stride=2), ) self.classifier = nn.Sequential( nn.Dropout(), nn.Linear(256 * 6 * 6, 4096), nn.ReLU(inplace=True), nn.Dropout(), nn.Linear(4096, 4096), nn.ReLU(inplace=True), nn.Linear(4096, num_classes), ) def forward(self, x): x = self.features(x) x = x.view(x.size(0), -1) x = self.classifier(x) return x # 初始化模型 model = AlexNet(num_classes=5).to(device) criterion = nn.CrossEntropyLoss() # 学习率保持为 0.0002 optimizer = optim.Adam(model.parameters(), lr=0.0002) # ========================================================= # 引入学习率调度器 (Learning Rate Scheduler) # ========================================================= # 每 5 个 epoch 学习率衰减为原来的 0.5 倍 scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5) # 训练函数 def train_epoch(): model.train() total_loss = 0 correct = 0 total = 0 train_bar = tqdm(train_loader, file=sys.stdout, desc="Training") for images, labels in train_bar: images, labels = images.to(device), labels.to(device) optimizer.zero_grad() outputs = model(images) loss = criterion(outputs, labels) loss.backward() optimizer.step() total_loss += loss.item() _, predicted = torch.max(outputs.data, 1) total += labels.size(0) correct += (predicted == labels).sum().item() # 更新进度条 train_bar.set_postfix({'loss': loss.item(), 'acc': correct / total}) avg_loss = total_loss / len(train_loader) accuracy = correct / total return avg_loss, accuracy # 验证函数 (保持不变) def validate(): model.eval() correct = 0 total = 0 val_bar = tqdm(val_loader, file=sys.stdout, desc="Validating") with torch.no_grad(): for images, labels in val_bar: images, labels = images.to(device), labels.to(device) outputs = model(images) _, predicted = torch.max(outputs.data, 1) total += labels.size(0) correct += (predicted == labels).sum().item() val_bar.set_postfix({'acc': correct / total}) accuracy = correct / total return accuracy # 训练模型 num_epochs = 50 # 增加到 50 个 Epoch,以解决欠拟合 print("\n开始训练...\n") # ========================================================= # 引入早停 (Early Stopping) 逻辑 # ========================================================= best_acc = 0.0 patience = 10 # 容忍验证准确率不提升的 Epochs 数量 patience_counter = 0 save_path = r"alexnet_flower_optimized.pth" for epoch in range(num_epochs): train_loss, train_acc = train_epoch() val_acc = validate() # 学习率调度器在每个 epoch 结束时调用 scheduler.step() print(f"\nEpoch [{epoch+1}/{num_epochs}] | Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f} | Val Acc: {val_acc:.4f}") # 早停逻辑 if val_acc > best_acc: best_acc = val_acc torch.save(model.state_dict(), save_path) patience_counter = 0 print(f"🚀 Best model saved! New best Val Acc: {best_acc:.4f}") else: patience_counter += 1 print(f"Validation Acc did not improve. Patience: {patience_counter}/{patience}") if patience_counter >= patience: print(f"🛑 Early stopping triggered after {patience} epochs without improvement.") break # 提前结束训练循环 print('\nFinished Training') print(f"最终最佳验证准确率: {best_acc:.4f}") # (predict_image 函数保持不变)
运行结果:
这里也分享一下啊,为啥当我们自己写代码的时候很容易出现过拟合和欠拟合的现象:
(1)一开始我对训练集和测试集都使用了相同的处理方式,出现了过拟合的问题
(2)对训练集使用了很复杂的数据增强的方式,出现了欠拟合的问题
这两种的现象出现的原因都很有可能是数据处理部分出现的问题,如果一开始独立写代码的话,建
议参考别人成熟代码中的数据预处理部分的代码。
打怪升级路线
下面我们将会从下面几个部分来做这个项目:
(1)网络结构模块
(2)数据集读取模块
(3)训练文件模块
(4)测试文件模块
(5)辅助函数模块
编
🌻 项目模块划分与功能概述
📂 configs文件夹
Alexnet.yaml
这个结构模仿了 YOLOv5 的配置风格,清晰地划分了数据、模型和训练的超参数。
📂 models文件夹
网络结构文件(model文件)
models/vit.py,定义Alexnet的全部组件和前向传播逻辑。
models/common.py ,里面可以放一些通用的模块
📂utils文件夹
utils/datasets.py 数据集加载
utils/general.py 通用工具函数
utils/metrics.py 评估指标
utils/torch_utils.py PyTorch工具
📂runs文件夹
train/ 存放训练结果
val/ 存放验证结果
📂weights 预训练权重
🚀train.py 网络训练
🔍 val.py 加载训练好的权重,对单个图像或整个测试集进行推理和评估。
🔍 predict.py 预测脚本
configs文件夹
这个结构模仿了 YOLOv5 的配置风格,清晰地划分了数据、模型和训练的超参数。使用 YAML 配
置文件进行深度学习项目管理具有显著的优势,它实现了代码逻辑与配置参数的清晰分离,是构建
专业、灵活且可复现项目的基础 。
flower.yaml
里面存放的内容:
📁 数据路径 (Data Paths):这一块告诉程序去哪里找训练和验证用的图片。
🌸 类别信息 (Class Information):
nc: 5 (Number of Classes):我们总共要识别五种不同的花。这是模型输出层的神经元数量。names: [...] 这五种花的名字分别是:雏菊、蒲公英、玫瑰、向日葵和郁金香。程序会把模型的输
出索引 (0, 1, 2, 3, 4) 映射到这些名字上。
⚙️ 训练超参数 (Training Hyperparameters)
epochs: 100 总共要重复训练多少轮,
batch_size: 16:模型一次性看多少张图片。
img_size: 224:所有输入图片的统一尺寸。
lr0: 0.0001,模型学习的速度/步长。 这是优化器(Adam)开始时的初始学习率。
weight_decay: 0.0001 给模型施加的“偷懒”惩罚。 这是一种正则化技术(L2 正则化),目的是
防止模型权重变得太大而导致过拟合。它通过在损失函数上增加一个小的惩罚项来实现。
🧠 优化器和调度器 (Optimizer and Scheduler)
optimizer: 'Adam' 选择模型“大脑”的学习方式。 Adam 是一种流行的梯度下降算法,它能自适应
地调整每个参数的学习率,通常比传统的 SGD 效果好。
scheduler: 'CosineAnnealing':大白话: 学习率的调整策略。
CosineAnnealing 是一种平滑的学习率衰减方法,它会让学习率随着 Epoch 像余弦波一样先缓慢
下降,然后加速下降,最后又缓慢下降。
step_size: 10 / gamma: 0.1 这是针对 StepLR 策略的设置。 如果使用 StepLR,这意味着:每训
练 10 轮 (step_size),学习率就变成原来的 0.1倍 (gamma)。
🧱 模型参数 (Model Parameters)
dropout: 0.5这就是 Dropout正则化,意味着在每次前向传播时,全连接层中 50 的神经元会被随
机关闭。这是防止过拟合的有效手段 。
💻 设备和路径 (Device and Paths)
device: 'cuda:0':使用哪一个处理器。 告诉程序优先使用第一张 NVIDIA 显卡 (GPU) 进行训练,
因为 GPU 比 CPU 快得多。
workers: 4 数据加载工人数量。 告诉程序使用 4 个 CPU 核心来并行加载和预处理图片,以确保
GPU 不会因为等待数据而闲置。
save_dir: 'runs/train':训练结果的存储位置。 模型的权重文件、训练日志、配置备份等所有结果
都将保存在这个文件夹里。
# 花卉分类配置文件 # 数据路径 train: 'flower_data/train' val: 'flower_data/val' # 类别信息 nc: 5 names: ['daisy', 'dandelion', 'roses', 'sunflowers', 'tulips'] # 训练超参数 epochs: 100 batch_size: 16 img_size: 224 lr0: 0.0001 weight_decay: 0.0001 # 优化器和调度器 optimizer: 'Adam' # SGD, Adam scheduler: 'CosineAnnealing' # StepLR, CosineAnnealing step_size: 10 gamma: 0.1 # 模型参数 dropout: 0.5 # 设备 device: 'cuda:0' workers: 4 # 保存路径 save_dir: 'runs/train'
models文件夹
网络结构文件(model文件)
models/Alexnet_model.py,定义 网络的全部组件和前向传播逻辑。这个里
面主要存放的网络的结构部分。
"""AlexNet模型""" import torch.nn as nn from models.common import ConvBNReLU, LinearBNReLU class AlexNet(nn.Module): def __init__(self, num_classes=5, dropout=0.5): super().__init__() # 特征提取 self.features = nn.Sequential( ConvBNReLU(3, 96, 11, 4, 2), nn.MaxPool2d(3, 2), ConvBNReLU(96, 256, 5, 1, 2), nn.MaxPool2d(3, 2), ConvBNReLU(256, 384, 3, 1, 1), ConvBNReLU(384, 384, 3, 1, 1), ConvBNReLU(384, 256, 3, 1, 1), nn.MaxPool2d(3, 2), ) # 分类器 self.classifier = nn.Sequential( nn.Dropout(dropout), LinearBNReLU(256 * 6 * 6, 2048), nn.Dropout(dropout), LinearBNReLU(2048, 1024), nn.Linear(1024, num_classes) ) self._init_weights() def _init_weights(self): for m in self.modules(): if isinstance(m, nn.Conv2d): nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu') elif isinstance(m, (nn.BatchNorm2d, nn.BatchNorm1d)): nn.init.constant_(m.weight, 1) nn.init.constant_(m.bias, 0) elif isinstance(m, nn.Linear): nn.init.normal_(m.weight, 0, 0.01) nn.init.constant_(m.bias, 0) def forward(self, x): x = self.features(x) x = x.view(x.size(0), -1) x = self.classifier(x) return x
models/common.py,定义网络的全部组件,方便在Alexnet_model.py中直接调用。
"""通用模块""" import torch.nn as nn class ConvBNReLU(nn.Module): """卷积 + BN + ReLU""" def __init__(self, in_c, out_c, k=3, s=1, p=1): super().__init__() self.conv = nn.Conv2d(in_c, out_c, k, s, p, bias=False) self.bn = nn.BatchNorm2d(out_c) self.relu = nn.ReLU(inplace=True) def forward(self, x): return self.relu(self.bn(self.conv(x))) class LinearBNReLU(nn.Module): """全连接 + BN + ReLU""" def __init__(self, in_f, out_f): super().__init__() self.fc = nn.Linear(in_f, out_f) self.bn = nn.BatchNorm1d(out_f) self.relu = nn.ReLU(inplace=True) def forward(self, x): return self.relu(self.bn(self.fc(x)))
utils文件夹
utils/datasets.py 数据集加载
"""数据集加载""" from torch.utils.data import DataLoader from torchvision import transforms, datasets def create_dataloader(path, img_size=224, batch_size=16, shuffle=True, workers=4, augment=False): """创建数据加载器""" if augment: transform = transforms.Compose([ transforms.Resize((img_size + 32, img_size + 32)), transforms.RandomCrop(img_size), transforms.RandomHorizontalFlip(), transforms.RandomRotation(15), transforms.ColorJitter(0.3, 0.3, 0.3, 0.1), transforms.ToTensor(), transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), transforms.RandomErasing(p=0.3) ]) else: transform = transforms.Compose([ transforms.Resize((img_size, img_size)), transforms.ToTensor(), transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ]) dataset = datasets.ImageFolder(path, transform=transform) loader = DataLoader(dataset, batch_size, shuffle, num_workers=workers, pin_memory=True) return loader, dataset
utils/general.py 通用工具函数
💻 load_yaml(path) 函数的作用
这个函数的作用就是读取一份训练项目的“说明书”或“配置清单”,并将其转换成 Python 能够操作的
字典 (Dictionary) 格式。
🚀 代码调用、输入与输出示例
为了演示这段代码的输入和输出,我们需要假设存在一个 YAML 配置文件,并需要导入 yaml 库。
import yaml import os from pathlib import Path # --- 待解释的函数 --- def load_yaml(path): """加载YAML配置""" # 注意:在实际运行中,需要确保 'yaml' 库已安装 (pip install pyyaml) with open(path, 'r', encoding='utf-8') as f: return yaml.safe_load(f) # --- 演示调用 --- # 1. 创建一个模拟的 YAML 文件(以便代码可以运行) yaml_content = """ project_name: flower_classification epochs: 50 learning_rate: 0.0001 data: train_path: 'data/train' val_path: 'data/val' """ config_path = 'temp_config.yaml' Path(config_path).write_text(yaml_content, encoding='utf-8') # 2. 调用函数 # 输入 (Input): 配置文件路径 print(f"输入路径: {config_path}") config_data = load_yaml(config_path) # 3. 查看输出 # 输出 (Output): 函数返回的 Python 字典 print("\n--- 函数输出 (Python 字典) ---") print(config_data) print("\n--- 输出类型 ---") print(type(config_data)) # 4. 演示如何使用输出数据 print("\n--- 访问输出数据 ---") print(f"项目名称: {config_data['project_name']}") print(f"训练轮数 (Epochs): {config_data['epochs']}") print(f"训练集路径: {config_data['data']['train_path']}") # 清理模拟文件 #os.remove(config_path)
💾 save_checkpoint(...) 函数的作用
这个函数的作用是为您的模型训练进度拍摄“快照” (Snapshot),并确保您能够找到性能最好的那
个版本。
import torch from pathlib import Path import os # --- 待解释的函数 --- def save_checkpoint(state, path, is_best=False): """保存检查点""" # 确保目录存在 Path(path).parent.mkdir(parents=True, exist_ok=True) # 保存当前的检查点 torch.save(state, path) if is_best: best_path = Path(path).parent / 'best.pt' torch.save(state, best_path) print(f'✅ Best model saved: {best_path}') # --- 函数结束 --- # --- 演示调用 --- # 1. 模拟要保存的状态 (State) mock_state = { 'epoch': 5, 'model': {'weight_01': torch.tensor([1.2, 0.8])}, 'optimizer': 'Adam_state_info' } # 2. 设置保存路径 save_dir = 'mock_runs/train_test' save_path_current = os.path.join(save_dir, 'ckpt_005.pt') best_path_expected = os.path.join(save_dir, 'best.pt') # 3. 第一次调用:保存普通快照 print("--- 第一次调用:普通保存 ---") save_checkpoint(mock_state, save_path_current, is_best=False) print(f"输入状态: epoch {mock_state['epoch']}") print(f"输出文件: {save_path_current}") print("==================================\n") # 4. 第二次调用:保存最佳快照 print("--- 第二次调用:保存最佳模型 ---") # 假设这是第 10 轮,并且是目前的最佳成绩 mock_state['epoch'] = 10 save_checkpoint(mock_state, os.path.join(save_dir, 'ckpt_010.pt'), is_best=True) print(f"输入状态: epoch {mock_state['epoch']}") print(f"输出文件: {os.path.join(save_dir, 'ckpt_010.pt')} 和 {best_path_expected}") # 5. 清理模拟文件 # import shutil # shutil.rmtree(save_dir) # 在实际测试后,可以使用此行清理创建的文件夹
🔄 load_checkpoint(...) 函数的作用(大白话解释)
这个函数的作用就是让一个中断的训练任务能够从上次停止的地方无缝接续,或者让一个训练好的
模型准备好进行预测。
🚀 代码调用、输入与输出示例
为了演示其作用,我们假设您已经保存了一个检查点文件 (best.pt),现在要加载它来继续训练。
ckpt = torch.load(path, map_location='cpu')从指定 path 加载保存的序列化文件(通常是 .pth
或 .ckpt 格式),返回一个字典(checkpoint 字典)。ckpt:接收加载后的字典,该字典通常包含
训练过程中保存的关键信息,如 model(模型权重)、optimizer(优化器状态)、epoch(训练轮
次)、loss(损失值)等。
model.load_state_dict(ckpt['model']),将预训练的权重参数加载到模型实例中。ckpt['model']:
检查点字典中键为 model 的值,对应模型的 state_dict(状态字典),包含模型所有可学习参数
(如卷积层权重、全连接层偏置等)。
让当前的 model 实例拥有检查点中保存的权重,实现模型参数恢复(比如恢复到上次训练中断时
的状态,或加载预训练权重)。
如果判断检查点字典中有 optimizer 键,优化器的 load_state_dict 方法,恢复优化器的状态。
ckpt.get('epoch', 0):字典的 get 方法,安全获取 epoch 键的值: 若 ckpt 中存在 epoch 键,返
回对应的值(如 100,表示上次训练到第 100 轮); 若不存在 epoch 键,返回默认值 0。返回上
次训练中断的轮次,方便后续训练从该轮次继续。
import torch import torch.nn as nn import torch.optim as optim from pathlib import Path import os # 模拟一个简单的模型定义 class SimpleModel(nn.Module): def __init__(self): super().__init__() self.fc = nn.Linear(10, 1) def forward(self, x): return self.fc(x) # --- 待解释的函数 (假设已定义) --- def load_checkpoint(path, model, optimizer=None): """加载检查点""" ckpt = torch.load(path, map_location='cpu') model.load_state_dict(ckpt['model']) if optimizer and 'optimizer' in ckpt: optimizer.load_state_dict(ckpt['optimizer']) return ckpt.get('epoch', 0) # --- 函数结束 --- # --- 演示调用 --- # 1. 模拟一个检查点文件 (通常是 save_checkpoint 函数创建的) model_to_save = SimpleModel() optimizer_to_save = optim.Adam(model_to_save.parameters(), lr=0.001) mock_ckpt = { 'epoch': 50, 'model': model_to_save.state_dict(), 'optimizer': optimizer_to_save.state_dict() } ckpt_path = 'mock_runs/best.pt' Path(ckpt_path).parent.mkdir(parents=True, exist_ok=True) torch.save(mock_ckpt, ckpt_path) # 2. 初始化新的模型和优化器 (它们需要被加载) new_model = SimpleModel() new_optimizer = optim.Adam(new_model.parameters(), lr=0.01) # 注意:初始LR不同 print("--- 加载前状态 ---") # 打印新的模型和优化器的初始状态 (它们是随机的/初始化的) print(f"新模型参数的初始值 (部分): {list(new_model.parameters())[0].sum().item():.4f}") print(f"新优化器的初始LR: {new_optimizer.param_groups[0]['lr']:.4f}") print("==================================\n") # 3. 调用函数加载检查点 start_epoch = load_checkpoint(ckpt_path, new_model, new_optimizer) # 4. 查看加载后状态 (输出) print("--- 加载后状态 (函数输出) ---") # 模型的参数已经被检查点中的值覆盖 print(f"加载后模型参数的总和 (应与保存前的总和一致): {list(new_model.parameters())[0].sum().item():.4f}") # 优化器的LR已经被检查点中的值覆盖 (如果检查点中包含LR信息的话) print(f"加载后的优化器LR: {new_optimizer.param_groups[0]['lr']:.4f}") print(f"函数返回的下一轮起始 Epoch: {start_epoch}") # 5. 清理模拟文件 # os.remove(ckpt_path) # import shutil # shutil.rmtree('mock_runs')
--- 加载前状态 --- 新模型参数的初始值 (部分): 0.5234 # 随机值 新优化器的初始LR: 0.0100 ================================== --- 加载后状态 (函数输出) --- 加载后模型参数的总和 (应与保存前的总和一致): 0.3547 # 已经被检查点中的值覆盖 加载后的优化器LR: 0.0010 # 已经被检查点中的值覆盖 函数返回的下一轮起始 Epoch: 50
"""通用工具函数""" import yaml import torch from pathlib import Path def load_yaml(path): """加载YAML配置""" with open(path, 'r', encoding='utf-8') as f: return yaml.safe_load(f) def save_checkpoint(state, path, is_best=False): """保存检查点""" Path(path).parent.mkdir(parents=True, exist_ok=True) torch.save(state, path) if is_best: best_path = Path(path).parent / 'best.pt' torch.save(state, best_path) print(f'✅ Best model saved: {best_path}') def load_checkpoint(path, model, optimizer=None): """加载检查点""" ckpt = torch.load(path, map_location='cpu') model.load_state_dict(ckpt['model']) if optimizer and 'optimizer' in ckpt: optimizer.load_state_dict(ckpt['optimizer']) return ckpt.get('epoch', 0)
utils/metrics.py 评估指标
📊 Metrics 类的作用(大白话解释)
这个类的作用就是充当一个计分板。它在每一轮比赛(批次)中记录得分(预测和标签),直到整
个 Epoch结束,然后给出比赛的最终得分(各种指标)。
🚀 示例:在训练循环中的使用
from sklearn.metrics import accuracy_score, precision_recall_fscore_support import torch # ... Metrics 类的定义放在这里 ... # 初始化指标计算器 train_metrics = Metrics() # 模拟一个 Epoch 的训练循环 for epoch in range(num_epochs): train_metrics.reset() # ❶ 每个 Epoch 开始时重置计分板 # 模拟批次循环 for batch_id, (data, label) in enumerate(train_loader): # ... (模型前向传播、损失计算、反向传播) ... # 假设 model_outputs 是模型的输出 logits pred_labels = torch.argmax(model_outputs, dim=1) # ❷ 在每个批次结束后更新记录 train_metrics.update(pred_labels, label) # ❸ 在 Epoch 结束后计算指标 results = train_metrics.compute() print(f"Epoch {epoch+1} 训练准确率: {results['acc']:.4f}")
📏 AverageMeter 类的作用(大白话解释)
这个类的作用就是充当一个“在线平均值计算器”,它能够高效地在每次迭代(如每个训练批次)中
更新数值,并随时提供当前的平均值**,而不需要等到所有数据收集完毕。
在深度学习训练中,它最常用于计算:每个 Epoch的平均损失 (Average Loss) 或 每个 Epoch 的平
均准确率 (Average Accuracy)。
🚀 示例:在训练循环中的使用
# 初始化损失平均值计算器 loss_meter = AverageMeter() # 模拟一个 Epoch 的训练循环 for epoch in range(num_epochs): loss_meter.reset() # ❶ 每个 Epoch 开始时清零 for batch_id, (data, label) in enumerate(train_loader): # ... (模型前向传播,计算损失) ... current_loss = loss_function(outputs, label).item() # 假设损失是 0.15 batch_size = data.size(0) # 假设 Batch Size 是 32 # ❷ 在每个批次结束后更新损失计算器 loss_meter.update(current_loss, batch_size) # 随时可以查看当前状态 # print(f"Batch {batch_id}: 当前损失={loss_meter.val:.4f}, 累计平均损失={loss_meter.avg:.4f}") # ❸ 在 Epoch 结束时,输出最终的平均损失 print(f"Epoch {epoch+1} 最终平均损失: {loss_meter.avg:.4f}")
"""评估指标""" import numpy as np from sklearn.metrics import accuracy_score, precision_recall_fscore_support class Metrics: """指标计算器""" def __init__(self): self.reset() def reset(self): self.preds = [] self.labels = [] def update(self, pred, label): self.preds.extend(pred.cpu().numpy()) self.labels.extend(label.cpu().numpy()) def compute(self): acc = accuracy_score(self.labels, self.preds) p, r, f1, _ = precision_recall_fscore_support( self.labels, self.preds, average='macro', zero_division=0 ) return {'acc': acc, 'precision': p, 'recall': r, 'f1': f1} class AverageMeter: """平均值计算器""" def __init__(self): self.reset() def reset(self): self.val = 0 self.avg = 0 self.sum = 0 self.count = 0 def update(self, val, n=1): self.val = val self.sum += val * n self.count += n self.avg = self.sum / self.count
utils/torch_utils.py PyTorch工具
"""PyTorch工具""" import torch import torch.nn as nn def select_device(device=''): """选择设备""" if device and 'cuda' in device and torch.cuda.is_available(): return torch.device(device) return torch.device('cpu') def get_optimizer(model, name='Adam', lr=0.001, weight_decay=0.0001): """创建优化器""" if name == 'SGD': return torch.optim.SGD(model.parameters(), lr, momentum=0.9, weight_decay=weight_decay) elif name == 'Adam': return torch.optim.Adam(model.parameters(), lr, weight_decay=weight_decay) else: raise ValueError(f'Unknown optimizer: {name}') def get_scheduler(optimizer, name='CosineAnnealing', epochs=100, step_size=10, gamma=0.1): """创建学习率调度器""" if name == 'StepLR': return torch.optim.lr_scheduler.StepLR(optimizer, step_size, gamma) elif name == 'CosineAnnealing': return torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, epochs) return None
主文件夹
parser = argparse.ArgumentParser()
🚀 代码作用(大白话解释)
这段代码的作用就像是程序的“启动器”和“控制台”。它让您可以在不修改代码的情况下,通过在命
令行中输入不同的指令来控制程序的行为,例如告诉程序使用哪个配置文件、在哪块硬件上运行等
等。
| 对应的代码 | parser = argparse.ArgumentParser() |
| 大白话: | “我准备好接收你的指令了。” |
| 作用: | 创建了一个参数解析器对象 (parser),这个对象专门负责识别和处理您在命令行中输入的各种参数和选项。 |
| 参数定义 | 对应的作用 | 大白话解释 |
--cfg |
default='configs/flower.yaml', help='config file' |
配置文件指令。 允许您指定要使用哪个 YAML 配置文件来运行训练。默认使用 configs/flower.yaml。 |
--device |
default='', help='cuda device, i.e. 0 or cpu' |
硬件选择指令。 允许您指定程序应该在哪个硬件上运行,例如 cuda:0(使用 GPU)或 cpu(使用中央处理器)。 |
--save-dir |
default='', help='save directory' |
结果保存指令。 允许您指定训练日志、模型权重等结果应该保存到哪个文件夹。 |
| 对应的代码 | opt = parser.parse_args() |
| 大白话: | “把所有指令都收集起来。” |
| 作用: | 当您在命令行中按下 Enter 键后,这个方法就会运行:它读取您输入的所有参数(例如 --device cuda:0),并将它们打包成一个名为 opt 的对象。 |
| 对应的代码 | main(opt) |
| 大白话: | “开始执行程序主体。” |
| 作用: | 调用程序的主函数 (main),并将所有收集到的配置和指令 (opt) 传递给它。主函数随后会根据 opt 中包含的参数来运行训练或预测逻辑。 |
💡 命令行使用示例
假设这段代码保存为 train.py,您可以在命令行中这样启动程序
# 运行示例:使用CPU,并将结果保存到 my_exp/ python train.py --device cpu --save-dir my_exp/ # 运行示例:使用 GPU 0,并指定一个不同的配置文件 python train.py --cfg configs/my_custom.yaml --device cuda:0
train.py
"""训练脚本""" import argparse import torch import torch.nn as nn from pathlib import Path from tqdm import tqdm from models.AlexNet import AlexNet from utils.datasets import create_dataloader from utils.general import load_yaml, save_checkpoint from utils.metrics import Metrics, AverageMeter from utils.torch_utils import select_device, get_optimizer, get_scheduler def train_epoch(model, loader, criterion, optimizer, device): """训练一个epoch""" model.train() loss_meter = AverageMeter() metrics = Metrics() pbar = tqdm(loader, desc='Training') for imgs, labels in pbar: imgs, labels = imgs.to(device), labels.to(device) optimizer.zero_grad() outputs = model(imgs) loss = criterion(outputs, labels) loss.backward() optimizer.step() loss_meter.update(loss.item(), imgs.size(0)) metrics.update(outputs.argmax(1), labels) pbar.set_postfix({'loss': f'{loss_meter.avg:.4f}'}) results = metrics.compute() results['loss'] = loss_meter.avg return results def validate(model, loader, criterion, device): """验证""" model.eval() loss_meter = AverageMeter() metrics = Metrics() with torch.no_grad(): for imgs, labels in tqdm(loader, desc='Validating'): imgs, labels = imgs.to(device), labels.to(device) outputs = model(imgs) loss = criterion(outputs, labels) loss_meter.update(loss.item(), imgs.size(0)) metrics.update(outputs.argmax(1), labels) results = metrics.compute() results['loss'] = loss_meter.avg return results def main(opt): # 加载配置 cfg = load_yaml(opt.cfg) device = select_device(opt.device or cfg['device']) save_dir = Path(opt.save_dir or cfg['save_dir']) save_dir.mkdir(parents=True, exist_ok=True) print(f'🌻 Training on {device}') # 数据加载 train_loader, train_set = create_dataloader( cfg['train'], cfg['img_size'], cfg['batch_size'], True, cfg['workers'], augment=True ) val_loader, val_set = create_dataloader( cfg['val'], cfg['img_size'], cfg['batch_size'], False, cfg['workers'], augment=False ) print(f'Train: {len(train_set)}, Val: {len(val_set)}') # 模型 model = AlexNet(cfg['nc'], cfg['dropout']).to(device) criterion = nn.CrossEntropyLoss(label_smoothing=0.1) optimizer = get_optimizer(model, cfg['optimizer'], cfg['lr0'], cfg['weight_decay']) scheduler = get_scheduler(optimizer, cfg['scheduler'], cfg['epochs'], cfg.get('step_size', 10), cfg.get('gamma', 0.1)) # 训练 best_acc = 0 for epoch in range(1, cfg['epochs'] + 1): print(f'\n📊 Epoch {epoch}/{cfg["epochs"]}') train_results = train_epoch(model, train_loader, criterion, optimizer, device) val_results = validate(model, val_loader, criterion, device) if scheduler: scheduler.step() print(f'Train - Loss: {train_results["loss"]:.4f}, Acc: {train_results["acc"]:.4f}') print(f'Val - Loss: {val_results["loss"]:.4f}, Acc: {val_results["acc"]:.4f}') # 保存 is_best = val_results['acc'] > best_acc if is_best: best_acc = val_results['acc'] save_checkpoint({ 'epoch': epoch, 'model': model.state_dict(), 'optimizer': optimizer.state_dict(), 'best_acc': best_acc }, save_dir / 'last.pt', is_best) print(f'\n🎉 Training complete! Best Acc: {best_acc:.4f}') if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--cfg', type=str, default='configs/flower.yaml', help='config file') parser.add_argument('--device', type=str, default='', help='cuda device, i.e. 0 or cpu') parser.add_argument('--save-dir', type=str, default='', help='save directory') opt = parser.parse_args() main(opt)
val.py
"""验证脚本""" import argparse import torch from tqdm import tqdm from sklearn.metrics import confusion_matrix, classification_report from models.AlexNet import AlexNet from utils.datasets import create_dataloader from utils.general import load_yaml, load_checkpoint from utils.metrics import Metrics from utils.torch_utils import select_device def main(opt): # 加载配置 cfg = load_yaml(opt.cfg) device = select_device(opt.device or cfg['device']) print('🔍 Validation') # 加载模型 model = AlexNet(cfg['nc'], cfg['dropout']).to(device) load_checkpoint(opt.weights, model) model.eval() # 加载数据 val_loader, val_set = create_dataloader( cfg['val'], cfg['img_size'], batch_size=1, shuffle=False, workers=0, augment=False ) print(f'Dataset: {len(val_set)} images') # 验证 metrics = Metrics() all_preds, all_labels = [], [] with torch.no_grad(): for imgs, labels in tqdm(val_loader): imgs = imgs.to(device) outputs = model(imgs) preds = outputs.argmax(1) metrics.update(preds, labels) all_preds.extend(preds.cpu().numpy()) all_labels.extend(labels.numpy()) # 结果 results = metrics.compute() print(f'\n📊 Results:') print(f'Accuracy: {results["acc"]:.4f}') print(f'Precision: {results["precision"]:.4f}') print(f'Recall: {results["recall"]:.4f}') print(f'F1-Score: {results["f1"]:.4f}') # 混淆矩阵 print(f'\n📈 Confusion Matrix:') print(confusion_matrix(all_labels, all_preds)) # 分类报告 print(f'\n📋 Classification Report:') print(classification_report(all_labels, all_preds, target_names=cfg['names'], digits=4)) if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--cfg', type=str, default='configs/flower.yaml', help='config file') parser.add_argument('--weights', type=str, default='runs/train/best.pt', help='model weights') parser.add_argument('--device', type=str, default='', help='cuda device') opt = parser.parse_args() main(opt)
predict.py
"""预测脚本""" import argparse import torch from PIL import Image from torchvision import transforms from models.AlexNet import AlexNet from utils.general import load_yaml, load_checkpoint from utils.torch_utils import select_device def main(opt): # 加载配置 cfg = load_yaml(opt.cfg) device = select_device(opt.device or cfg['device']) print('🔮 Prediction') # 加载模型 model = AlexNet(cfg['nc'], cfg['dropout']).to(device) load_checkpoint(opt.weights, model) model.eval() # 图像预处理 transform = transforms.Compose([ transforms.Resize((cfg['img_size'], cfg['img_size'])), transforms.ToTensor(), transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ]) img = Image.open(opt.source).convert('RGB') img_tensor = transform(img).unsqueeze(0).to(device) # 预测 with torch.no_grad(): outputs = model(img_tensor) probs = torch.softmax(outputs, dim=1) pred_class = probs.argmax(1).item() confidence = probs[0][pred_class].item() # Top-K top_k_prob, top_k_idx = torch.topk(probs, opt.top_k) # 结果 print(f'\n🌸 Prediction:') print(f'Class: {cfg["names"][pred_class]}') print(f'Confidence: {confidence:.4f} ({confidence*100:.2f}%)') print(f'\n📊 Top-{opt.top_k}:') for i, (prob, idx) in enumerate(zip(top_k_prob[0], top_k_idx[0]), 1): name = cfg["names"][idx.item()] print(f'{i}. {name:12s} - {prob.item():.4f} ({prob.item()*100:.2f}%)') if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--source', type=str, required=True, help='image path') parser.add_argument('--cfg', type=str, default='configs/flower.yaml', help='config file') parser.add_argument('--weights', type=str, default='runs/train/best.pt', help='model weights') parser.add_argument('--device', type=str, default='', help='cuda device') parser.add_argument('--top-k', type=int, default=3, help='top k predictions') opt = parser.parse_args() main(opt)
运行结果:
我们可以发现使用这种方式,训练结果竟然比之前高出4个百分点,开心,因为在项目中我们考
虑的内容更多了。
train.py
val.py
predict.py
🚀 如何使用预测脚本
python predict.py --source /path/to/your/image/test_flower.jpg
待预测的图片路径为 /path/to/your/image/test_flower.jpg
这个的话,我图片上的路径是是:
/root/autodl-tmp/CNN/flower_data/flower_photos/daisy/8710109684_e2c5ef6aeb_n.jpg
所以我使用的命令是:
python predict.py --source /root/autodl-tmp/CNN/flower_data/flower_photos/daisy/8710109684_e2c5ef6aeb_n.jpg
预测结果:
这里我们可以看到预测的结果是正确的哦。
