Data

这个任务是一个多分类问题:
从语音进行帧级音素预测。

数据分析

一个语音结果处理之后会得到一个特征向量,这个向量的是Txd, T是frame的长度,d是嵌入维度,使用不同的特征提取方法,得到的特征也是不同的,如果是采用MFCC,d=39,所以得到的特征向量就是Tx39
在这里插入图片描述
通常一个frame在结合前后的几个frame之后的预测效果会更好,所以预测一个frame的时候,将前后5个frame加进来,所以一共11个frame,那么输入的vector就变成了(11x39)
在这里插入图片描述
如下图,中间的一个frame就是我们需要预测的label
在这里插入图片描述

数据格式

给出了三个文件,每个文件都是.npy的格式,train_11.npy就是训练数据集,是11x39,因为每个frame是39,给出了前后五个,一共11段,所以特征向量展平之后就是一个11*39=429的特征向量。train_label_11.npy是标签,一共有39种标签,test_11.npy是测试数据集,是11x39,同训练集一个道理。
在这里插入图片描述
所以给出的数据都输处理好的,我们只需要根据给定的训练集的输入维度429去做分类任务即可,输入是429,输出是39(类别个数)。

定义DataSet

  • 将数据读入内存
  • 从训练集划分出验证集
class TIMITDataset(Dataset):
    def __init__(self, path, mode='train', val_traio=0.2):
        self.mode = mode

        test = np.load(data_root + path)
        train = np.load(data_root + path)
        target = np.load(data_root + "train_label_11.npy")
        target = target.astype(np.int64)

        if mode == 'test':
            self.data = torch.from_numpy(test).float()
            #self.data = test
        else:
            percent = int(train.shape[0] * (1-val_traio))
            if mode == 'train':
                self.data = torch.from_numpy(train[:percent]).float()
                self.target = torch.LongTensor(target[:percent])

                # self.data = train[:percent]
                # self.target = target[:percent]
            else:
                self.data = torch.from_numpy(train[percent:]).float()
                self.target = torch.LongTensor(target[percent:])
                # self.data = train[percent:]
                # self.target = target[percent:]

        self.dim = self.data.shape[1]

        print('Finished reading the {} set of TIMIT Dataset ({} samples found, each dim = {})'
              .format(mode, len(self.data), self.dim))

    def __getitem__(self, idx):
        if self.mode == 'test':
            return self.data[idx]
        else:
            return self.data[idx], self.target[idx]

    def __len__(self):
        return len(self.data)

定义DataLoader

# 定义dataLoader
def dataloader(path, mode, val_traio, batch_size):
    dataset = TIMITDataset(path, mode, val_traio)
    dataLoader = DataLoader(dataset, batch_size, shuffle=(mode=='train'))
    return dataLoader

定义model

# 定义model
class Classifier(nn.Module):
    def __init__(self, dim):
        super(Classifier, self).__init__()
        self.layers = nn.Sequential(
            nn.Linear(dim, 1024),
            nn.Sigmoid(),
            nn.BatchNorm1d(1024),
            nn.Linear(1024, 512),
            nn.Sigmoid(),
            nn.BatchNorm1d(512),
            nn.Linear(512, 128),
            nn.Sigmoid(),
            nn.BatchNorm1d(128),
            nn.Linear(128, 39)
        )
        self.criterion = nn.CrossEntropyLoss()

    def forward(self, x):
        return self.layers(x)

    def cal_loss(self, target, y):
        return self.criterion(target, y)

定义训练时需要的一些参数

# 定义训练时需要的一些参数
device = get_device()
os.makedirs('models', exist_ok=True) # 保存训练模型

config = {
    'n_epochs': 3000,
    'batch_size' : 120,
    'optimizer': 'Adam',
    'opt_params':{
        'lr' : 0.0001, # 学习率
    },
    'save_path': 'model.pth',
    'early_stop': 200,
    'val_traio': 0.2
}

定义验证函数

# 定义验证函数
def dev(d_set, model, device):
    model.eval()
    total_loss = 0
    for x, y in d_set:
        x, y = x.to(device), y.to(device)
        with torch.no_grad():
            pred = model(x)
            loss = model.cal_loss(pred, y)
        total_loss += loss.detach().cpu().item() * len(x)
    return total_loss

定义训练函数

# 定义训练
def train(tr_set, d_set, model,config, device):
    record_loss = {'train': [], 'dev': []}
    n_epochs = config['n_epochs']
    optimizer = getattr(torch.optim, config['optimizer'])(model.parameters(), **config['opt_params'])
    early_cnt = 0
    min_loss = 20000
    epoch = 0
    while epoch < n_epochs:
        model.train()
        for x, y in tr_set:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            pred = model(x)
            t_loss = model.cal_loss(pred, y)
            t_loss.backward()
            optimizer.step()
            record_loss['train'].append(t_loss.detach().cpu().item())
        print('dev_loss: {}'.format(dev_loss))
        dev_loss = dev(d_set, model, device)
        if dev_loss < min_loss:
            min_loss = dev_loss
            np.save(model.state_dict(), config['save_file'])
            print('Saving model (epoch = {:4d}, loss = {:.4f})'
                  .format(epoch + 1, min_loss))
            early_cnt = 0
        else:
            early_cnt += 1

        epoch += 1
        record_loss['dev'].append(dev_loss)
        if early_cnt > config['early_stop']:
            break
    return min_loss, record_loss

加载数据集和初始化model

# 加载数据集和初始化model
tr_set = dataloader(tr_path, 'train', config['val_traio'], config['batch_size'])
tt_set = dataloader(tt_path, 'test', config['val_traio'], config['batch_size'])
de_set = dataloader(tr_path, 'dev', config['val_traio'], config['batch_size'])

model = Classifier(tr_set.dataset.dim).to(device)

开始train

# 开始训练
min_loss, record_loss = train(tr_set, de_set, model, config, device)

开始预测并保存模型

def save_preds(preds, file):
    print('Saving results to {}'.format(file))
    with open(file, 'w') as fp:
        writer = csv.writer(fp)
        writer.writerow(['id', 'class'])
        for i, p in enumerate(preds):
            writer.writerow([i, p])


preds = test(tt_set, model, device)
save_preds(preds, 'preds.csv')

参考文献
李宏毅老师源码:gitee

更多推荐