Main reference: https://www.tensorflow.org/api_guides/python/threading_and_queues#Queue_usage_overview
Automatic approach
For most use cases, the automatic thread startup and management provided by tf.train.MonitoredSession
is sufficient. In the rare case that it is not, TensorFlow provides tools for manually managing your threads and queues.
Combined with functions such as tf.read_file(), tf.image.decode_jpeg() and the TFRecord API, this mechanism can read a stream of images automatically and in parallel.
```python
import tensorflow as tf


def simple_shuffle_batch(source, capacity, batch_size=10):
    # Create a random shuffle queue.
    queue = tf.RandomShuffleQueue(capacity=capacity,
                                  min_after_dequeue=int(0.9 * capacity),
                                  shapes=source.shape, dtypes=source.dtype)

    # Create an op to enqueue one item.
    enqueue = queue.enqueue(source)

    # Create a queue runner that, when started, will launch 4 threads applying
    # that enqueue op.
    num_threads = 4
    qr = tf.train.QueueRunner(queue, [enqueue] * num_threads)

    # Register the queue runner so it can be found and started by
    # `tf.train.start_queue_runners` later (the threads are not launched yet).
    tf.train.add_queue_runner(qr)

    # Create an op to dequeue a batch
    return queue.dequeue_many(batch_size)


# create a dataset that counts from 0 to 99
input = tf.constant(list(range(100)))
input = tf.data.Dataset.from_tensor_slices(input)
input = input.make_one_shot_iterator().get_next()

# Create a slightly shuffled batch from the sorted elements
get_batch = simple_shuffle_batch(input, capacity=20)

# `MonitoredSession` will start and manage the `QueueRunner` threads.
with tf.train.MonitoredSession() as sess:
    # Since the `QueueRunners` have been started, data is available in the
    # queue, so the `sess.run(get_batch)` call will not hang.
    while not sess.should_stop():
        print(sess.run(get_batch))
```
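The batches printed by this example are only slightly shuffled. With capacity=20 and min_after_dequeue=int(0.9*20) = 18, a dequeue may only take an element while more than 18 are buffered, so each element is drawn from a small window of the sorted source. The single-threaded pure-Python sketch below imitates that behaviour so it can be run without TensorFlow. `shuffle_batches` is a hypothetical helper, and it assumes only the documented RandomShuffleQueue rules (elements leave in random order, min_after_dequeue is ignored once the queue is closed) rather than reproducing TensorFlow's actual implementation.

```python
import random


def shuffle_batches(source, capacity, min_after_dequeue, batch_size, seed=0):
    """Single-threaded imitation of RandomShuffleQueue + dequeue_many (a sketch)."""
    if not 0 <= min_after_dequeue < capacity:
        raise ValueError("need 0 <= min_after_dequeue < capacity")
    rng = random.Random(seed)
    it = iter(source)
    buffer, batch, closed = [], [], False
    while True:
        # Enqueue side: keep the buffer topped up to `capacity`
        # (a real enqueue would block while the queue is full).
        while not closed and len(buffer) < capacity:
            try:
                buffer.append(next(it))
            except StopIteration:
                closed = True  # source exhausted: treat the queue as closed
        # Dequeue side: an element may leave only while more than
        # `min_after_dequeue` elements remain, unless the queue is closed.
        floor = 0 if closed else min_after_dequeue
        if len(buffer) <= floor:
            break  # closed and empty; a trailing partial batch is not returned
        i = rng.randrange(len(buffer))
        buffer[i], buffer[-1] = buffer[-1], buffer[i]
        batch.append(buffer.pop())  # remove one randomly chosen element
        if len(batch) == batch_size:
            yield batch
            batch = []


for b in shuffle_batches(range(100), capacity=20, min_after_dequeue=18, batch_size=10):
    print(b)
```

With these parameters the k-th element to come out (counting from 0) can never be larger than 19 + k, which is why values stay close to their original positions. Raising min_after_dequeue (and capacity) widens the window and mixes the data more, at the cost of memory and start-up time.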
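Manual approach
This version was obtained by slightly modifying the official example so that it runs. It currently runs and gives correct results, but it prints the following warning, which I have not resolved yet (it is a deprecation notice: `QueueRunner` is slated for removal in favor of the `tf.data` module):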
```
WARNING:tensorflow:From /home/work/Downloads/python_scripts/tensorflow_example/test_tf_queue_manual.py:52: QueueRunner.__init__ (from tensorflow.python.training.queue_runner_impl) is deprecated and will be removed in a future version.
Instructions for updating:
To construct input pipelines, use the `tf.data` module.
```

```python
import tensorflow as tf
# Using Python's threading library.
import threading
import time

batch_size = 10
thread_num = 3

print("-" * 50)


def MyLoop(coord, thread_id):
    step = 0
    while not coord.should_stop():
        step += 1
        print("thread id: %02d, step: %02d, ...do something..." % (thread_id, step))
        time.sleep(0.01)
        if step >= 5:
            coord.request_stop()


# Main thread: create a coordinator.
coord = tf.train.Coordinator()

# Create thread_num threads that run 'MyLoop()'
threads = [threading.Thread(target=MyLoop, args=(coord, i)) for i in range(thread_num)]

# Start the threads and wait for all of them to stop.
for t in threads:
    t.start()
coord.join(threads)

print("-" * 50)

# create a dataset that counts from 0 to 99
example = tf.constant(list(range(100)))
example = tf.data.Dataset.from_tensor_slices(example)
example = example.make_one_shot_iterator().get_next()

# Create a queue, and an op that enqueues examples one at a time in the queue.
queue = tf.RandomShuffleQueue(capacity=20, min_after_dequeue=int(0.9 * 20),
                              shapes=example.shape, dtypes=example.dtype)
enqueue_op = queue.enqueue(example)

# Create a training graph that starts by dequeueing a batch of examples.
inputs = queue.dequeue_many(batch_size)
train_op = inputs  # ...use 'inputs' to build the training part of the graph...

# Create a queue runner that will run thread_num threads in parallel to enqueue examples.
qr = tf.train.QueueRunner(queue, [enqueue_op] * thread_num)

# Launch the graph.
sess = tf.Session()

# Create a coordinator, launch the queue runner threads.
coord = tf.train.Coordinator()
enqueue_threads = qr.create_threads(sess, coord=coord, start=True)

# Run the training loop, controlling termination with the coordinator.
try:
    for step in range(1000000):
        if coord.should_stop():
            break
        y = sess.run(train_op)
        print(step, ", y =", y)
except Exception as e:
    # Report exceptions to the coordinator (by default an OutOfRangeError at
    # the end of the data is treated as a clean stop).
    coord.request_stop(e)
finally:
    # Terminate as usual. It is safe to call `coord.request_stop()` twice.
    coord.request_stop()
    # Wait for the enqueue threads started by the queue runner.
    coord.join(enqueue_threads)
    sess.close()
```
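To see what the Coordinator and QueueRunner take care of, here is a rough pure-Python analogue built only on the standard library (`threading`, `queue`). It is a sketch under assumptions, not TensorFlow's implementation: `SimpleCoordinator`, `start_enqueue_threads`, `dequeue_batches` and `run_pipeline` are hypothetical names, a FIFO `queue.Queue` replaces `RandomShuffleQueue` (so there is no shuffling), and a sentinel object stands in for closing the queue when the source runs out. As in the script above, several threads pull from one shared source, the main thread consumes fixed-size batches, and the `finally` block requests a stop and joins the enqueue threads.

```python
import queue
import threading

_CLOSED = object()  # sentinel that plays the role of "queue closed"


class SimpleCoordinator:
    """Hypothetical stand-in for the should_stop/request_stop/join pattern."""

    def __init__(self):
        self._stop_event = threading.Event()
        self._lock = threading.Lock()
        self._exc = None

    def should_stop(self):
        return self._stop_event.is_set()

    def request_stop(self, exc=None):
        # Keep only the first reported exception; calling this twice is safe.
        with self._lock:
            if exc is not None and self._exc is None:
                self._exc = exc
        self._stop_event.set()

    def join(self, threads):
        for t in threads:
            t.join()
        if self._exc is not None:
            raise self._exc  # re-raise the reported error in the caller


def _put(q, item, coord):
    """Blocking put that gives up once a stop has been requested."""
    while not coord.should_stop():
        try:
            q.put(item, timeout=0.05)
            return True
        except queue.Full:
            pass
    return False


def start_enqueue_threads(source, q, coord, num_threads):
    """Rough analogue of starting a queue runner's enqueue threads."""
    it = iter(source)
    lock = threading.Lock()  # guards both the shared iterator and the counter
    state = {"running": num_threads}

    def enqueue_loop():
        while not coord.should_stop():
            with lock:
                item = next(it, _CLOSED)
            if item is _CLOSED or not _put(q, item, coord):
                break
        with lock:
            state["running"] -= 1
            last = state["running"] == 0
        if last:
            _put(q, _CLOSED, coord)  # last thread to finish "closes" the queue

    threads = [threading.Thread(target=enqueue_loop) for _ in range(num_threads)]
    for t in threads:
        t.start()
    return threads


def dequeue_batches(q, coord, batch_size):
    """Yield full batches until the closing sentinel arrives."""
    batch = []
    while not coord.should_stop():
        item = q.get()
        if item is _CLOSED:
            break  # a trailing partial batch is dropped
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []


def run_pipeline(num_items=100, capacity=20, batch_size=10, num_threads=3):
    coord = SimpleCoordinator()
    q = queue.Queue(maxsize=capacity)
    threads = start_enqueue_threads(range(num_items), q, coord, num_threads)
    batches = []
    try:
        for batch in dequeue_batches(q, coord, batch_size):
            batches.append(batch)
    except Exception as e:
        coord.request_stop(e)  # report the error to the coordinator
    finally:
        coord.request_stop()
        coord.join(threads)  # join the enqueue threads started above
    return batches


batches = run_pipeline()
print(len(batches), sorted(x for b in batches for x in b) == list(range(100)))
```

With the defaults, `run_pipeline()` should return 10 batches of 10 that together contain 0..99 exactly once; because several threads enqueue concurrently, the order inside a batch can vary from run to run.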