目录

1、摘要

 2、Method

2.1 模拟异常样本

 2.2 Memory Module

 2.3 空间注意模块

2.4 多尺度特征融合模块

2.5 损失函数设置

2.6 Decoder模块


1、摘要

本文认为人为创建类内差异和保持类内共性可以帮助模型实现更好的缺陷检测能力,从而更好地区分非正常图像。如图一所示。

差异(differences):

MemSeg引入人工模拟的异常图像在训练阶段使模型能够区分正常和异常的图像,减轻了半监督学习只能用正常样本进行训练的不足,然后在推理阶段可以直接输入图像无需做任何后处理。

共性(commonalities):

MemSeg引入了一个记忆池来记录正常样本的通用模式,在训练和推理阶段,比较输入图像和记忆池中的样本之间的差异,为异常区域的定位提供有用的信息。同时为了更好地协调来自记忆池和输入图像的信息,还引入了多尺度特征融合模块和空间注意模块。

论文主要贡献:

  1. 设计了一种异常模拟策略来进行模型的自监督学习,该方法整合了目标前景异常、纹理异常和结构异常。
  2. 提出了一种具有更有效的特征匹配算法的记忆模块,并在U-net结构中引入正常模式的记忆信息来辅助模型学习。
  3. 结合上述两点,并结合多尺度特征融合模块和空间注意模块,将半监督异常检测简化为端到端语义分割任务,使半监督图像表面缺陷检测更加灵活。
  4. 通过实验验证,MemSeg在表面缺陷检测和定位任务中具有较高的精度,同时可以更好地满足工业场景的实时需求。

数据可视化分析:

本文采用的是 MVTec-AD(工业质检数据集):工业质检数据集MVTec-AD

最小最大函数:将一张图像张量作为输入,并对其应用最小-最大缩放。缩放是通过从每个像素中减去图像的最小值,除以值的范围(最大值 - 最小值),然后乘以255来缩放到0到255之间的值。最后,使用to()方法将结果张量转换为uint8数据类型。

def minmax_scaling(img):
    """最大-最小缩放函数:将图像像素值归一化到特定范围内,这可以用于各种计算机视觉任务,例如图像分类或物体检测"""
    return (((img - img.min()) / (img.max() - img.min())) * 255).to(torch.uint8)

# 数据对应的图片、mask、target
img, mask, target = trainset[0]

# permute:重新排列张量的维度
fig, ax = plt.subplots(1,2,figsize=(10,15))
ax[0].imshow(minmax_scaling(img.permute(1,2,0)))
ax[0].set_title('image')
ax[1].imshow(minmax_scaling(mask), cmap='gray')
ax[1].set_title('mask')

 2、Method

      MemSeg采用U-Net作为框架,在训练阶段利用模拟的异常图片和记忆信息来完成语义分割任务,如上图所示采用预训练的ResNet18作为编码器,从左到右依次为:模拟异常样本->记忆模块->多尺度特征融合模块->空间注意模块->采用双损失函数进行端到端训练。

2.1 模拟异常样本

实际场景下数据集不可能覆盖所有的异常,同时在半监督框架中仅使用正常样本进行训练而不与异常样本进行比较不足以让模型了解什么是正常模式,本文提出的方法主要有三个步骤:

step1:

生成一个二维的Perlin噪声P,然后通过阈值T进行二值化得到掩码Mp,perlin噪声中有多个随机峰值,由它得到的Mp可以提取图像中连续的区域块,同时考虑到采集图像中某些工业成分主体的比例比较小,如果不进行处理直接进行数据增强,容易在图像的背景部分产生噪声,增加模拟异常样本与真实样本在数据分布上的差异,不利于模型学习有效的鉴别信息,因此对这类图像采用前景增强策略,使用开操作和闭操作去除在二值化处理中产生的噪声,即输入图像对应的Mi,然后通过对获得的两个掩码进行元素级乘积得到最终的掩码图像M。

 注意:

(1)Perlin噪声是一种用于计算机图形学中的随机噪声函数,它是一种连续可微的梯度噪声函数,由Ken Perlin于1983年发明。Perlin噪声可以用于生成自然界中的各种纹理,例如云彩、火焰、树木等。它在游戏开发中也得到了广泛应用。

Perlin噪声的原理是将坐标系划分成一块一块的晶格,之后在晶格的顶点处生成一个随机的梯度,通过与晶格顶点到晶格内点的向量进行点乘加权计算后得到噪声 。

(2)在图像处理中,开操作和闭操作是形态学图像处理中的两种基本操作。开操作可以平滑物体轮廓、断开较窄的狭颈并消除细的突出物,而闭操作则可以弥合较窄的间断和细长的沟壑,消除小的孔洞,填补轮廓线中的断裂。

在Python中,我们可以使用OpenCV库来实现形态学图像处理中的开操作和闭操作。下面是一个示例代码,演示如何使用OpenCV实现这两种操作:

import cv2
import numpy as np

# 读取图像
img = cv2.imread('image.jpg', 0)

# 定义结构元素
kernel = np.ones((5,5),np.uint8)

# 进行开操作
opening = cv2.morphologyEx(img, cv2.MORPH_OPEN, kernel)

# 进行闭操作
closing = cv2.morphologyEx(opening, cv2.MORPH_CLOSE, kernel)

# 显示结果
cv2.imshow('Original Image', img)
cv2.imshow('Opening', opening)
cv2.imshow('Closing', closing)
cv2.waitKey(0)
cv2.destroyAllWindows()

