目录

 

                          Tensorflow线程队列与IO操作

1 线程和队列

1.1 前言

1.2 队列

1.3 队列管理器

1.4 线程协调器

2 文件读取

2.1 流程

2.2 文件读取API:

3 图像读取

3.1 图像读取基本知识

3.2 图像基本操作

3.3 图像读取API

3.4 图片批处理流程

3.5 读取图片案例

4 二进制文件读取

4.1 CIFAR-10 二进制数据读取

5 TFRecords分析存储

5.1 简介

5.2 TFRecords存储

5.3 TFRecords读取方法

5.4 案例


                          Tensorflow线程队列与IO操作

1 线程和队列

1.1 前言

IO操作进行大文件读取时,如果一次性进行读取时非常消耗内存的,那么一次性读取就需要一次性训练。这样是非常慢的。Tensorflow是计算型的,重点在计算,所以不能在读写上花太多时间,那么就提供了多线程,队列等机制。在Tensorflow中的多线程是真正的多线程,能并行的执行任务。

1.2 队列

(1)先进先出队列,按顺序出队列 :tf.FIFOQueue

FIFOQueue(capacity, dtypes, name='fifo_queue') 创建一个以先进先出的顺序对元素进行排队的队列

           capacity:整数。可能存储在此队列中的元素数量的上限

           dtypes:DType对象列表。长度dtypes必须等于每个队列元 素中的张量数,dtype的类型形状,决定了后面进队列元素形状

常用方法:

           dequeue(name=None) :取数据,出队列

           enqueue(vals, name=None): 放数据

           enqueue_many(vals, name=None):放数据,vals列表或者元组

返回一个进队列操作 size(name=None)

(2)随机出队列:tf.RandomShuffleQueue

import tensorflow as tf
# 模拟同步先处理数据,取数据训练

# 1、定义队列
Q = tf.FIFOQueue(10, tf.float32)

# 放入数据,参数如果是[0.1, 0.2, 0.3]会认为是一个张量
enq_many = Q.enqueue_many([[0.1, 0.2, 0.3], ])

# 2、定义处理数据的逻辑,取数据*2, 入队列
out= Q.dequeue()
data = out*2
enter = Q.enqueue(data)

with tf.Session() as sess:
    # 初始化队列
    sess.run(enq_many)

    # 处理数据
    for i in range(10):
        sess.run(enter)

    # 训练数据
    for i in range(Q.size().eval()):
        print(sess.run(Q.dequeue()))

注意:tensorflow当中,运行操作有依赖性,有操作之间计算的关系才能叫做依赖性

1.3 队列管理器

当数据量很大时,入队操作从硬盘中读取数据,放入内存中, 主线程需要等待入队操作完成,才能进行训练。会话里可以运行多个 线程,实现异步读取。

tf.train.QueueRunner(queue, enqueue_ops=None) 创建一个QueueRunner

           queue:一个队列                             

           enqueue_ops:添加线程的队列操作列表,[]*2,指定两个线程。[]里面指定线程做什么操作

方法:create_threads(sess, coord=None,start=False)     创建线程来运行给定会话的入队操作,返回线程的实例

           coord:线程协调器,后面线程管理需要用到

           start:布尔值,如果True启动线程;如果为False调用者 必须调用start()启动线程

1.4 线程协调器

tf.train.Coordinator()     线程协调员,实现一个简单的机制来协调一 组线程的终止 。返回线程协调员实例

           request_stop() :请求停止线程

           should_stop(): 检查是否要求停止

           join(threads=None, stop_grace_period_secs=120)   等待线程终止,回收线程

import tensorflow as tf
# 模拟异步子线程存入样本, 主线程读取样本

# 1、定义队列
Q = tf.FIFOQueue(10, tf.float32)

# 2、定义处理逻辑 循环值+1, 放入队列当中
var = tf.Variable(0.0)

# 实现一个自增  tf.assign_add
data = tf.assign_add(var, tf.constant(1.0))

# 放数据
enter = Q.enqueue(data)

# 3、定义队列管理器op, 指定开启多少个子线程,子线程的任务
qr = tf.train.QueueRunner(Q, enqueue_ops=[enter] * 2)

# 初始化变量的OP
init_op = tf.global_variables_initializer()

with tf.Session() as sess:
    # 初始化变量
    sess.run(init_op)

    # 开启线程管理器
    coord = tf.train.Coordinator()

    #  真正开启子线程
    threads = qr.create_threads(sess, coord=coord, start=True)

    # 主线程,不断读取数据训练
    for i in range(100):
        print(sess.run(Q.dequeue()))

    # 回收
    coord.request_stop()
    coord.join(threads)

