一、整体流程

github代码地址

1. 数据集下载地址:https://www.kaggle.com/paultimothymooney/chest-xray-pneumonia/download

2. 数据集展示

 

案例主要流程:

第一步:加载预训练模型ResNet,该模型已在ImageNet上训练过。

第二步:冻结预训练模型中低层卷积层的参数(权重)

第三步:用可训练参数的多层替换分类层。

第四步:在训练集上训练分类层。

第五步:微调超参数,根据需要解冻更多层。

 

ResNet 网络结构图

 

 二、显示图片功能

# 1. Imports
import os

import matplotlib.pyplot as plt
import numpy as np
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.utils import make_grid


def img_show(inp, title=None):
    """Display a normalized (C, H, W) image tensor with matplotlib.

    Args:
        inp: image tensor produced by the `train`/`val` transforms
            (normalized with ImageNet mean/std).
        title: optional plot title (e.g. a list of class names).
    """
    plt.figure(figsize=(14, 3))
    # Convert to numpy and move channels last: (C, H, W) -> (H, W, C).
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean          # undo Normalize
    inp = np.clip(inp, 0, 1)        # keep values in a displayable range
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)
    plt.show()


def main():
    """Load the chest X-ray dataset and display one training batch."""
    # 3. Hyperparameters
    BATCH_SIZE = 8
    # BUG FIX: the valid PyTorch device string is "cuda", not "gpu".
    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # 4. Per-split image transforms (train adds random augmentation).
    data_transforms = {
        'train': transforms.Compose([
            transforms.Resize(300),
            transforms.RandomResizedCrop(300),   # random crop (augmentation)
            transforms.RandomHorizontalFlip(),
            transforms.CenterCrop(256),
            transforms.ToTensor(),               # to tensor
            transforms.Normalize([0.485, 0.456, 0.406],
                                 [0.229, 0.224, 0.225]),  # ImageNet stats
        ]),
        'val': transforms.Compose([
            transforms.Resize(300),
            transforms.CenterCrop(256),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406],
                                 [0.229, 0.224, 0.225]),
        ]),
    }

    # 5. Dataset handling
    # 5.1 Dataset root (expects train/ and val/ sub-folders, one dir per class).
    data_path = "D:/chest_xray/"
    # 5.2 Load the train/val splits.
    img_datasets = {
        x: datasets.ImageFolder(os.path.join(data_path, x), data_transforms[x])
        for x in ["train", "val"]
    }
    # 5.3 Wrap each split in a DataLoader.
    dataloaders = {
        x: DataLoader(img_datasets[x], shuffle=True, batch_size=BATCH_SIZE)
        for x in ["train", "val"]
    }
    # 5.4 Number of images per split.
    data_sizes = {x: len(img_datasets[x]) for x in ["train", "val"]}
    # 5.5 Class names: NORMAL vs PNEUMONIA.
    target_names = img_datasets['train'].classes

    # 6. Show one batch (BATCH_SIZE images).
    # 6.1 Fetch a single batch from the train loader.
    datas, targets = next(iter(dataloaders['train']))
    # 6.2 Tile the batch into one image grid.
    # BUG FIX: make_grid has no `norm` parameter; the intended kwarg is
    # `nrow` (images per row).
    out = make_grid(datas, nrow=4, padding=10)
    # 6.3 Display, titled with each image's class label.
    img_show(out, title=[target_names[x] for x in targets])


if __name__ == '__main__':
    main()

上面代码实现的功能是展示数据集中的图片样例(未完待续)。

显示数据集中的图片样例:

 三、迁移学习,进行模型的微调

迁移学习(Transfer learning) 就是把已经训练好的模型参数迁移到新的模型来帮助新模型训练。

 

 后面这部分代码在 Jupyter Notebook 中运行

# Case study: pneumonia detection from chest X-rays via transfer learning.
# 1. Imports
import copy
import os
import time

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torch.optim import lr_scheduler
from torch.utils.data import DataLoader
from torch.utils.tensorboard.writer import SummaryWriter
from torchvision import datasets, transforms, utils, models

# 2.1 Per-split image transforms; train adds random augmentation, all splits
#     normalize with the ImageNet channel statistics expected by ResNet.
data_transforms = {
    "train": transforms.Compose([
        transforms.RandomResizedCrop(300),
        transforms.RandomHorizontalFlip(),
        transforms.CenterCrop(256),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]),
    "val": transforms.Compose([
        transforms.Resize(300),
        transforms.CenterCrop(256),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]),
    'test': transforms.Compose([
        transforms.Resize(size=300),
        transforms.CenterCrop(size=256),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]),
}


