1、 Tensorflow队列

在这里插入图片描述
队列的API
在这里插入图片描述

1、同步操作

示例:完成一个出队列、+1、入队列操作
代码

# 模拟一下同步操作,先处理数据,然后才能取数据训练
import tensorflow as tf
# 1.首先定义队列
Q = tf.FIFOQueue(3, tf.float32)

# 放入一些数据
enq_many = Q.enqueue_many([[0.1, 0.2, 0.3], ])  # [,]用逗号隔开指定列表,防止与tensor混淆

# 2.定义一些处理数据, 取数据的过程   取数据, +1, 入队列
out_q = Q.dequeue()

data = out_q + 1

en_q = Q.enqueue(data)

with tf.Session() as sess:
    # 初始化队列
    sess.run(enq_many)

    # 处理数据
    for i in range(2):
        sess.run(en_q)

    # 训练数据
    for i in range(Q.size().eval()):   # Q.size()获取队列的大小,也是一个节点所以使用eval获取数值
        print(sess.run(Q.dequeue()))

结果:

C:\Users\FCX-PC\Envs\tensorflow\Scripts\python.exe F:/同步操作.py
0.3
1.1
1.2
Process finished with exit code 0

需要 注意 enq_many = Q.enqueue_many([[0.1, 0.2, 0.3], ]) # [,]用逗号隔开指定列表,防止与tensor混淆,参数不能直接写列表,否则会当成张量。

2、异步操作

分析:当数据量很大时,入队操作从硬盘中读取数据,放入内存中,主线程需要等待入队操作完成,才能进行训练。会话里可以运行多个线程,实现异步读取。
在这里插入图片描述
示例:通过队列管理器来实现变量加1,入队,主线程出队列的操作,观察效果

代码:

# 模拟异步子线程 存入样本, 主线程 读取样本
import tensorflow as tf

# 1. 定义一个队列, 1000
Q = tf.FIFOQueue(1000, tf.float32)

# 2. 定义要做的事情 循环 值加1 放入队列
var = tf.Variable(0.0)

# 实现一个自增 tf.assign_add
data = tf.assign_add(var, 1)  # 这里不可以直接var = var+1

en_q = Q.enqueue(data)

# 3. 定义队列管理器op,指定多少个子线程, 子线程干什么事
qr = tf.train.QueueRunner(Q, enqueue_ops=[en_q]*2)

# 初始化变量的op
init_op = tf.global_variables_initializer()

with tf.Session() as sess:
    # 初始化变量
    sess.run(init_op)
    # 真正开启子线程
    thread = qr.create_threads(sess, start=True)

    # 主线程, 不断读取数据训练
    for i in range(10):
        print(sess.run(Q.dequeue()))

结果

C:\Users\FCX-PC\Envs\tensorflow\Scripts\python.exe F:/异步操作.py
7.0
16.0
31.0
42.0
57.0
69.0
78.0
88.0
106.0
121.0
E0730 19:42:52.070327  3548 queue_runner_impl.py:275] Exception in QueueRunner: Session has been closed.
E0730 19:42:52.071326  1004 queue_runner_impl.py:275] Exception in QueueRunner: Session has been closed.
Exception in thread QueueRunnerThread-fifo_queue-fifo_queue_enqueue:
Traceback (most recent call last):
  File "c:\users\fcx-pc\appdata\local\programs\python\python36\Lib\threading.py", line 916, in _bootstrap_inner
    self.run()
  File "c:\users\fcx-pc\appdata\local\programs\python\python36\Lib\threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\FCX-PC\Envs\tensorflow\lib\site-packages\tensorflow\python\training\queue_runner_impl.py", line 257, in _run
    enqueue_callable()
  File "C:\Users\FCX-PC\Envs\tensorflow\lib\site-packages\tensorflow\python\client\session.py", line 1279, in _single_operation_run
    self._call_tf_sessionrun(None, {}, [], target_list, None)
  File "C:\Users\FCX-PC\Envs\tensorflow\lib\site-packages\tensorflow\python\client\session.py", line 1429, in _call_tf_sessionrun
    run_metadata)
tensorflow.python.framework.errors_impl.CancelledError: Session has been closed.

Exception in thread QueueRunnerThread-fifo_queue-fifo_queue_enqueue:
Traceback (most recent call last):
  File "c:\users\fcx-pc\appdata\local\programs\python\python36\Lib\threading.py", line 916, in _bootstrap_inner
    self.run()
  File "c:\users\fcx-pc\appdata\local\programs\python\python36\Lib\threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\FCX-PC\Envs\tensorflow\lib\site-packages\tensorflow\python\training\queue_runner_impl.py", line 257, in _run
    enqueue_callable()
  File "C:\Users\FCX-PC\Envs\tensorflow\lib\site-packages\tensorflow\python\client\session.py", line 1279, in _single_operation_run
    self._call_tf_sessionrun(None, {}, [], target_list, None)
  File "C:\Users\FCX-PC\Envs\tensorflow\lib\site-packages\tensorflow\python\client\session.py", line 1429, in _call_tf_sessionrun
    run_metadata)
tensorflow.python.framework.errors_impl.CancelledError: Session has been closed.

Process finished with exit code 0

结果出来了,但是有个错误,这时候有一个问题就是,入队自顾自的去执行,在需要的出队操作完成之后,程序没法结束。需要一个实现线程间的同步,终止其他线程。
也就是,主线程完成了,然后会话结束,但是子线程没有结束,但当会话结束意味着就资源释放,没有资源分配给子线程进行存入数据进队列所以报错。

这就需要线程协调器

3. 线程协调器
在这里插入图片描述

示例代码:

# 模拟异步子线程 存入样本, 主线程 读取样本
import tensorflow as tf

# 1. 定义一个队列, 1000
Q = tf.FIFOQueue(1000, tf.float32)

# 2. 定义要做的事情 循环 值加1 放入队列
var = tf.Variable(0.0)

# 实现一个自增 tf.assign_add
data = tf.assign_add(var, 1)  # 这里不可以直接var = var+1

en_q = Q.enqueue(data)

# 3. 定义队列管理器op,指定多少个子线程, 子线程干什么事
qr = tf.train.QueueRunner(Q, enqueue_ops=[en_q]*2)

# 初始化变量的op
init_op = tf.global_variables_initializer()

with tf.Session() as sess:
    # 初始化变量
    sess.run(init_op)
    # 开启线程管理器
    coord = tf.train.Coordinator()

    # 真正开启子线程
    thread = qr.create_threads(sess, coord=coord, start=True)  # coord参数表示那个线程管理器管理这个子线程

    # 主线程, 不断读取数据训练
    for i in range(10):
        print(sess.run(Q.dequeue()))

    # 回收子线程
    coord.request_stop()  # 请求回收子线程
    coord.join(thread)

结果:

C:\Users\FCX-PC\Envs\tensorflow\Scripts\python.exe F:/异步操作.py
8.0
22.0
40.0
54.0
61.0
73.0
82.0
95.0
108.0
122.0

Process finished with exit code 0

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