Table of Contents

Preface

I. Dataset Introduction

II. Processing Pipeline

1. Process the image, text, and label data

(1) Map the labels negative, neutral, and positive to the digits 0, 1, 2, and store them in the path2label dict

(2) Build lists of the image files and the text files

(3) Read the text content, the corresponding image paths, and the labels into lists

(4) Write all text content to all_data.txt so that every character can be encoded later

(5) Rewrite the text and labels into the train_data.txt file to form data-label pairs

2. Load the data

3. Build the model

4. Train and save the model

Summary


Preface

This project is mainly a beginner tutorial and can be run directly. If you need the project files or have any questions, feel free to contact the author (WeChat: Qwe1398276934).

Multimodal (image-text) classification is one of the most basic recognition tasks in multimodal deep learning. This article processes the images with 2D convolutions (a ResNet18) and the text with a 1D-style convolution over character embeddings, adds the two branches together at the fully connected layer, and finally classifies into 3 categories.


The task is a three-way classification: positive, negative, and neutral.
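The fusion itself is simple late fusion. A minimal sketch of the idea (the shapes here are hypothetical; the real model below projects both branches to 512-d features):

import paddle
img_feat = paddle.randn([32, 512])   # output of the image branch
txt_feat = paddle.randn([32, 512])   # output of the text branch
fc = paddle.nn.Linear(512, 3)        # 3 classes: negative / neutral / positive
logits = fc(img_feat + txt_feat)     # element-wise addition fuses the two modalities
print(logits.shape)                  # [32, 3]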

The training convergence curves are shown below.

(Figure omitted: training loss and accuracy curves.)

I. Dataset Introduction

The dataset is laid out as follows: under a data directory, each sample guid has a <guid>.jpg image and a <guid>.txt text file, and dataset/train.txt maps each guid to its label.

——data

An example image, its text content, and the label file are shown in the original figures (omitted here).

II. Processing Pipeline

1. Process the image, text, and label data

(1) Map the labels negative, neutral, and positive to the digits 0, 1, 2, and store them in the path2label dict

path2label = {}
with open('dataset/train.txt', 'r') as f:
    for line in f:
        # each line is "guid,label"; strip the newline before comparing
        path, label = line.strip().split(',')
        if label == "negative":
            label = 0
        elif label == "neutral":
            label = 1
        else:
            label = 2
        path2label[path] = label
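For reference, each line of dataset/train.txt is assumed to look like the following (the guids here are made up for illustration):

1,negative
2,neutral
3,positive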

(2) Build lists of the image files and the text files

import os
path="./dataset/data"
txt_files = [f+".txt" for f in path2label.keys()]
img_files=[f+".jpg" for f in path2label.keys()]

(3) Read the text content, the corresponding image paths, and the labels into lists

contents = []   # text content of each sample
images = []     # image paths, in the same order
labels = []     # numeric labels, in the same order
for i in txt_files:
    try:
        with open(path + "/" + i, 'r') as f:
            content = f.read()
            contents.append(content)
            images.append(path + "/" + i[:-4] + ".jpg")
            labels.append(path2label[i[:-4]])
    except (IOError, UnicodeDecodeError):
        # some files may be missing or not valid UTF-8
        print("Failed to open", i)

(4) Write all text content to all_data.txt so that every character can be encoded later


all_data_path = "all_data.txt"
with open(all_data_path, 'w') as f:   # 'w', not 'a', so re-runs do not duplicate the corpus
    for data in contents:
        f.write(data)

# Build the character dictionary
def create_dict(data_path, dict_path):
    with open(dict_path, 'w') as f:
        f.seek(0)
        f.truncate()

    dict_set = set()
    # read the whole corpus
    with open(data_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()
    # collect every distinct character into a set
    for line in lines:
        content = line.split('\t')[-1].replace('\n', '')
        for s in content:
            dict_set.add(s)
    # turn the set into a dict that maps each character to a number
    dict_list = []
    i = 0
    for s in dict_set:
        dict_list.append([s, i])
        i += 1
    # add the unknown and padding tokens
    dict_txt = dict(dict_list)
    end_dict = {"<unk>": i}
    dict_txt.update(end_dict)
    end_dict = {"<pad>": i + 1}
    dict_txt.update(end_dict)
    # save the dictionary to disk
    with open(dict_path, 'w', encoding='utf-8') as f:
        f.write(str(dict_txt))

    print("Character dictionary generated!")

all_data_path="all_data.txt"
dict_path="dict.txt"
create_dict(all_data_path,dict_path)
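For illustration, dict.txt ends up containing a single Python dict literal that maps each character to an id, plus the two special tokens. The actual characters and ids depend on your corpus, so this example is hypothetical:

{'多': 0, '模': 1, '态': 2, ..., '<unk>': 2371, '<pad>': 2372}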

(5) Rewrite the text and labels into the train_data.txt file to form data-label pairs

# Serialize the text into id sequences
def load_vocab(file_path):
    with open(file_path, 'r', encoding='utf8') as fr:
        vocab = eval(fr.read())   # parse the saved str back into a dict
    return vocab

def f_write_txt(words, dict_txt, label):
    labs = ""
    for s in words:
        # fall back to <unk> for any character missing from the dictionary
        lab = str(dict_txt.get(s, dict_txt["<unk>"]))
        labs = labs + lab + ','
    labs = labs[:-1]
    labs = labs + '\t\t\t\t\t' + label + '\n'
    return labs

def create_data_list(data_path, train_path, dict_path):
    dict_txt = load_vocab(dict_path)
    with open(data_path, 'r', encoding='utf-8') as f_data:
        lines = f_data.readlines()
        print(len(lines))

    i = 0
    maxlen = 0
    # iterate the in-memory `contents` list so that each text stays
    # aligned with its entry in `labels`
    with open(train_path, 'w', encoding='utf-8') as f_train:   # 'w' so re-runs do not duplicate
        for line in contents:
            words = line.split('\t')[-1].replace('\n', '')
            maxlen = max(maxlen, len(words))
            label = str(labels[i])
            labs = f_write_txt(words, dict_txt, label)
            # every 8th sample could be held out for validation, e.g.:
            # if i % 8 == 0:
            #     f_eval.write(labs)
            # else:
            f_train.write(labs)
            i += 1
    print("Data list generated!")
    print(maxlen)

train_path="train_data.txt"
create_data_list(all_data_path,train_path,dict_path)
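Each line of train_data.txt now contains the comma-separated character ids, five literal tab characters as the separator, and the numeric label. A hypothetical line (the ids depend on your dictionary):

12,873,45,1009\t\t\t\t\t2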

2. Load the data

Reload the processed data with a custom Dataset

import io
import sys
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

import paddle
from paddle.nn import Conv2D, Linear, Embedding
import paddle.nn.functional as F

print(paddle.__version__)
vocab = load_vocab("dict.txt")


from paddle.vision import transforms

# Optional training-set augmentation could be defined here, e.g.:
# train_transforms = transforms.Compose([
#     transforms.RandomCrop(224),
#     transforms.RandomHorizontalFlip(),
#     transforms.Normalize()])


class RumorDataset(paddle.io.Dataset):
    def __init__(self, data_dir):
        self.data_dir = data_dir
        self.all_data = []
        self.images = images   # image paths, aligned with the lines of train_data.txt

        with io.open(self.data_dir, "r", encoding='utf8') as fin:
            for line in fin:
                cols = line.strip().split("\t\t\t\t\t")
                if len(cols) != 2:
                    sys.stderr.write("[NOTICE] Error Format Line!")
                    continue
                label = []
                label.append(int(cols[1]))
                wids = cols[0].split(",")
                # truncate or pad every sequence to a fixed length of 150
                if len(wids) >= 150:
                    wids = np.array(wids[:150]).astype('int64')
                else:
                    wids = np.concatenate([wids, [vocab["<pad>"]] * (150 - len(wids))]).astype('int64')
                label = np.array(label).astype('int64')
                self.all_data.append((wids, label))

    def __getitem__(self, index):
        data, label = self.all_data[index]
        image = self.images[index]
        img = Image.open(image)
        img = img.convert('RGB')        # some images may be grayscale; force 3 channels
        img = img.resize((224, 224))    # resize to 224x224 for the ResNet branch
        img = transforms.ToTensor()(img)
        return img, data, label

    def __len__(self):
        return len(self.all_data)


batch_size = 32
train_dataset = RumorDataset(train_path)
test_dataset = RumorDataset(train_path)   # no separate split here; the same file is reused

train_loader = paddle.io.DataLoader(train_dataset, return_list=True,
                                    batch_size=batch_size, drop_last=True)
test_loader = paddle.io.DataLoader(test_dataset, return_list=True,
                                   batch_size=batch_size, drop_last=True)




#check

print('=============train_dataset =============') 
for image,data, label in train_dataset:
    print(data)
    print(np.array(data).shape)
    print(label)
    break


print('=============test_dataset =============') 
for image,data, label in test_dataset:
    print(data)
    print(np.array(data).shape)
    print(label)
    break


3. Build the model

Here we build the model: the image branch is a ResNet18 and the text branch is a simple 1D-style CNN over character embeddings.

import paddle
import paddle.nn as nn
from paddle.nn import Conv2D, MaxPool2D, AdaptiveAvgPool2D, Linear, ReLU, BatchNorm2D
import paddle.nn.functional as F

class Basicblock(paddle.nn.Layer):
    def __init__(self, in_channel, out_channel, stride = 1):
        super(Basicblock, self).__init__()
        self.stride = stride
        self.conv0 = Conv2D(in_channel, out_channel, 3, stride = stride, padding = 1)
        self.conv1 = Conv2D(out_channel, out_channel, 3, stride=1, padding = 1)
        self.conv2 = Conv2D(in_channel, out_channel, 1, stride = stride)
        self.bn0 = BatchNorm2D(out_channel)
        self.bn1 = BatchNorm2D(out_channel)
        self.bn2 = BatchNorm2D(out_channel)

    def forward(self, inputs):
        y = inputs
        x = self.conv0(inputs)
        x = self.bn0(x)
        x = F.relu(x)
        x = self.conv1(x)
        x = self.bn1(x)
        if self.stride == 2:
            y = self.conv2(y)
            y = self.bn2(y)
        z = F.relu(x+y)
        return z

class Bottleneckblock(paddle.nn.Layer):
    def __init__(self, inplane, in_channel, out_channel, stride = 1, start = False):
        super(Bottleneckblock, self).__init__()
        self.stride = stride
        self.start = start
        self.conv0 = Conv2D(in_channel, inplane, 1, stride = stride)
        self.conv1 = Conv2D(inplane, inplane, 3, stride=1, padding=1)
        self.conv2 = Conv2D(inplane, out_channel, 1, stride=1)
        self.conv3 = Conv2D(in_channel, out_channel, 1, stride = stride)
        self.bn0 = BatchNorm2D(inplane)
        self.bn1 = BatchNorm2D(inplane)
        self.bn2 = BatchNorm2D(out_channel)
        self.bn3 = BatchNorm2D(out_channel)

    def forward(self, inputs):
        y = inputs
        x = self.conv0(inputs)
        x = self.bn0(x)
        x = F.relu(x)
        x = self.conv1(x)
        x = self.bn1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        if self.start:
            y = self.conv3(y)
            y = self.bn3(y)
        z = F.relu(x+y)
        return z
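As a quick sketch of how a residual block changes tensor shapes (the input here is hypothetical): a downsampling Basicblock halves the spatial size while its projection shortcut matches the new channel count.

block = Basicblock(64, 128, stride=2)
x = paddle.randn([4, 64, 56, 56])
print(block(x).shape)   # [4, 128, 28, 28]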


# Define the two-branch network: a text CNN plus a ResNet image branch
class CNN(paddle.nn.Layer):
    def __init__(self, num, bottlenet):
        super(CNN, self).__init__()
        self.dict_dim = vocab["<pad>"]
        self.emb_dim = 128
        self.hid_dim = 128
        self.fc_hid_dim = 96
        self.class_dim = 3        # three classes: negative / neutral / positive
        self.channels = 1
        self.win_size = [3, self.hid_dim]
        self.batch_size = 32
        self.seq_len = 150
        self.embedding = Embedding(self.dict_dim + 1, self.emb_dim, sparse=False)
        self.hidden1 = paddle.nn.Conv2D(in_channels=1,                 # input channels
                                        out_channels=self.hid_dim,     # number of filters
                                        kernel_size=self.win_size,     # filter size
                                        padding=[1, 1])
        self.relu1 = paddle.nn.ReLU()
        self.hidden3 = paddle.nn.MaxPool2D(kernel_size=2,   # pooling window
                                           stride=2)        # pooling stride
        self.hidden4 = paddle.nn.Linear(128*75, 512)        # 128 channels * 75 * 1 after pooling

        # ResNet stem; padding follows the standard ResNet layout
        self.conv0 = Conv2D(3, 64, 7, stride=2, padding=3)
        self.bn = BatchNorm2D(64)
        self.pool1 = MaxPool2D(3, stride=2, padding=1)
        if bottlenet:
            self.layer0 = self.add_bottleneck_layer(num[0], 64, start=True)
            self.layer1 = self.add_bottleneck_layer(num[1], 128)
            self.layer2 = self.add_bottleneck_layer(num[2], 256)
            self.layer3 = self.add_bottleneck_layer(num[3], 512)
        else:
            self.layer0 = self.add_basic_layer(num[0], 64, start=True)
            self.layer1 = self.add_basic_layer(num[1], 128)
            self.layer2 = self.add_basic_layer(num[2], 256)
            self.layer3 = self.add_basic_layer(num[3], 512)
        self.pool2 = AdaptiveAvgPool2D(output_size=(1, 1))
        self.hidden5 = paddle.nn.Linear(512, 3)

    def add_bottleneck_layer(self, num, inplane, start = False):
        layer = []
        if start:
            layer.append(Bottleneckblock(inplane, inplane, inplane*4, start = True))
        else:
            layer.append(Bottleneckblock(inplane, inplane*2, inplane*4, stride = 2, start = True))
        for i in range(num-1):
            layer.append(Bottleneckblock(inplane, inplane*4, inplane*4))
        return nn.Sequential(*layer)


    def add_basic_layer(self, num, inplane, start = False):
        layer = []
        if start:
            layer.append(Basicblock(inplane, inplane))
        else:
            layer.append(Basicblock(inplane//2, inplane, stride = 2))
        for i in range(num-1):
            layer.append(Basicblock(inplane, inplane))
        return nn.Sequential(*layer)

    # forward pass of the network
    def forward(self, inputs, input):
        # ----- text branch -----
        x = self.embedding(input)
        x = paddle.reshape(x, [-1, 1, self.seq_len, self.emb_dim])   # [N, 1, 150, 128]
        x = self.hidden1(x)
        x = self.relu1(x)
        x = self.hidden3(x)
        # flatten the feature map before the fully connected layer
        x = paddle.flatten(x, start_axis=1)
        out1 = self.hidden4(x)

        # ----- image branch: standard ResNet stem is conv -> bn -> relu -> maxpool -----
        x = self.conv0(inputs)
        x = self.bn(x)
        x = F.relu(x)
        x = self.pool1(x)
        x = self.layer0(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.pool2(x)
        x = paddle.squeeze(x)

        # ----- late fusion: add the two 512-d features, then classify -----
        out = out1 + x
        out = self.hidden5(out)

        return out
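To see why hidden4 takes 128*75 input features, here is the shape trace of the text branch (batch size N):

# text ids:        [N, 150]
# embedding:       [N, 150, 128]
# reshape:         [N, 1, 150, 128]
# hidden1, kernel (3, 128), padding 1:
#   H = 150 + 2 - 3 + 1 = 150, W = 128 + 2 - 128 + 1 = 3   ->  [N, 128, 150, 3]
# hidden3, 2x2 max pool, stride 2:                             [N, 128, 75, 1]
# flatten: 128 * 75 * 1 = 9600                             ->  [N, 9600] -> hidden4 -> [N, 512]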

4. Train and save the model

def draw_process(title,color,iters,data,label):
    plt.title(title, fontsize=24)
    plt.xlabel("iter", fontsize=20)
    plt.ylabel(label, fontsize=20)
    plt.plot(iters, data,color=color,label=label) 
    plt.legend()
    plt.grid()
    plt.show()

def train(model):
    model.train()
    opt = paddle.optimizer.Adam(learning_rate=0.0002, parameters=model.parameters())
    steps = 0
    Iters, total_loss, total_acc = [], [], []
    for epoch in range(50):
        for batch_id, data in enumerate(train_loader):
            steps += 1
            image=data[0]
            sent = data[1]
            label = data[2]
            logits = model(image,sent)
            loss = paddle.nn.functional.cross_entropy(logits, label)
            acc = paddle.metric.accuracy(logits, label)
            if batch_id % 50 == 0:
                Iters.append(steps)
                total_loss.append(float(loss))   # float() extracts the scalar across Paddle versions
                total_acc.append(float(acc))
                print("epoch: {}, batch_id: {}, loss is: {}".format(epoch, batch_id, loss.numpy()))
            
            loss.backward()
            opt.step()
            opt.clear_grad()

        # evaluate model after one epoch
        model.eval()
        accuracies = []
        losses = []
        for batch_id, data in enumerate(test_loader):
            image = data[0]
            sent = data[1]
            label = data[2]
            logits = model(image, sent)
            loss = paddle.nn.functional.cross_entropy(logits, label)
            acc = paddle.metric.accuracy(logits, label)
            accuracies.append(acc.numpy())
            losses.append(loss.numpy())
        
        avg_acc, avg_loss = np.mean(accuracies), np.mean(losses)
        print("[validation] accuracy: {}, loss: {}".format(avg_acc, avg_loss))
        model.train()

    paddle.save(model.state_dict(),"model_final.pdparams")
    draw_process("trainning loss","red",Iters,total_loss,"trainning loss")
    draw_process("trainning acc","green",Iters,total_acc,"trainning acc")
        
model=CNN([2,2,2,2], bottlenet = False)
train(model)
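After training, the saved weights can be loaded back for inference. A minimal sketch, reusing test_loader and assuming the 0/1/2 label mapping from step (1):

model = CNN([2, 2, 2, 2], bottlenet=False)
model.set_state_dict(paddle.load("model_final.pdparams"))
model.eval()
id2label = {0: "negative", 1: "neutral", 2: "positive"}
with paddle.no_grad():
    image, sent, label = next(iter(test_loader))   # one batch of 32 samples
    logits = model(image, sent)
    preds = paddle.argmax(logits, axis=1).numpy()
print([id2label[int(p)] for p in preds[:5]])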

Summary

This article implemented image-text cross-modal classification, using a ResNet18 for the images and a 1D-style CNN for the text. It is intended as a beginner's first cross-modal project. If anything is lacking, please message the author; additions and questions are welcome.