# 3. Visualization helper
def imshow(inp, title=None):
    """Display a normalized (C, H, W) image tensor with matplotlib."""
    inp = inp.numpy().transpose((1, 2, 0))   # (C, H, W) -> (H, W, C)
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean                   # undo Normalize
    inp = np.clip(inp, 0, 1)
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)


# 6. Visualize model predictions
def visualize_model(model, num_images=6):
    """Show `num_images` validation images with their predicted class names.

    Args:
        model: trained model.
        num_images: number of images to display.

    Returns:
        None. Restores the model's original train/eval mode before returning.
    """
    was_training = model.training
    model.eval()
    images_so_far = 0
    fig = plt.figure()
    with torch.no_grad():
        for i, (datas, targets) in enumerate(dataloaders['val']):
            datas, targets = datas.to(device), targets.to(device)
            outputs = model(datas)                 # forward pass
            _, preds = torch.max(outputs, 1)       # argmax per row
            for j in range(datas.size()[0]):
                images_so_far += 1                 # images shown so far
                ax = plt.subplot(num_images // 2, 2, images_so_far)
                ax.axis('off')
                ax.set_title('predicted:{}'.format(class_names[preds[j]]))
                imshow(datas.cpu().data[j])
                if images_so_far == num_images:
                    model.train(mode=was_training)
                    return
    model.train(mode=was_training)


# 7. Training function
def train(model, device, train_loader, criterion, optimizer, epoch, writer):
    """Train `model` for one epoch and return the mean batch loss.

    model.train() enables Batch Normalization updates and Dropout:
    - Batch Normalization: normalizes intermediate activations per batch.
    - Dropout: reduces overfitting.
    """
    model.train()
    total_loss = 0.0
    # Iterate over the training batches, updating the parameters each step.
    for batch_id, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()                 # reset gradients
        output = model(data)                  # forward pass
        loss = criterion(output, target)      # compute loss
        loss.backward()                       # backpropagation
        optimizer.step()                      # parameter update
        total_loss += loss.item()             # accumulate loss
    # Log the mean training loss for this epoch.
    writer.add_scalar('Train Loss', total_loss / len(train_loader), epoch)
    writer.flush()
    return total_loss / len(train_loader)


def misclassified_images(pred, writer, target, data, output, epoch, count=10):
    """Log up to `count` misclassified images of this batch to TensorBoard."""
    misclassified = (pred != target.data)   # mask of wrong predictions
    for index, image_tensor in enumerate(data[misclassified][:count]):
        img_name = '{}->Predict-{}x{}-Actual'.format(
            epoch,
            LABEL[pred[misclassified].tolist()[index]],
            LABEL[target.data[misclassified].tolist()[index]],
        )
        writer.add_image(img_name, inv_normalize(image_tensor), epoch)


# 8. Evaluation function
def test(model, device, test_loader, criterion, epoch, writer):
    """Evaluate `model` on `test_loader`; return (mean batch loss, accuracy).

    model.eval() disables Batch Normalization updates and Dropout.
    """
    model.eval()
    total_loss = 0.0
    correct = 0.0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            total_loss += criterion(output, target).item()
            # Index of the max log-probability per row = predicted class.
            _, preds = torch.max(output, dim=1)
            correct += torch.sum(preds == target.data)
            # Log wrongly classified images for inspection.
            misclassified_images(preds, writer, target, data, output, epoch)
    # Mean loss per batch.
    total_loss /= len(test_loader)
    # BUG FIX: `correct` counts samples, so divide by the number of samples
    # (len(test_loader) is the number of batches and yields accuracy > 1).
    accuracy = correct / len(test_loader.dataset)
    writer.add_scalar('Test Loss', total_loss, epoch)
    writer.add_scalar('Accuracy', accuracy, epoch)
    writer.flush()
    print("Test Loss : {:.4f}, Accuracy : {:.4f}".format(total_loss, accuracy))
    return total_loss, accuracy


def tb_writer():
    """Return a TensorBoard SummaryWriter logging to a timestamped dir."""
    timestr = time.strftime("%Y%m%d_%H%M%S")
    writer = SummaryWriter('logdir/' + timestr)
    return writer


# 8. Model fine-tuning
class AdaptiveConcatPool2d(nn.Module):
    """Concatenate adaptive average and max pooling along the channel dim,
    doubling the channel count (ResNet50's 2048 -> 4096)."""

    def __init__(self, size=None):
        super().__init__()
        size = size or (1, 1)                       # output size, default 1x1
        self.pool_one = nn.AdaptiveAvgPool2d(size)  # average pooling
        self.pool_two = nn.AdaptiveMaxPool2d(size)  # max pooling

    def forward(self, x):
        # Channel-wise concatenation of the two pooled feature maps.
        return torch.cat([self.pool_one(x), self.pool_two(x)], 1)


def get_model():
    """Build a frozen pretrained ResNet50 with a new trainable head.

    Returns:
        The modified model; only the replaced avgpool/fc layers require grad.
    """
    model_pre = models.resnet50(pretrained=True)  # ImageNet-pretrained weights

    # Freeze every pretrained parameter (transfer learning).
    for param in model_pre.parameters():
        param.requires_grad = False

    # Replace the final two layers with a trainable classification head.
    model_pre.avgpool = AdaptiveConcatPool2d()    # avg+max pooling, 4096 ch
    model_pre.fc = nn.Sequential(
        nn.Flatten(),              # flatten pooled features
        nn.BatchNorm1d(4096),      # 4096 = 2 * 2048 from AdaptiveConcatPool2d
        nn.Dropout(0.5),
        nn.Linear(4096, 512),
        nn.ReLU(),
        nn.BatchNorm1d(512),
        nn.Dropout(p=0.5),
        nn.Linear(512, 2),         # 2 classes: NORMAL vs PNEUMONIA
        nn.LogSoftmax(dim=1),      # pairs with nn.NLLLoss
    )
    return model_pre


def train_epochs(model, device, dataloaders, criterion, optimizer, num_epochs, writer):
    """Run the train/validate loop, checkpointing the best model.

    Args:
        num_epochs: iterable of epoch indices, e.g. range(0, 10).

    Returns:
        None; the best model weights are saved to `model_path`.
    """
    print("{0:>20} | {1:>20} | {2:>20} | {3:>20} |".format(
        'Epoch', 'Training Loss', 'Test Loss', 'Accuracy'))
    best_score = np.inf       # best (lowest) validation loss seen so far
    start = time.time()
    for epoch in num_epochs:
        train_loss = train(model, device, dataloaders['train'],
                           criterion, optimizer, epoch, writer)
        test_loss, accuracy = test(model, device, dataloaders['val'],
                                   criterion, epoch, writer)
        # NOTE(review): the original source was garbled here ("<" stripped by
        # HTML); reconstructed as best-checkpoint saving — confirm intent.
        if test_loss < best_score:
            best_score = test_loss
            torch.save(model.state_dict(), model_path)
        print("{0:>20} | {1:>20} | {2:>20} | {3:>20.2f} |".format(
            epoch, train_loss, test_loss, accuracy))
        writer.flush()
    time_all = time.time() - start
    print("Training complete in {:.2f}m {:.2f}s".format(
        time_all // 60, time_all % 60))


# 9. Training and validation (driver; the original duplicated this section —
#    deduplicated here).
# Hyperparameters
model_path = 'model.pth'
batch_size = 16
# BUG FIX: the valid PyTorch device string is "cuda", not "gpu".
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 2.2 Data loading
data_path = "D:/chest_xray/"   # dataset root folder

# 2.2.1 Datasets for train/val/test splits.
image_datasets = {
    x: datasets.ImageFolder(os.path.join(data_path, x), data_transforms[x])
    for x in ['train', 'val', 'test']
}
# 2.2.2 DataLoaders per split.
dataloaders = {
    x: DataLoader(image_datasets[x], batch_size=batch_size, shuffle=True)
    for x in ['train', 'val', 'test']
}
# 2.2.3 Split sizes.
data_sizes = {x: len(image_datasets[x]) for x in ['train', 'val', 'test']}
# 2.2.4 Class names (NORMAL vs PNEUMONIA) and index -> name mapping.
class_names = image_datasets['train'].classes
LABEL = dict((v, k) for k, v in image_datasets['train'].class_to_idx.items())

print("-" * 50)

# 4. Fetch one training batch.
datas, targets = next(iter(dataloaders['train']))
# 5. Display it as a grid, titled with the class labels.
out = torchvision.utils.make_grid(datas)
imshow(out, title=[class_names[x] for x in targets])

# Inverse of the ImageNet Normalize, for turning tensors back into images.
# BUG FIX: the std inverse of 0.225 is 1/0.225 (original had 1/0.255).
inv_normalize = transforms.Normalize(
    mean=[-0.485 / 0.229, -0.456 / 0.224, -0.406 / 0.225],
    std=[1 / 0.229, 1 / 0.224, 1 / 0.225],
)

writer = tb_writer()
images, labels = next(iter(dataloaders['train']))   # one batch
# Grid of up to 32 de-normalized images for TensorBoard.
grid = torchvision.utils.make_grid(
    [inv_normalize(image) for image in images[:32]])
writer.add_image('X-Ray grid', grid, 0)
writer.flush()

model = get_model().to(device)        # frozen ResNet50 + new head
criterion = nn.NLLLoss()              # pairs with LogSoftmax output
optimizer = optim.Adam(model.parameters())
train_epochs(model, device, dataloaders, criterion, optimizer,
             range(0, 10), writer)
writer.close()