2 文件读取

2.1 流程

(1)构造一个文件队列

(2)构造文件阅读器,read读取队列内容,默认只读取一个样本

①csv文件,默认读取一行;②二进制文件,指定一个样本的bytes读取;③图片文件,按一张一张的读取;④threcords

(3)decode解码

(4)批处理

2.2 文件读取API:

(1)文件队列构造

tf.train.string_input_producer(string_tensor,shuffle=True) 将输出字符串(例如文件名)输入到管道队列

           string_tensor: 含有文件名的1阶张量

           num_epochs:过几遍数据,默认无限过数据

           return:具有输出字符串的队列

(2)文件阅读器

tf.TextLineReader 根据文件格式,选择对应的文件阅读器 

           阅读文本文件逗号分隔值(CSV)格式,默认按行读取

           return:读取器实例

tf.FixedLengthRecordReader(record_bytes)

           要读取每个记录是固定数量字节的二进制文件

           record_bytes:整型,指定每次读取的字节数

           return:读取器实例

tf.TFRecordReader 读取TfRecords文件

注意:有一个共同的读取方法: read(file_queue):从队列中指定数量内容 返回一个Tensors元组(key文件名字,value默认的内容(行,字节))

(3)文件内容解码器

由于从文件中读取的是字符串,需要函数去解析这些字符串到张量

tf.decode_csv(records,record_defaults=None,field_delim = None,name = None)             将CSV转换为张量,与tf.TextLineReader搭配使用

           records:tensor型字符串,每个字符串是csv中的记录行

           field_delim:默认分割符”,”

           record_defaults:参数决定了所得张量的类型,并设置一个值在输入字符串中缺少使用默认值,如 tf.decode_raw(bytes,out_type,little_endian = None,name = None)     将字节转换为一个数字向量表示,字节为一字符串类型的张量,与函数tf.FixedLengthRecordReader搭配使用,二进制读取为uint8格式

(4)开启线程操作

tf.train.start_queue_runners(sess=None,coord=None)     收集所有图中的队列线程,并启动线程

           sess:所在的会话中

           coord:线程协调器

           return:返回所有线程队列

(5)管道读端批处理

tf.train.batch(tensors,batch_size,num_threads = 1,capacity = 32,name=None) 读取指定大小(个数)的张量

           tensors:可以是包含张量的列表

           batch_size:从队列中读取的批处理大小

           num_threads:进入队列的线程数

           capacity:整数,队列中元素的最大数量

           return:tensors

tf.train.shuffle_batch(tensors,batch_size,capacity,min_after_dequeue,    num_threads=1,)                    乱序读取指定大小(个数)的张量

           min_after_dequeue:留下队列里的张量个数,能够保持随机打乱

import tensorflow as tf
import os
def csvread(filelist):
    """
    读取CSV文件
    :param filelist: 文件路径+名字的列表
    :return: 读取的内容
    """
    # 1、构造文件的队列
    file_queue = tf.train.string_input_producer(filelist)

    # 2、构造csv阅读器读取队列数据(按一行)
    reader = tf.TextLineReader()
    key, value = reader.read(file_queue)

    # 3、对每行内容解码
    # record_defaults:指定每一个样本的每一列的类型,指定默认值[["None"], [4.0]]
    records = [["None"], ["None"]]
    example, label = tf.decode_csv(value, record_defaults=records,field_delim=" ")

    # 4、想要读取多个数据,就需要批处理
    example_batch, label_batch = tf.train.batch([example, label], batch_size=4, num_threads=1, capacity=4)

    print(example_batch, label_batch)
    return example_batch, label_batch

if __name__ == "__main__":
    # 1、找到文件,放入列表   路径+名字  ->列表当中
    file_name = os.listdir("./floder")

    filelist = [os.path.join("./floder", file) for file in file_name ]

    # 打印文件名
    example_batch, label_batch = csvread(filelist)

    # 开启会话运行结果
    with tf.Session() as sess:
        # 定义一个线程协调器
        coord = tf.train.Coordinator()

        # 开启读文件的线程
        threads = tf.train.start_queue_runners(sess, coord=coord)

        # 打印读取的内容
        print(sess.run([example_batch, label_batch]))

        # 回收子线程
        coord.request_stop()
        coord.join(threads)

3 图像读取

3.1 图像读取基本知识

