2016-11-07 1 views
1

기능이 포함 된 여러 CSV 파일이 있습니다. 하나의 기능은 이미지의 파일 이름입니다. csv 파일을 줄 단위로 읽으려면 해당 이미지의 경로를 새 큐로 푸시합니다. 두 대기열은 모두 병렬로 처리해야합니다. (예를 들어, read_csv) 헬퍼 함수의텐서 흐름의 자동 엔큐와 비동기 io의 대기열 합계

flow

csv_queue = tf.FIFOQueue(10, tf.string) 
csv_init = csv_queue.enqueue_many(['sample1.csv','sample2.csv','sample3.csv']) 

path, label = read_label(csv_queue) 

image_queue = tf.FIFOQueue(100, tf.string) 
image_init = image_queue.enqueue(path) 
_, image = read_image(image_queue) 


with tf.Session() as sess: 
    csv_init.run() 
    image_init.run() 

    print(sess.run([key, label, path])) # works 
    print(sess.run(image)) # works 

    print(sess.run([key, label, path])) # works 
    print(sess.run(image)) # will deadlock unlike I do iq_init.run() 

구현

here을 찾을 수 있습니다 할 수 있습니까 "숨기기"일괄 처리 교착 상태를 피할 수 있도록 sess.run(image) 뒤에 iq_init.run() 호출? image_queue 비어 있기 때문에

+0

두 개의 대기열 (하나는 csv 파일 읽기, 다른 하나는 이미지 읽기)을 사용해야합니까? 귀하의 경우 모든 csv 파일을 하나로 묶는 것이 어렵습니까? – Seven

+0

@Seven 네,이 특별한 경우는 약간 "인공적"일지 모르지만 한 큐에서 다른 큐로 비동기 개체를 푸시하려는 다른 상황에서 나타날 수 있습니다. – bodokaiser

답변

1
filenames = ['./cs_disp_train.txt', './cs_limg_train.txt'] 
txt_queue = tf.train.string_input_producer(filenames) 
# txt_queue = tf.FIFOQueue(10, tf.string) 
# init_txt_queue = txt_queue.enqueue_many(filenames) 

enqueue_ops = [] 
image_queues = tf.FIFOQueue(100, tf.string) 

num_reader = len(filenames) 
for i in range(num_reader): 
    reader = tf.TextLineReader() 
    _, buffer = reader.read(txt_queue) 
    enqueue_ops.append(image_queues.enqueue(buffer)) 

tf.train.queue_runner.add_queue_runner(
    tf.train.queue_runner.QueueRunner(image_queues, enqueue_ops)) 

y = image_queues.dequeue() 

sess = tf.Session() 
coord = tf.train.Coordinator() 
threads = tf.train.start_queue_runners(sess=sess, coord=coord) 

# sess.run(init_txt_queue) 

print sess.run([y]) 
print sess.run([y]) 
print sess.run([y]) 
print sess.run([y]) 
print sess.run([y]) 
print sess.run([y]) 
print sess.run([y]) 
print sess.run([y]) 
print sess.run([y]) 
print sess.run([y]) 
print sess.run([y]) 
print sess.run([y]) 

coord.request_stop() 
coord.join(threads) 

예를 들어, 내가 하나가 깊이 이미지를위한 두 개의 파일, 'cs_disp_train.txt'과 'cs_limg_train.txt',이 주소 '파일의 주소를, 다른 하나는 컬러 이미지에 대응을위한'입니다. 이 코드는 두 개의 FIFOQueue를 작성합니다. 하나는이 두 파일을 읽고 다른 하나는 모든 파일 이름을 읽습니다.

나는 tf.train.QueueRunner도 사용하고 있습니다. 그러나 나는 그것을 이해하지 못했다. 나는 TFRecords 파일을 읽었지만 here에서 영감을 얻었다. 희망이 당신을 도울 수 있습니다.

+0

죄송합니다. 코드를 약간 편집했습니다. FIFOQueue는 비어있을 것이므로 첫 번째 큐에는 tf.train.string_input_producer를 사용합니다. tf.train.string_input_producer는 대기열을 반환하고 차단하지 않습니다. – Seven

0

교착 상태 sess.run() 내지 제 통화 발생, 뭔가가 큐에 추가 될 때까지 (그리고 keybuffer 생산)을 reader.read() 동작은 차단된다. TensorFlow에서 이것은 일반적으로 tf.train.QueueRunner을 생성하여 이루어집니다.이 요소는 백그라운드 스레드에서 실행될 수있는 작업 집합을 정의하여 요소를 큐로 계속 이동시킵니다. 자세한 내용은 TensorFlow의 tutorial on threading and queues을 참조하십시오.

+0

코드 수정은 어떻게 생겼습니까? 다른 질문은? – bodokaiser

관련 문제