在这个示例中,我们首先读取了一张灰度图像,然后定义了一个5x5的结构元素。接下来,我们使用`cv2.morphologyEx()`函数分别对原始图像进行了开操作和闭操作,并将结果显示出来。

需要注意的是,在进行形态学操作之前,我们需要将图像转换为灰度图像,并确保图像的大小和结构元素的大小相同。此外,我们还可以使用不同的结构元素来获得不同程度的平滑效果。

元素级别乘积:

指对两个数组中每个位置上的元素进行乘法运算。 这意味着两个数组必须具有相同的形状,否则就会出现形状不匹配的错误。 例如,我们可以通过以下代码创建两个2×2的NumPy数组:

import numpy as np

a = np.array([[1, 2], [3, 4]])

b = np.array([[5, 6], [7, 8]])

现在,我们将使用元素级别乘法将这两个数组相乘:

c = np.multiply(a, b)

print(c)

这将输出: array([[ 5, 12], [21, 32]])

我们可以看到,在每个位置上,一个数组中的元素与另一个数组中的相应元素进行了乘法运算,并将结果存储在输出数组中。

代码实现:

一、生成M_{I}

# 1、原图和调整图像大小
    img = cv2.imread(train_file_list[0])
    print(img.shape) # H*W*C,opencv读取的图像都是BGR格式,需要转成RGB格式
    img = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
    img = cv2.resize(img,dsize=(256,256))
    print(img.shape)
    plt.imshow(img)
    plt.show()

 

# step1:生成mask
    img_gray = cv2.cvtColor(img,cv2.COLOR_RGB2GRAY)
    plt.imshow(img_gray,cmap="gray")
    # plt.show()
    _,mask_target_background = cv2.threshold(img_gray,100,255,cv2.THRESH_BINARY | cv2.THRESH_OTSU)
    # 新版本np.bool被更换成np.bool_,可以打开对应的文件查看里面的说明
    mask_target_background = mask_target_background.astype(np.bool_).astype(np.intp)
    # print(mask_target_background)
    # 这里是将0转换成1,1转换成0
    mask_target_foreground = -(mask_target_background-1)
    # print(mask_target_background)
    fig,ax = plt.subplots(1,2,figsize=(10,15))
    ax[0].imshow(mask_target_background,cmap="gray")
    ax[0].set_title("Background")
    ax[1].imshow(mask_target_foreground,cmap="gray")
    ax[1].set_title("Foreground")
    plt.show()

 

 二、生成Perlin噪声M_{P}