机器学习算法输入是特征值+目标值。每个图片由像素组成的,读取的时候是读取像素值去识别。

在图像数字化表示当中,分为黑白和彩色两种。在数字化表示图片的时候,有三个因素。分别是图片的长、图片的宽、图片的颜色通道数。

①黑白图片:颜色通道数为1,个像素点只有一个值,称为灰度值[0-255];

②彩色图片:它有三个颜色通道,分别为RGB,通过三个数字表示一个像素位。TensorFlow支持JPG、PNG图像格式,RGB、RGBA颜色空间。图像用与图像尺寸相同(heightwidthchnanel)张量表示。图像所有像素存在磁盘文件,需要被加载到内存。

3.2 图像基本操作

操作:缩小图片大小,为了所有图片统一特征数(像素值一样)。

目的:①增加图片数据的统一性②所有图片转换成指定大小 ③缩小图片数据量,防止增加开销

图片存储计算的类型:存储uint8(节约空间) 矩阵计算float32(提高精度)

API:

tf.image.resize_images(images, size) 缩小图片

            images:4-D形状[batch, height, width, channels]或3-D形状的张 量[height, width, channels]的图片数据

            size:1-D int32张量:new_height, new_width,图像的新尺寸

            返回4-D格式或者3-D格式图片

3.3 图像读取API

图像读取器 :

①tf.WholeFileReader 将文件的全部内容作为值输出的读取器

            return:读取器实例

            read(file_queue):输出将是一个文件名(key)和该文件的内容 (值)

图像解码器 :

①tf.image.decode_jpeg(contents) 将JPEG编码的图像解码为uint8张量

            return:uint8张量,3-D形状[height, width, channels]

②tf.image.decode_png(contents) 将PNG编码的图像解码为uint8或uint16张量

            return:张量类型,3-D形状[height, width, channels]

3.4 图片批处理流程

(1)构造图片文件队列

(2)构造图片阅读器

(3)读取图片数据

(4)处理图片数据

3.5 读取图片案例

import tensorflow as tf
import os
def pictureRead(filelist):
    """
    读取狗图片并转换成张量
    :param filelist: 文件路径+ 名字的列表
    :return: 每张图片的张量
    """
    # 1、构造文件队列
    file_queue = tf.train.string_input_producer(filelist)

    # 2、构造阅读器去读取图片内容(默认读取一张图片)
    reader = tf.WholeFileReader()
    key, value = reader.read(file_queue)
    print(value)

    # 3、对读取的图片数据进行解码
    image = tf.image.decode_jpeg(value)
    print(image)

    # 5、处理图片的大小(统一大小)
    image_resize = tf.image.resize_images(image, [300, 300])
    print(image_resize)

    # 注意:一定要把样本的形状固定 [300, 300, 3],在批处理的时候要求所有数据形状必须定义
    image_resize.set_shape([300, 300, 3])
    print(image_resize)

    # 6、进行批处理
    image_batch = tf.train.batch([image_resize], batch_size=50, num_threads=2, capacity=50)

    print(image_batch)

    return image_batch

if __name__ == "__main__":
    # 1、找到文件,放入列表   路径+名字  ->列表当中
    file_name = os.listdir("./cat")

    filelist = [os.path.join("./cat", file) for file in file_name ]

    # 图片的张量
    image_batch = pictureRead(filelist)

    # 开启会话运行结果
    with tf.Session() as sess:
        # 定义一个线程协调器
        coord = tf.train.Coordinator()

        # 开启读文件的线程
        threads = tf.train.start_queue_runners(sess, coord=coord)

        # 打印读取的内容
        print(sess.run([image_batch]))

        # 回收子线程
        coord.request_stop()
        coord.join(threads)

 

4 二进制文件读取

4.1 CIFAR-10 二进制数据读取

网址:http://www.cs.toronto.edu/~kriz/cifar.html

由介绍可知每个样本的大小为:1(目标值)+3072(特征值)=3073字节

import tensorflow as tf
import os
def binaryRead(filelist):
    # 定义读取的图片的一些属性
    height,width,channel = 32,33,3
    # 二进制文件每张图片的字节
    label_bytes = 1
    image_bytes = height * width * channel
    bytes = label_bytes + image_bytes

    # 1、构造文件队列
    file_queue = tf.train.string_input_producer(filelist)

    # 2、构造二进制文件读取器,读取内容, 每个样本的字节数
    reader = tf.FixedLengthRecordReader(bytes)
    key, value = reader.read(file_queue)

    # 3、解码内容, 二进制文件内容的解码
    label_image = tf.decode_raw(value, tf.uint8)
    print(label_image)

    # 4、分割出图片和标签数据,切除特征值和目标值
    label = tf.cast(tf.slice(label_image, [0], [label_bytes]), tf.int32)
    image = tf.slice(label_image, [label_bytes], [image_bytes])

    # 5、对图片的特征数据进行形状的改变 [3072] --> [32, 32, 3]
    image_reshape = tf.reshape(image, [height, width, channel])
    print(label, image_reshape)

    # 6、批处理数据
    image_batch, label_batch = tf.train.batch([image_reshape, label], batch_size=20, num_threads=2, capacity=20)
    print(image_batch, label_batch)

    return image_batch, label_batch

if __name__ == "__main__":
    # 1、找到文件,放入列表   路径+名字  ->列表当中
    file_name = os.listdir("./data/cifar-10-batches-bin")

    filelist = [os.path.join("./data/cifar-10-batches-bin", file) for file in file_name if file[-3:]=="bin"]

    # 二进制的张量
    image_batch, label_batch= binaryRead(filelist)

    # 开启会话运行结果
    with tf.Session() as sess:
        # 定义一个线程协调器
        coord = tf.train.Coordinator()

        # 开启读文件的线程
        threads = tf.train.start_queue_runners(sess, coord=coord)

        # 打印读取的内容
        print(sess.run([image_batch, label_batch]))

        # 回收子线程
        coord.request_stop()
        coord.join(threads)

 

5 TFRecords分析存储

5.1 简介

FRecords是Tensorflow设计的一种内置文件格式,是一种二进制文件, 它能更好的利用内存,更方便复制和移动。从机器学习角度,一个样本是特征值和目标值组成,FRecords是为了将二进制数据和标签(训练的类别标签)数据存储在同一个文件中

文件格式:*.tfrecords                      写入文件内容:Example协议块(类字典的格式)

优点:特征值目标值共同存储,获取的时候只要指定键是什么值是什么就能获取到了。

5.2 TFRecords存储

(1)建立TFRecord存储器

tf.python_io.TFRecordWriter(path) 写入tfrecords文件

           path: TFRecords文件的路径

           return:写文件

方法method:

           write(record):向文件中写入一个字符串记录(就是example) 

           close():关闭文件写入器

注意:字符串为一个序列化的Example,使用Example.SerializeToString()

(2)构造每个样本的Example协议块

tf.train.Example(features=None)

           写入tfrecords文件

           features:tf.train.Features类型的特征实例

           return:example格式协议块

tf.train.Features(feature=None) 构建每个样本的信息键值对

           feature:字典数据,key为要保存的名字, value为tf.train.Feature实例

           return:Features类型

tf.train.Feature(**options)

           **options:例如:

            bytes_list=tf.train. BytesList(value=[Bytes])

           int64_list=tf.train. Int64List(value=[Value])

tf.train. Int64List(value=[Value])

tf.train. BytesList(value=[Bytes])

tf.train. FloatList(value=[value])

5.3 TFRecords读取方法

同文件阅读器流程,中间需要解析过程

解析TFRecords的example协议内存块:

tf.parse_single_example(serialized,features=None,name=None)

           解析一个单一的Example原型

           serialized:标量字符串Tensor,一个序列化的Example

           features:dict字典数据,键为读取的名字,值为FixedLenFeature

           return:一个键值对组成的字典,键为读取的名字

tf.FixedLenFeature(shape,dtype)

           shape:输入数据的形状,一般不指定,为空列表

           dtype:输入数据类型,与存储进文件的类型要一致 类型只能是float32,int64,string

5.4 案例

CIFAR-10批处理结果存入tfrecords流程

(1)构造存储器

(2)构造每一个样本的Example

(3)写入序列化的Example

读取tfrecords流程

(1)构造文件队列

(2)构造TFRecords阅读器

(3)解析Example

(4)转换格式,bytes解码

import tensorflow as tf
import os

# 定义数据等命令行参数
FLAGS = tf.app.flags.FLAGS

tf.app.flags.DEFINE_string("data_dir", "./data/cifar-10-batches-bin", "文件的目录")
tf.app.flags.DEFINE_string("data_tfrecords", "./tmp/dataTFR.tfrecords", "存进tfrecords的文件")