def rand_perlin_2d_np(shape, res, fade=lambda t: 6 * t ** 5 - 15 * t ** 4 + 10 * t ** 3):
    delta = (res[0] / shape[0], res[1] / shape[1])
    d = (shape[0] // res[0], shape[1] // res[1])
    grid = np.mgrid[0:res[0]:delta[0], 0:res[1]:delta[1]].transpose(1, 2, 0) % 1

    angles = 2 * math.pi * np.random.rand(res[0] + 1, res[1] + 1)
    gradients = np.stack((np.cos(angles), np.sin(angles)), axis=-1)
    tt = np.repeat(np.repeat(gradients,d[0],axis=0),d[1],axis=1)

    tile_grads = lambda slice1, slice2: np.repeat(np.repeat(gradients[slice1[0]:slice1[1], slice2[0]:slice2[1]],d[0],axis=0),d[1],axis=1)
    dot = lambda grad, shift: (
                np.stack((grid[:shape[0], :shape[1], 0] + shift[0], grid[:shape[0], :shape[1], 1] + shift[1]),
                            axis=-1) * grad[:shape[0], :shape[1]]).sum(axis=-1)

    n00 = dot(tile_grads([0, -1], [0, -1]), [0, 0])
    n10 = dot(tile_grads([1, None], [0, -1]), [-1, 0])
    n01 = dot(tile_grads([0, -1], [1, None]), [0, -1])
    n11 = dot(tile_grads([1, None], [1, None]), [-1, -1])
    t = fade(grid[:shape[0], :shape[1]])
    return math.sqrt(2) * lerp_np(lerp_np(n00, n10, t[..., 0]), lerp_np(n01, n11, t[..., 0]), t[..., 1])


# perlin噪声
perlin_scale = 6
min_perlin_scale = 0
torch_seed(10)
perlin_scalex = 2**(torch.randint(min_perlin_scale,perlin_scale,(1,)).numpy()[0])
perlin_scaley = 2**(torch.randint(min_perlin_scale,perlin_scale,(1,)).numpy()[0])
perlin_noise = rand_perlin_2d_np((256,256),(perlin_scalex,perlin_scaley))
plt.imshow(perlin_noise,cmap="gray")
plt.show()

# 旋转
rot = iaa.Affine(rotate=(-90,90))
perlin_noise = rot(image=perlin_noise)
# plt.imshow(perlin_noise,cmap="gray")
# plt.show()
threshold = 0.5
mask_noise = np.where(perlin_noise > threshold,np.ones_like(perlin_noise),np.zeros_like(perlin_noise))
plt.imshow(mask_noise,cmap="gray")
plt.show()

阈值处理之后再旋转的图像

 

 生成二维Perlin噪声

def generate_perlin_noise(width=256, height=256, scale=10):
    """
    生成二维Perlin噪声
    :param width: 噪声图像的宽度
    :param height: 噪声图像的高度
    :param scale: 噪声的规模,默认为10
    :return: 二维Perlin噪声图像
    """
    image = np.zeros((height, width))
    for y in range(height):
        for x in range(width):
            perlin_value = pnoise2(x / scale, y / scale)
            image[y, x] = perlin_value
    return image

perlin_noise = generate_perlin_noise()
plt.imshow(perlin_noise,cmap="gray")
plt.show()

 

三、M_{I}M_{P}像素级乘积

 # generate mask
    mask = mask_noise*mask_target_foreground
    mask = np.expand_dims(mask,axis=2)
    plt.imshow(mask,cmap="gray")
    plt.show()

Mi和Mp的像素级乘积

四、纹理异常模拟

texture_source_file_list = glob(os.path.join("./dtd/images","*/*"))
    # print(texture_source_file_list)
    # fig,ax = plt.subplots(4,5,figsize=(20,15))
    # for i in range(4*5):
    #     texture_source_img = cv2.imread(texture_source_file_list[i])
    #     texture_source_img = cv2.cvtColor(texture_source_img,cv2.COLOR_BGR2RGB)
    #     ax[i//5,i%5].imshow(texture_source_img)
    # plt.show()

    texture_source_img = cv2.imread(texture_source_file_list[0])
    texture_source_img = cv2.cvtColor(texture_source_img,cv2.COLOR_BGR2RGB)
    texture_source_img = cv2.resize(texture_source_img,dsize=(256,256)).astype(np.float32)
    plt.imshow(texture_source_img.astype(np.uint8))
    plt.show()

 

结构异常:

we first perform random adjustment of mirror symmetry, rotation, brightness, saturation, and hue on the input image I. Then the preliminary processed image is uniformly divided into a 4×8 grid and randomly arranged to obtain the disordered image I。

 """结构图像异常"""
    augmenters = [
        iaa.GammaContrast((0.5,2.0),per_channel=True),
        iaa.MultiplyAndAddToBrightness(mul=(0.8,1.2),add=(-30,30)),
        iaa.pillike.EnhanceSharpness(),
        iaa.AddToHueAndSaturation((-50,50),per_channel=True),
        iaa.Solarize(0.5,threshold=(32,128)),
        iaa.Posterize(),
        iaa.Invert(),
        iaa.pillike.Autocontrast(),
        iaa.pillike.Equalize(),
        iaa.Affine(rotate=(-45,45))
    ]

    aug_id = np.random.choice(np.arange(len(augmenters)),3,replace=False)
    aug = iaa.Sequential([
        augmenters[aug_id[0]],
        augmenters[aug_id[1]],
        augmenters[aug_id[2]]
    ])
    struct_source_img = aug(image=img)
    plt.imshow(struct_source_img)
    plt.show()

 

 

grid_w,grid_h = 256//8,256//4
    struct_source_img = rearrange(tensor=struct_source_img,
                                  pattern='(h gh) (w gw) c -> (h w) gw gh c',
                                  gw = grid_w,
                                  gh = grid_h)
    disordered_idx = np.arange(struct_source_img.shape[0])
    np.random.shuffle(disordered_idx)
    struct_source_img = rearrange(
        tensor=struct_source_img[disordered_idx],
        pattern='(h w) gw gh c -> (h gh) (w gw) c',
        h=4,
        w=8
    ).astype(np.float32)
    plt.imshow(struct_source_img.astype(np.uint8))
    plt.show()

 

 

step2:

第一步得到的掩码图像M和噪声图像In执行元素级乘积得到由M定义的In中的感兴趣区域。与DREAM方法保持一致,在这个过程中引入了一个透明度因子来平衡原始图像和有噪声图像的融合,从而模拟异常模式更接近真实异常。

I_{n}^{'}=\sigma (M\odot I_{n} )+(1-\sigma )(M\odot I)

其中sigma代表透明度因子,采用上述公式可以得到有噪声的前景图像,对于噪声图像I_{n},希望其最大透明度,以增加模型学习的能力,其中sigma的取值范围为[0.15,1]。

factor = np.random.uniform(0.15,1,size=1)[0]
# print(factor)
texture_source_img = factor*(mask*texture_source_img)+(1-factor)*(mask*img)
struct_source_img = factor*(mask*struct_source_img)+(1-factor)*(mask*img)
fig, ax = plt.subplots(1, 2, figsize=(10, 15))
ax[0].imshow(texture_source_img.astype(np.uint8))
ax[0].set_title('texture')
ax[1].imshow(struct_source_img.astype(np.uint8))
ax[1].set_title('structure')
plt.show()

 

 

step3:

掩码图像M转换成\bar{M},然后在\bar{M}和原始图像I上执行元素级乘积,得到图像I^{'},根据下述公式:

I_{A}=\bar{M}\odot I+I_{n}^{'}

得到数据增强图像I_{A},即模拟异常图像,它采用原始输入图像I作为背景,以掩码图像M提取的噪声图像I_{n}中的ROI作为前景。

 

# Step 3. Blending image and anomaly source
texture_anomaly = ((- mask + 1) * img) + texture_source_img
structure_anomaly = ((- mask + 1) * img) + struct_source_img
fig, ax = plt.subplots(1, 2, figsize=(10, 15))
ax[0].imshow(texture_anomaly.astype(np.uint8))
ax[0].set_title('texture anomaly')
ax[1].imshow(structure_anomaly.astype(np.uint8))
ax[1].set_title('structure anomaly')
plt.show()

 

 

 2.2 Memory Module

人类的异常区域是通过比较测试图像和记忆中的正常图像来获得的,本文采用少量正常的样本作为记忆样本,并使用预训练ResNet18提取记忆样本的高级特征作为记忆信息,以协助MemSeg学习。

具体方法:从训练集中随机选择N张正常图像作为记忆样本,然后通过ResNet18得到三个不同维度的特征:N×64×64×64、N×128×32×32和N×256×16×16(分别表示block1、block2、block3),然后将三个block一起组成记忆信息MI,为了确保记忆信息和输入图像的高级特征的统一,于是冻结ResNet18在block1、block2、block3中的模型参数,但是模型其他部分是可训练的。

然后计算所有记忆样本与输入图像之间的L2距离,从而得到输入图像与记忆样本之间的N差分信息DI。

DI=\bigcup_{i=1}^{N}\left \| MI_{i} - II \right \|_{2}

L2距离是指两点之间的欧几里得距离,也称为直线距离。 该距离是在n维空间中两个点之间的最短距离,其中n可以是任何正整数。

L2距离的计算方式是两个向量中每个元素相减后平方,再求和,最后取平方根。 例如,对于两个n维向量X=(x1,x2,...,xn)和Y=(y1,y2,...,yn),L2距离的计算公式为:D(X,Y)=\sqrt{\sum_{i=1}^n{(x_i-y_i)^2}}其中$\sum_{i=1}^n{(x_i-y_i)^2}$是X和Y中每个元素差的平方和。

其中N是记忆样本数,对于N个差分信息,以每个DI中所有元素的最小和为标准,得到II和MI之间的最佳差分信息,也就是

DI^*= \text{argmin}_{DI_i \in DI} \sum_{x \in DI_i} x

其中i取值范围[1,N],最佳差分信息DI^{*}包含输入样本与其最相似的记忆样本之间的差,一个位置的差值越大,输入图像与该位置对于的区域异常的概率就越高。

和标准进行比较,与标准差别越大就表示其很大概率是异常图像!

为什么采用ResNet18的高级特征?(其他网络应该也可以吧TODO)

因为预训练模型在大量数据上进行了训练,学习到了通用的特征表示,这些特征可以很好地推广到新的任务上。 通过迁移学习,我们可以利用预训练模型学到的特征表示,来加速我们自己的模型训练过程,同时提高模型的性能。

高级特征?

是指从深度学习模型中提取出来的更加抽象、更加具有语义信息的特征,这些特征往往是在深层网络中提取出来的。 与低级特征相比,高级特征更加抽象,更加难以描述,但是它们能够更好地表达数据之间的复杂关系,从而提高模型的性能。

随后,将获得的最佳差分信息DI^{*}和输入图像II提取的高级特征执行串联操作,得到对应的串联信息CI_{1}CI_{2}CI_{3}。最后拼接后的信息通过多尺度特征融合模块进行特征融合,融合后的特征通过U-Net的跳连接传到解码器。

代码实现部分:

这是一个名为`MemoryBank`的Python类,它代表一个用于存储从普通图像中提取的特征的记忆库。这个类有几个方法:

  * `__init__(self, normal_dataset, nb_memory_sample: int = 30, device='cpu')`:这个方法用给定的普通数据集初始化记忆库,要保存在记忆中的样本数(默认为30),以及处理数据所需的设备(默认为CPU)。

  * `update(self, feature_extractor)`:这个方法通过使用给定的特征提取器从普通图像中提取特征来更新记忆库。它随机选择数据集中的一个子集的普通图像,使用特征提取器提取它们的特征,并将这些特征存储在记忆库中。

  * `_calc_diff(self, features: List[torch.Tensor]) -> torch.Tensor`:这个私有方法计算输入特征与存储在记忆库中的特征之间的差异。它通过对每对特征在所有级别上计算均方误差(MSE)损失来实现。

  * `select(self, features: List[torch.Tensor]) -> torch.Tensor`:这个方法选择具有最小差异的特征与普通数据集。它首先使用`_calc_diff`方法计算输入特征与存储在记忆库中的特征之间的差异。然后,它选择具有最小差异的所有级别的特征,并将它们与输入特征连接起来。最后,它返回更新后的特征。

class MemoryBank:
    def __init__(self, normal_dataset, nb_memory_sample: int = 30, device='cpu'):
        self.device = device
        
        # memory bank
        self.memory_information = {}
        
        # normal dataset
        self.normal_dataset = normal_dataset
        
        # the number of samples saved in memory bank
        self.nb_memory_sample = nb_memory_sample
        
        
    def update(self, feature_extractor):
        """这段代码定义了一个名为update的方法,用于更新特征提取器。 首先,它定义了一个样本索引数组samples_idx,其中包含从数据集中随机选择的样本索引。然后,它使用torch.no_grad()上下文管理器来禁用梯度计算,以提高训练效率。接下来,它使用一个循环来迭代地从数据集中选择图像,并将其输入到特征提取器中以提取特征。对于每个选定的图像,它将图像转换为张量并将其移动到指定的设备上(例如GPU)。然后,它使用特征提取器将图像转换为特征向量,并将这些特征向量存储在内存银行中。具体来说,它遍历每个特征向量,并将其添加到内存信息字典中。如果该级别尚未存在于字典中,则创建一个新的键值对;否则,将新的特征向量与现有特征向量连接起来。"""
        feature_extractor.eval()
        
        # define sample index
        samples_idx = np.arange(len(self.normal_dataset))
        np.random.shuffle(samples_idx)
        
        # extract features and save features into memory bank
        with torch.no_grad():
            for i in range(self.nb_memory_sample):
                # select image
                input_normal, _, _ = self.normal_dataset[samples_idx[i]]
                input_normal = input_normal.to(self.device)
                
                # extract features
                features = feature_extractor(input_normal.unsqueeze(0))
                
                # save features into memoery bank
                for i, features_l in enumerate(features[1:-1]):
                    if f'level{i}' not in self.memory_information.keys():
                        self.memory_information[f'level{i}'] = features_l
                    else:
                        self.memory_information[f'level{i}'] = torch.cat([self.memory_information[f'level{i}'], features_l], dim=0)

    """这段代码定义了一个名为`_calc_diff`的方法,用于计算特征提取器中存储的特征向量与目标特征之间的差异。首先,它创建一个大小为`(batch size, nb_memory_sample)`的零张量`diff_bank`,其中`batch size`是输入特征的数量,`nb_memory_sample`是存储在内存银行中的样本数量。然后,它遍历内存信息字典中的每个级别,并针对每个级别中的每个样本计算L2损失。具体来说,对于每个样本,它将该样本的特征向量重复`nb_memory_sample`次,并将其作为输入传递给`F.mse_loss`函数,该函数计算输入和目标特征之间的均方误差。最后,它将每个级别的损失累加到`diff_bank`张量中。最后,该方法返回`diff_bank`张量,其中包含每个级别中存储的特征向量与目标特征之间的差异。"""
    def _calc_diff(self, features: List[torch.Tensor]) -> torch.Tensor:
        # batch size X the number of samples saved in memory
        diff_bank = torch.zeros(features[0].size(0), self.nb_memory_sample)

        # level
        for l, level in enumerate(self.memory_information.keys()):
            # batch
            for b_idx, features_b in enumerate(features[l]):
                # calculate l2 loss
                diff = F.mse_loss(
                    input     = torch.repeat_interleave(features_b.unsqueeze(0), repeats=self.nb_memory_sample, dim=0), 
                    target    = self.memory_information[level], 
                    reduction ='none'
                ).mean(dim=[1,2,3])

                # sum loss
                diff_bank[b_idx] += diff
                
        return diff_bank
        
    """这段代码定义了一个名为`_calc_diff`的方法,用于计算特征提取器中存储的特征向量与目标特征之间的差异。首先,它创建一个大小为`(batch size, nb_memory_sample)`的零张量`diff_bank`,其中`batch size`是输入特征的数量,`nb_memory_sample`是存储在内存银行中的样本数量。然后,它遍历内存信息字典中的每个级别,并针对每个级别中的每个样本计算L2损失。具体来说,对于每个样本,它将该样本的特征向量重复`nb_memory_sample`次,并将其作为输入传递给`F.mse_loss`函数,该函数计算输入和目标特征之间的均方误差。最后,它将每个级别的损失累加到`diff_bank`张量中。最后,该方法返回`diff_bank`张量,其中包含每个级别中存储的特征向量与目标特征之间的差异。"""
    def select(self, features: List[torch.Tensor]) -> torch.Tensor:
        # calculate difference between features and normal features of memory bank
        diff_bank = self._calc_diff(features=features)
        
        # concatenate features with minimum difference features of memory bank
        for l, level in enumerate(self.memory_information.keys()):
            
            selected_features = torch.index_select(self.memory_information[level], dim=0, index=diff_bank.argmin(dim=1))
            diff_features = F.mse_loss(selected_features, features[l], reduction='none')
            features[l] = torch.cat([features[l], diff_features], dim=1)
            
        return features
    

 2.3 空间注意模块

为了充分利用差分信息,利用DI^{*}提取了三张空间注意图,用于加强对异常区域的最佳差分信息的猜测,对于DI^{*}中三个不同维度的特征,在通道维度计算平均值,分别得到大小为16×16、32×32和64×64的三个特征图。

16×16特征图直接用作空间注意图:M3,使用32×32特征图和上采样后的M3进行元素级乘积操作获得M2;再使用64×64特征图和上采样后的M2执行元素级乘积操作获得M1。

 如上图所示,空间注意图M1,M2,M3分别和经过多尺度特征融合处理后获得的信息进行加权,求解过程如上述公式所示。其中C3表示DI_{3}^{*}的通道数量,DI_{3i}^{*}表示通道i的特征图像,M_{2}^{U}表示上采样后的特征图。

特征图上采样?

是指将特征图的大小扩大,通常是通过复制边缘像素或者插值的方式来实现。 上采样可以增加模型的感受野,从而提高模型的性能。 常见的上采样方法有最近邻插值、双线性插值、双三次插值等。

通道维度?

指图像中的颜色通道,例如RGB图像有三个颜色通道。在卷积神经网络中,通道维度通常用于表示输入图像的通道数。例如,对于32×32×3大小的输入图像,其通道数为3。 

2.4 多尺度特征融合模块

通过记忆模块,得到输入图像信息II和最佳的差分信息DI^{*}组成的串联信息CI,直接使用这个串联信息存在特征冗余的问题,同时会降低推理速度。利用通道注意力机制和多尺度特征融合策略充分融合视觉信息和语义信息。

 对于串联信息CI_{n}(n=1,2,3),最开始经过一个3×3的卷积操作?

不采用2×2卷积,如果使用偶数大小的卷积核,网络在进行卷积操作时很难找到卷积的中心点,也就是偶数卷积核不对称这个问题也导致了导致在填充的(padding)的时候像素特征不断偏移。然后随着层次的加深,这个偏移现象就越来越明显。这种偏移问题,在一些任务,比如语义分割,等一些需要知道特征具体像素位置的任务中就产生了很大的影响。

相比之下,3×3卷积可以保留更多的空间信息,因为它可以通过重叠存储来捕获更大的区域。这使得3×3卷积在图像处理中更加灵活和可靠。

同时考虑到CI_{n}(n=1,2,3)  是通道维度中两种信息的简单连接,使用坐标注意(CA)来捕获CI_{n}(n=1,2,3)信道之间的信息关系。

然后,对于不同维度的特征加权坐标注意力,继续执行多尺度信息融合:不同维度的特征图首先采用上采样法进行分辨率对齐,然后利用卷积对通道数进行对齐,最后执行元素级加法操作,实现多尺度特征融合。融合后的特征与空间注意图M_{n}(n=1,2,3)进行加权。然后输入给最终的解码器。

代码实现部分:

 CoordAttention模块:CoordAttention是一种注意力机制,它可以捕获空间方向(即坐标)并生成coordinate-aware attention maps,从而提高模型对信息通道的敏感性。

 CoordAttention的注意力机制模块的计算过程如下: 

1. 沿着水平和垂直方向计算1维平均池化。
2. 拼接、卷积、BN、激活函数。
3. 分别卷积、Sigmoid。
4. 最后输出。

这段代码实现了一个名为CoordAttention的注意力机制模块,用于捕获空间方向信息并生成coordinate-aware attention maps。

首先定义了两个激活函数h_sigmoid和h_swish,其中h_sigmoid将输入通过ReLU6激活函数后除以6,h_swish将输入与h_sigmoid的输出相乘。

然后定义了一个名为CoordAtt的类,继承自nn.Module。在__init__方法中,定义了自适应平均池化层、卷积层、批归一化层和激活函数。其中自适应平均池化层用于对输入张量进行降维操作,卷积层用于提取特征,批归一化层用于加速训练过程,激活函数用于增加模型的非线性表达能力。

在forward方法中,首先将输入张量x进行池化操作,然后将其分成两个通道,分别进行卷积操作,得到两个特征图a_h和a_w。最后,将输入张量x与a_w和a_h相乘,得到输出张量out。这个操作相当于将输入张量的每个像素点与对应的特征图像素点相乘,从而得到一个新的张量,其中每个元素表示输入张量在该像素点的空间位置的重要性。

import torch
import torch.nn as nn
import math
import torch.nn.functional as F

class h_sigmoid(nn.Module):
    def __init__(self, inplace=True):
        super(h_sigmoid, self).__init__()
        self.relu = nn.ReLU6(inplace=inplace)

    def forward(self, x):
        return self.relu(x + 3) / 6

class h_swish(nn.Module):
    def __init__(self, inplace=True):
        super(h_swish, self).__init__()
        self.sigmoid = h_sigmoid(inplace=inplace)

    def forward(self, x):
        return x * self.sigmoid(x)


class CoordAtt(nn.Module):
    def __init__(self, inp, oup, reduction=32):
        super(CoordAtt, self).__init__()
        # nn.AdaptiveAvgPool2d((None, 1))表示将输入张量进行自适应平均池化操作,池化核大小为(1, 1),步长为1,返回的输出张量的尺寸为(n, 1)。这里的None表示输出张量的高和宽可以自动计算,以保持与输入张量的高度和宽度相同。
        self.pool_h = nn.AdaptiveAvgPool2d((None, 1))
        self.pool_w = nn.AdaptiveAvgPool2d((1, None))

        mip = max(8, inp // reduction)

        self.conv1 = nn.Conv2d(inp, mip, kernel_size=1, stride=1, padding=0)
        self.bn1 = nn.BatchNorm2d(mip)
        self.act = h_swish()
        
        self.conv_h = nn.Conv2d(mip, oup, kernel_size=1, stride=1, padding=0)
        self.conv_w = nn.Conv2d(mip, oup, kernel_size=1, stride=1, padding=0)
        

    def forward(self, x):
        identity = x
        
        n,c,h,w = x.size()
        x_h = self.pool_h(x)
        x_w = self.pool_w(x).permute(0, 1, 3, 2)

        y = torch.cat([x_h, x_w], dim=2)
        y = self.conv1(y)
        y = self.bn1(y)
        y = self.act(y) 
        
        x_h, x_w = torch.split(y, [h, w], dim=2)
        x_w = x_w.permute(0, 1, 3, 2)

        a_h = self.conv_h(x_h).sigmoid()
        a_w = self.conv_w(x_w).sigmoid()

        out = identity * a_w * a_h

        return out

这段代码定义了两个PyTorch模块,用于多尺度特征金字塔(MSFF)架构。 MSFF架构用于目标检测任务中提取不同尺度的特征并将它们组合起来以提高性能。

第一个模块,MSFFBlock

表示MSFF架构中的单个块。 它接受一个形状为(in_channel,height,width)的输入张量,并应用两个卷积层和批归一化和ReLU激活函数。 然后,它将第二个卷积层的输出与输入张量相乘。 这个过程有助于学习突出输入图像中重要区域的空间注意力映射。

第二个模块,MSFF

定义了整个MSFF架构。 它由三个MSFFBlock组成,每个具有不同的输入通道数(128、256和512)。 在将输入通过这些块后,输出通过双线性插值进行上采样,然后通过两个附加的卷积层和批归一化和ReLU激活函数进行进一步处理。 这些层有助于细化特征并降低其分辨率。MSFF的forward方法接受一个形状为(batch_size,num_levels,height,width)的输入张量,其中num_levels是MSFF架构中的特征数。 它将输入分成三个级别(f1、f2、f3)并按顺序通过MSFFBlocks和上采样器传递。 在此之后,它通过计算相应特征图沿通道维度的平均值并应用来自注意力映射的比例因子来计算每个级别的空间注意力映射。 最后,它使用逐元素乘法将所有三个级别的特征组合起来,并返回形状为(batch_size,num_levels,height/2,width/2)的输出张量。

class MSFFBlock(nn.Module):
    def __init__(self, in_channel):
        super(MSFFBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channel, in_channel, kernel_size=3, stride=1, padding=1)
        self.attn = CoordAtt(in_channel, in_channel)
        self.conv2 = nn.Sequential(
            nn.Conv2d(in_channel, in_channel // 2, kernel_size=3, stride=1, padding=1),
            nn.Conv2d(in_channel // 2, in_channel // 2, kernel_size=3, stride=1, padding=1)
        )

    def forward(self, x):
        x_conv = self.conv1(x)
        x_att = self.attn(x)
        
        x = x_conv * x_att
        x = self.conv2(x)
        return x

    
class MSFF(nn.Module):
    def __init__(self):
        super(MSFF, self).__init__()
        self.blk1 = MSFFBlock(128)
        self.blk2 = MSFFBlock(256)
        self.blk3 = MSFFBlock(512)

        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        
        self.upconv32 = nn.Sequential(
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
            nn.Conv2d(256, 128, kernel_size=3, stride=1, padding=1)
        )
        self.upconv21 = nn.Sequential(
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
            nn.Conv2d(128, 64, kernel_size=3, stride=1, padding=1)
        )

    def forward(self, features):
        # features = [level1, level2, level3]
        f1, f2, f3 = features 
        
        # MSFF Module
        f1_k = self.blk1(f1)
        f2_k = self.blk2(f2)
        f3_k = self.blk3(f3)

        f2_f = f2_k + self.upconv32(f3_k)
        f1_f = f1_k + self.upconv21(f2_f)

        # spatial attention
        
        # mask 
        m3 = f3[:,256:,...].mean(dim=1, keepdim=True)
        m2 = f2[:,128:,...].mean(dim=1, keepdim=True) * self.upsample(m3)
        m1 = f1[:,64:,...].mean(dim=1, keepdim=True) * self.upsample(m2)
        
        f1_out = f1_f * m1
        f2_out = f2_f * m2
        f3_out = f3_k * m3
        
        return [f1_out, f2_out, f3_out]
2.5 损失函数设置

采用L1损失和focal损失来保证图像空间中所有像素的相似性。focal损失减轻了图像中正常区域和异常区域之间的面积不平衡问题,使模型更加关注困难样本的分割,以提高异常分割的精度

L1损失相对L2损失来说能够保持更多的边缘信息。

 L1损失函数和L2损失函数?

都是机器学习中的常用损失函数,它们的区别在于对于异常值的敏感程度不同。L1损失函数对异常值更加鲁棒,因为它会把权重向量中小于某个阈值的元素都变为0,而L2损失函数则对异常值很敏感。但是,L1损失函数的导数是不连续的,从而让它无法有效地求解;而L2损失函数的导数是连续的,因此可以有效地求解 。

l1_criterion = nn.L1Loss()

Focal Loss损失函数?

是何恺明大神在RetinaNet网络中提出的,主要目的是为了解决one-stage目标检测中正负样本极不平衡的问题。它是基于二分类交叉熵CE的,是一个动态缩放的交叉熵损失,通过一个动态缩放因子,可以动态降低训练过程中易区分样本的权重,使得模型更加关注难以分类的样本  。

这段代码实现了一个名为FocalLoss的类,用于计算图像分类任务中的类别不平衡损失。

首先在初始化函数中定义了一些参数,包括gamma、alpha和size_average。其中gamma是一个调节因子,用于控制损失函数的形状;alpha是一个权重系数,用于平衡正负样本的权重;size_average表示是否对每个样本的损失进行平均。

在forward函数中,首先对输入张量进行处理,将其转换为N*H*W*C的形式,其中N为批次大小,H和W分别为图像的高和宽,C为类别数。然后将目标张量也转换为N*1的形式,其中1表示真实标签。接着计算log_softmax值,并将其与目标张量相乘得到logpt。最后根据alpha的值对logpt进行调整,并计算损失值loss。如果size_average为True,则返回loss的平均值,否则返回loss的总和。

import torch
import torch.nn as nn
import torch.nn.functional as F

class FocalLoss(nn.Module):

    def __init__(self, gamma=0, alpha=None, size_average=True):
        super(FocalLoss, self).__init__()
        self.gamma = gamma
        self.alpha = alpha
        if isinstance(alpha, (float, int)): self.alpha = torch.Tensor([alpha, 1 - alpha])
        if isinstance(alpha, list): self.alpha = torch.Tensor(alpha)
        self.size_average = size_average

    def forward(self, input, target):
        if input.dim()>2:
            input = input.view(input.size(0), input.size(1), -1)  # N,C,H,W => N,C,H*W
            input = input.transpose(1, 2)                         # N,C,H*W => N,H*W,C
            input = input.contiguous().view(-1, input.size(2))    # N,H*W,C => N*H*W,C
        target = target.view(-1, 1)

        logpt = F.log_softmax(input, dim=1)
        logpt = logpt.gather(1,target)
        logpt = logpt.view(-1)
        pt = logpt.exp()

        if self.alpha is not None:
            if self.alpha.type() != input.data.type():
                self.alpha = self.alpha.type_as(input.data)
            at = self.alpha.gather(0, target.data.view(-1))
            logpt = logpt * at

        loss = -1 * (1 - pt)**self.gamma * logpt
        if self.size_average: return loss.mean()
        else: return loss.sum()

Focal Loss的计算公式如下:FL(p_t)=-(1-p_t)^\gamma \log(p_t)其中,p_t=\left\{\begin{array}{l}p, if y=1\\ 1-p, otherwise\end{array}\right.\gamma为常数,当其为0时,FL就和交叉熵损失函数一样。当$gamma>0$时,$FL$会使得难分类样本的损失变小,而易分类样本的损失不变  。

2.6 Decoder模块

对于解码器部分,如图2所示,上采样层包含一个双线性插值层和一个基本的由卷积层、批归一化和一个ReLU激活函数组成的卷积块。Conv层包含两个堆叠的基本卷积块;只有最后一个Conv层包含一个基本卷积块和一个2通道卷积层。

双线性插值是一种用于重新采样图像和纹理的算法,它通过在两个方向上进行一次线性插值,然后在另一个方向上进行一次线性插值,得到一个加权平均值。   

MemSeg的训练过程经过2700次迭代,输入图像的大小设置为256×256,批量大小设置为8,其中包含4个正常样本和4个模拟异常样本。在进行异常模拟时,大多数类别使用纹理异常模拟和结构异常模拟的概率相等。

使用网格搜索来设置超参数:使用的学习率设置为0.04;focal损失中的Gamma设置为4;目标函数中的\lambda _{l1}\lambda _{f}分别设置为0.6和0.4。

网格搜索(Grid Search)是一种穷举搜索方法,它通过遍历超参数的所有可能组合来寻找最优超参数。网格搜索首先为每个超参数设定一组候选值,然后生成这些候选值的笛卡尔积,形成超参数的组合网格。对于每一种组合,都会训练模型并计算验证集上的误差。最后,选择误差最小的超参数组合作为最终的超参数组合 。

以下是一个使用网格搜索的例子,用于调整支持向量机(SVM)的超参数:

from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

# 定义超参数组合
param_grid = {
    'C': [0.1, 1, 10, 100],
    'gamma': [1, 0.1, 0.01, 0.001],
    'kernel': ['rbf', 'poly', 'sigmoid'],
}

# 创建SVM分类器对象
svm = SVC()

# 创建网格搜索对象,传入超参数组合和分类器对象
grid_search = GridSearchCV(svm, param_grid=param_grid)

# 训练模型并输出最佳超参数组合和对应的准确率
grid_search.fit(X_train, y_train)
print("Best parameters:", grid_search.best_params_)
print("Best accuracy:", grid_search.best_score_)

代码实现部分:

 这段代码实现了一个名为Decoder的解码器模块,用于将编码器的输出进行上采样和卷积操作,最终生成目标图像。

首先定义了一个UpConvBlock类,该类实现了一个上采样和卷积操作的组合块。在初始化函数中,通过nn.Sequential将三个操作按顺序组合起来,包括上采样、卷积和批归一化操作。在forward函数中,将输入张量传入组合块中进行处理,并返回处理结果。

然后定义了Decoder类,该类继承自nn.Module。在初始化函数中,定义了一系列的上采样和卷积操作,包括四个UpConvBlock实例和两个卷积层。这些操作分别对输入张量进行不同尺度的上采样和特征提取,并将结果进行合并。

最后,在forward函数中,将编码器的输出和特征图进行一系列上采样和卷积操作,最终生成目标图像。具体来说,首先将编码器的输出通过四个UpConvBlock实例进行上采样和特征提取,然后将得到的特征图与对应的原始特征图进行拼接,再通过两个卷积层进行特征融合和降维操作,最终生成目标图像。

class UpConvBlock(nn.Module):
    def __init__(self, in_channel, out_channel):
        super(UpConvBlock, self).__init__()
        self.blk = nn.Sequential(
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
            nn.Conv2d(in_channel, out_channel, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(out_channel),
            nn.ReLU()
        )

    def forward(self, x):
        return self.blk(x)


class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        
        self.conv = nn.Conv2d(64, 48, kernel_size=3, stride=1, padding=1)

        self.upconv3 = UpConvBlock(512, 256)
        self.upconv2 = UpConvBlock(512, 128)
        self.upconv1 = UpConvBlock(256, 64)
        self.upconv0 = UpConvBlock(128, 48)
        self.upconv2mask = UpConvBlock(96, 48)

        self.final_conv = nn.Conv2d(48, 2, kernel_size=3, stride=1, padding=1)

    def forward(self, encoder_output, concat_features):
        # concat_features = [level0, level1, level2, level3]
        f0, f1, f2, f3 = concat_features
        
        # 512 x 8 x 8 -> 512 x 16 x 16
        x_up3 = self.upconv3(encoder_output)
        x_up3 = torch.cat([x_up3, f3], dim=1)  

        # 512 x 16 x 16 -> 256 x 32 x 32
        x_up2 = self.upconv2(x_up3)
        x_up2 = torch.cat([x_up2, f2], dim=1)  

        # 256 x 32 x 32 -> 128 x 64 x 64
        x_up1 = self.upconv1(x_up2)
        x_up1 = torch.cat([x_up1, f1], dim=1)  

        # 128 x 64 x 64 -> 96 x 128 x 128
        x_up0 = self.upconv0(x_up1)
        f0 = self.conv(f0)
        x_up2mask = torch.cat([x_up0, f0], dim=1)  

        # 96 x 128 x 128 -> 48 x 256 x 256
        x_mask = self.upconv2mask(x_up2mask)  
        
        # 48 x 256 x 256 -> 1 x 256 x 256
        x_mask = self.final_conv(x_mask)  
        
        return x_mask

Logo

分享最新、最前沿的AI大模型技术,吸纳国内前几批AI大模型开发者

更多推荐