class TFRRead(object):
    """完成读取二进制文件, 写进tfrecords,读取tfrecords
    """
    def __init__(self, filelist):
        # 文件列表
        self.file_list = filelist
        # 定义读取的图片的一些属性
        self.height = 32
        self.width = 32
        self.channel = 3
        # 二进制文件每张图片的字节
        self.label_bytes = 1
        self.image_bytes = self.height * self.width * self.channel
        self.bytes = self.label_bytes + self.image_bytes

    def read_and_decode(self):
        # 1、构造文件队列
        file_queue = tf.train.string_input_producer(self.file_list)

        # 2、构造二进制文件读取器,读取内容, 每个样本的字节数
        reader = tf.FixedLengthRecordReader(self.bytes)
        key, value = reader.read(file_queue)

        # 3、解码内容, 二进制文件内容的解码
        label_image = tf.decode_raw(value, tf.uint8)

        # 4、分割出图片和标签数据,切除特征值和目标值
        label = tf.cast(tf.slice(label_image, [0], [self.label_bytes]), tf.int32)
        image = tf.slice(label_image, [self.label_bytes], [self.image_bytes])

        # 5、对图片的特征数据进行形状的改变 [3072] --> [32, 32, 3]
        image_reshape = tf.reshape(image, [self.height, self.width, self.channel])

        # 6、批处理数据
        image_batch, label_batch = tf.train.batch([image_reshape, label], batch_size=20, num_threads=1, capacity=20)

        return image_batch, label_batch

    def write_ro_tfrecords(self, image_batch, label_batch):
        """
        将图片的特征值和目标值存进tfrecords
        :param image_batch: 20张图片的特征值
        :param label_batch: 20张图片的目标值
        :return: None
        """
        # 1、建立TFRecord存储器
        writer = tf.python_io.TFRecordWriter(FLAGS.data_tfrecords)

        # 2、循环将所有样本写入文件,每张图片样本都要构造example协议
        for i in range(20):
            # 取出第i个图片数据的特征值和目标值,image_batch[i]是类型,调用eval()获取值,因为是个张量,需要调用.tostring()转换成字符串
            image = image_batch[i].eval().tostring()
            label = int(label_batch[i].eval()[0])

            # 构造一个样本的example
            example =  tf.train.Example(features=tf.train.Features(feature={
                "image": tf.train.Feature(bytes_list=tf.train.BytesList(value=[image])),
                "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label])),
            }))

            # 写入单独的样本,字符串要为一个序列化的Example
            writer.write(example.SerializeToString())

        # 关闭
        writer.close()
        return None

    def read_from_tfrecords(self):

        # 1、构造文件队列
        file_queue = tf.train.string_input_producer([FLAGS.data_tfrecords])

        # 2、构造文件阅读器,读取内容example,value=一个样本的序列化example
        reader = tf.TFRecordReader()
        key, value = reader.read(file_queue)

        # 3、解析example
        features = tf.parse_single_example(value, features={
            "image": tf.FixedLenFeature([], tf.string),
            "label": tf.FixedLenFeature([], tf.int64),
        })

        # 4、解码内容, 如果读取的内容格式是string需要解码, 如果是int64,float32不需要解码
        image = tf.decode_raw(features["image"], tf.uint8)

        # 固定图片的形状,方便与批处理
        image_reshape = tf.reshape(image, [self.height, self.width, self.channel])
        label = tf.cast(features["label"], tf.int32)
        print(image_reshape, label)

        # 进行批处理
        image_batch, label_batch = tf.train.batch([image_reshape, label], batch_size=20, num_threads=1, capacity=20)

        return image_batch, label_batch


if __name__ == "__main__":
    # 1、找到文件,放入列表   路径+名字  ->列表当中
    file_name = os.listdir(FLAGS.data_dir)

    filelist = [os.path.join(FLAGS.data_dir, file) for file in file_name if file[-3:] == "bin"]

    # print(file_name)
    cf = TFRRead(filelist)

    #image_batch, label_batch = cf.read_and_decode()
    image_batch, label_batch = cf.read_from_tfrecords()

    # 开启会话运行结果
    with tf.Session() as sess:
        # 定义一个线程协调器
        coord = tf.train.Coordinator()

        # 开启读文件的线程
        threads = tf.train.start_queue_runners(sess, coord=coord)

        #存进tfrecords文件
        # print("开始存储")
        # threads = cf.write_ro_tfrecords(image_batch, label_batch)
        # print("结束存储")

        # 打印读取的内容
        print(sess.run([image_batch, label_batch]))

        # 回收子线程
        coord.request_stop()
        coord.join(threads)

 

 

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