Jak * właściwie * czytać dane CSV w TensorFlow?

84

Jestem stosunkowo nowy w świecie TensorFlow i jestem dość zdumiony tym, jak faktycznie wczytujesz dane CSV do przydatnych tensorów przykładowych / etykiet w TensorFlow. Przykład z samouczka TensorFlow dotyczący odczytywania danych CSV jest dość pofragmentowany i zapewnia tylko część drogi do trenowania na danych CSV.

Oto mój kod, który utworzyłem razem na podstawie tego samouczka CSV:

from __future__ import print_function
import tensorflow as tf

def file_len(fname):
    with open(fname) as f:
        for i, l in enumerate(f):
            pass
    return i + 1

filename = "csv_test_data.csv"

# setup text reader
file_length = file_len(filename)
filename_queue = tf.train.string_input_producer([filename])
reader = tf.TextLineReader(skip_header_lines=1)
_, csv_row = reader.read(filename_queue)

# setup CSV decoding
record_defaults = [[0],[0],[0],[0],[0]]
col1,col2,col3,col4,col5 = tf.decode_csv(csv_row, record_defaults=record_defaults)

# turn features back into a tensor
features = tf.stack([col1,col2,col3,col4])

print("loading, " + str(file_length) + " line(s)\n")
with tf.Session() as sess:
  tf.initialize_all_variables().run()

  # start populating filename queue
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)

  for i in range(file_length):
    # retrieve a single instance
    example, label = sess.run([features, col5])
    print(example, label)

  coord.request_stop()
  coord.join(threads)
  print("\ndone loading")

A oto krótki przykład z pliku CSV, który ładuję - całkiem podstawowe dane - 4 kolumny funkcji i 1 kolumna etykiet:

0,0,0,0,0
0,15,0,0,0
0,30,0,0,0
0,45,0,0,0

Wszystko, co robi powyższy kod, to wypisuje każdy przykład z pliku CSV, jeden po drugim , który, choć ładny, jest cholernie bezużyteczny do treningu.

Zmagam się z tym, jak faktycznie zamienić te poszczególne przykłady, ładowane jeden po drugim, w zestaw danych treningowych. Na przykład, oto notatnik , nad którym pracowałem w ramach kursu Udacity Deep Learning. Zasadniczo chcę pobrać dane CSV, które ładuję, i umieścić je w czymś takim jak train_dataset i train_labels :

def reformat(dataset, labels):
  dataset = dataset.reshape((-1, image_size * image_size)).astype(np.float32)
  # Map 2 to [0.0, 1.0, 0.0 ...], 3 to [0.0, 0.0, 1.0 ...]
  labels = (np.arange(num_labels) == labels[:,None]).astype(np.float32)
  return dataset, labels
train_dataset, train_labels = reformat(train_dataset, train_labels)
valid_dataset, valid_labels = reformat(valid_dataset, valid_labels)
test_dataset, test_labels = reformat(test_dataset, test_labels)
print('Training set', train_dataset.shape, train_labels.shape)
print('Validation set', valid_dataset.shape, valid_labels.shape)
print('Test set', test_dataset.shape, test_labels.shape)

Próbowałem użyć tf.train.shuffle_batch, w ten sposób, ale w niewytłumaczalny sposób się zawiesza:

  for i in range(file_length):
    # retrieve a single instance
    example, label = sess.run([features, colRelevant])
    example_batch, label_batch = tf.train.shuffle_batch([example, label], batch_size=file_length, capacity=file_length, min_after_dequeue=10000)
    print(example, label)

Podsumowując, oto moje pytania:

  • Czego mi brakuje w tym procesie?
    • Wydaje mi się, że brakuje mi pewnej kluczowej intuicji dotyczącej tego, jak prawidłowo zbudować potok wejściowy.
  • Czy istnieje sposób, aby uniknąć znajomości długości pliku CSV?
    • Znajomość liczby wierszy, które chcesz przetworzyć, wydaje się dość nieelegancka ( for i in range(file_length)wiersz kodu powyżej)

Edycja: Jak tylko Jarosław wskazał, że prawdopodobnie mieszam tutaj elementy imperatywne i konstrukcyjne wykresu, zaczęło to robić się jaśniejsze. Udało mi się zebrać następujący kod, który, jak sądzę, jest bliższy temu, co zwykle robi się podczas uczenia modelu z CSV (z wyłączeniem kodu szkolenia modelu):

from __future__ import print_function
import numpy as np
import tensorflow as tf
import math as math
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('dataset')
args = parser.parse_args()

def file_len(fname):
    with open(fname) as f:
        for i, l in enumerate(f):
            pass
    return i + 1

def read_from_csv(filename_queue):
  reader = tf.TextLineReader(skip_header_lines=1)
  _, csv_row = reader.read(filename_queue)
  record_defaults = [[0],[0],[0],[0],[0]]
  colHour,colQuarter,colAction,colUser,colLabel = tf.decode_csv(csv_row, record_defaults=record_defaults)
  features = tf.stack([colHour,colQuarter,colAction,colUser])  
  label = tf.stack([colLabel])  
  return features, label

def input_pipeline(batch_size, num_epochs=None):
  filename_queue = tf.train.string_input_producer([args.dataset], num_epochs=num_epochs, shuffle=True)  
  example, label = read_from_csv(filename_queue)
  min_after_dequeue = 10000
  capacity = min_after_dequeue + 3 * batch_size
  example_batch, label_batch = tf.train.shuffle_batch(
      [example, label], batch_size=batch_size, capacity=capacity,
      min_after_dequeue=min_after_dequeue)
  return example_batch, label_batch

file_length = file_len(args.dataset) - 1
examples, labels = input_pipeline(file_length, 1)

with tf.Session() as sess:
  tf.initialize_all_variables().run()

  # start populating filename queue
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)

  try:
    while not coord.should_stop():
      example_batch, label_batch = sess.run([examples, labels])
      print(example_batch)
  except tf.errors.OutOfRangeError:
    print('Done training, epoch reached')
  finally:
    coord.request_stop()

  coord.join(threads) 
Obrabować
źródło
Wypróbowywałem Twój kod, ale nie mogę go uruchomić. Czy jest coś, czego brakuje mi, co zdecydowałeś? Dzięki. Opublikowałem tutaj wątek, abyś mógł uzyskać więcej informacji: stackoverflow.com/questions/40143019/…
Link

Odpowiedzi:

24

Myślę, że mieszasz tutaj elementy imperatywne i konstrukcyjne wykresu. Operacja tf.train.shuffle_batchtworzy nowy węzeł kolejki, a pojedynczy węzeł może być używany do przetwarzania całego zestawu danych. Myślę więc, że zawieszasz się, ponieważ utworzyłeś kilka shuffle_batchkolejek w swojej pętli for i nie uruchomiłeś dla nich kolejki biegaczy.

Normalne użycie potoku wejściowego wygląda następująco:

  1. Dodaj węzły, takie jak shuffle_batchdo potoku wejściowego
  2. (opcjonalnie, aby zapobiec niezamierzonej modyfikacji wykresu) sfinalizuj wykres

--- koniec budowy wykresu, początek programowania imperatywnego -

  1. tf.start_queue_runners
  2. while(True): session.run()

Aby być bardziej skalowalnym (aby uniknąć Python GIL), możesz generować wszystkie dane za pomocą potoku TensorFlow. Jeśli jednak wydajność nie jest krytyczna, możesz podłączyć tablicę numpy do potoku wejściowego, używając slice_input_producer.Oto przykład z niektórymi Printwęzłami, aby zobaczyć, co się dzieje (komunikaty Printprzechodzą do stdout, gdy węzeł jest uruchomiony)

tf.reset_default_graph()

num_examples = 5
num_features = 2
data = np.reshape(np.arange(num_examples*num_features), (num_examples, num_features))
print data

(data_node,) = tf.slice_input_producer([tf.constant(data)], num_epochs=1, shuffle=False)
data_node_debug = tf.Print(data_node, [data_node], "Dequeueing from data_node ")
data_batch = tf.batch([data_node_debug], batch_size=2)
data_batch_debug = tf.Print(data_batch, [data_batch], "Dequeueing from data_batch ")

sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())
tf.get_default_graph().finalize()
tf.start_queue_runners()

try:
  while True:
    print sess.run(data_batch_debug)
except tf.errors.OutOfRangeError as e:
  print "No more inputs."

Powinieneś zobaczyć coś takiego

[[0 1]
 [2 3]
 [4 5]
 [6 7]
 [8 9]]
[[0 1]
 [2 3]]
[[4 5]
 [6 7]]
No more inputs.

Liczby „8, 9” nie wypełniły całej partii, więc nie zostały wyprodukowane. tf.PrintSą również drukowane do sys.stdout, więc pojawiają się osobno w Terminalu.

PS: minimalne połączenie batchz ręcznie zainicjowaną kolejką znajduje się w numerze 2193 github

Ponadto, dla celów debugowania, możesz chcieć ustawić timeoutsesję tak, aby twój notatnik IPython nie zawieszał się na pustych kolejkach kolejki. Używam tej funkcji pomocnika podczas moich sesji

def create_session():
  config = tf.ConfigProto(log_device_placement=True)
  config.gpu_options.per_process_gpu_memory_fraction=0.3 # don't hog all vRAM
  config.operation_timeout_in_ms=60000   # terminate on long hangs
  # create interactive session to register a default session
  sess = tf.InteractiveSession("", config=config)
  return sess

Uwagi dotyczące skalowalności:

  1. tf.constantwstawia kopię danych do wykresu. Istnieje podstawowy limit 2 GB rozmiaru definicji wykresu, więc jest to górny limit rozmiaru danych
  2. Możesz obejść ten limit, używając v=tf.Variablei zapisując tam dane, uruchamiając v.assign_opz tf.placeholderprawej strony i przekazując tablicę numpy do symbolu zastępczego ( feed_dict)
  3. To nadal tworzy dwie kopie danych, więc aby zaoszczędzić pamięć, możesz stworzyć własną wersję, slice_input_producerktóra działa na tablicach numpy, i przesyłać wiersze pojedynczo za pomocąfeed_dict
Jarosław Bulatow
źródło
2
Ach, tak! Masz całkowitą rację - gdy tylko powiedziałeś: „Myślę, że mylisz tutaj elementy imperatywne i konstrukcyjne wykresu”, zacząłem widzieć, gdzie idę źle. Opublikowałem edycję do mojego pytania, która zawiera najnowszy kod, który stworzyłem, co w rzeczywistości przybliża mnie do tego, czego chcę - jestem w stanie z powodzeniem odczytać dane CSV i przesyłać je w taki sposób, że mogłem wytrenować Model.
Rob
2
Proponuję aktualizację tę odpowiedź tak to działa z najnowszymi wersjami TensorFlow zamienić tf.slice_input_producer()z tf.train.slice_input_producer()(i podobnie dla wielu innych funkcji). A także dodaj sess.run(tf.initialize_local_variables())po sess.run(tf.initialize_all_variables()).
MiniQuark
Jeszcze kilka zmian do wprowadzenia: pack()jest teraz stack()i initialize_all_variables()powinno zostać zastąpione przez global_variables_initializer()i local_variables_initializer().
MiniQuark
W tensorflow 1.0.1 musisz zainicjować zmienne lokalne i globalne jako tf.group(tf.global_variables_initializer(), tf.local_variables_initializer()).run(). Będziesz musiał zainicjować zmienne lokalne, ponieważ używasz num_epochs i zgodnie z dokumentacją „Uwaga: jeśli num_epochsnie None, ta funkcja tworzy lokalny licznik epochs”.
Bruno R. Cardoso,
13

Możesz też spróbować tego, kod ładuje zestaw danych Iris do tensorflow za pomocą pand i numpy, a prosty wynik jednego neuronu jest drukowany w sesji. Mam nadzieję, że pomoże to w podstawowym zrozumieniu… [Nie dodałem sposobu, w jaki sposób dekodować na gorąco etykiety].

import tensorflow as tf 
import numpy
import pandas as pd
df=pd.read_csv('/home/nagarjun/Desktop/Iris.csv',usecols = [0,1,2,3,4],skiprows = [0],header=None)
d = df.values
l = pd.read_csv('/home/nagarjun/Desktop/Iris.csv',usecols = [5] ,header=None)
labels = l.values
data = numpy.float32(d)
labels = numpy.array(l,'str')
#print data, labels

#tensorflow
x = tf.placeholder(tf.float32,shape=(150,5))
x = data
w = tf.random_normal([100,150],mean=0.0, stddev=1.0, dtype=tf.float32)
y = tf.nn.softmax(tf.matmul(w,x))

with tf.Session() as sess:
    print sess.run(y)
Nagarjun Gururaj
źródło
To było bardzo pouczające, ale jeśli dobrze rozumiem, nie pokazuje, jak wykorzystać dane do treningu ...
dividebyzero
tak, dodam je wkrótce ... To powinno być trywialne, prawda ... oblicz stratę, uruchom optymalizator od razu dodam je wkrótce
Nagarjun Gururaj
2
Cześć dividebyzero, przepraszam, jestem spóźniony! Znalazłem inny link, który jest interesujący i naprawdę ułatwia problem tensorflow.org/tutorials/tflearn .... Tutaj możesz załadować pliki csv, wytrenować je, przeprowadzić klasyfikację ...
Nagarjun Gururaj
@NagarjunGururaj Czy mogę użyć zestawu danych utworzonego przez contrib_learn w normalnej procedurze tensorflow?
Jay Wong,
który zbiór danych? Masz na myśli Iris czy kogokolwiek innego?
Nagarjun Gururaj
2

Możesz użyć najnowszego API tf.data:

dataset = tf.contrib.data.make_csv_dataset(filepath)
iterator = dataset.make_initializable_iterator()
columns = iterator.get_next()
with tf.Session() as sess:
   sess.run([iteator.initializer])
Adarsh ​​Kumar
źródło
2

Jeśli ktoś przyszedł tutaj, szukając prostego sposobu na odczytanie absolutnie dużych i podzielonych na fragmenty plików CSV w tf.estimator API, to zobacz poniżej mój kod

CSV_COLUMNS = ['ID','text','class']
LABEL_COLUMN = 'class'
DEFAULTS = [['x'],['no'],[0]]  #Default values

def read_dataset(filename, mode, batch_size = 512):
    def _input_fn(v_test=False):
#         def decode_csv(value_column):
#             columns = tf.decode_csv(value_column, record_defaults = DEFAULTS)
#             features = dict(zip(CSV_COLUMNS, columns))
#             label = features.pop(LABEL_COLUMN)
#             return add_engineered(features), label

        # Create list of files that match pattern
        file_list = tf.gfile.Glob(filename)

        # Create dataset from file list
        #dataset = tf.data.TextLineDataset(file_list).map(decode_csv)
        dataset = tf.contrib.data.make_csv_dataset(file_list,
                                                   batch_size=batch_size,
                                                   column_names=CSV_COLUMNS,
                                                   column_defaults=DEFAULTS,
                                                   label_name=LABEL_COLUMN)

        if mode == tf.estimator.ModeKeys.TRAIN:
            num_epochs = None # indefinitely
            dataset = dataset.shuffle(buffer_size = 10 * batch_size)
        else:
            num_epochs = 1 # end-of-input after this

        batch_features, batch_labels = dataset.make_one_shot_iterator().get_next()

        #Begins - Uncomment for testing only -----------------------------------------------------<
        if v_test == True:
            with tf.Session() as sess:
                print(sess.run(batch_features))
        #End - Uncomment for testing only -----------------------------------------------------<
        return add_engineered(batch_features), batch_labels
    return _input_fn

Przykładowe użycie w TF.estimator:

train_spec = tf.estimator.TrainSpec(input_fn = read_dataset(
                                                filename = train_file,
                                                mode = tf.estimator.ModeKeys.TRAIN,
                                                batch_size = 128), 
                                      max_steps = num_train_steps)
Hasan Rafiq
źródło
0

2.0 Zgodne rozwiązanie : Ta odpowiedź może być dostarczona przez innych w powyższym wątku, ale podam dodatkowe linki, które pomogą społeczności.

dataset = tf.data.experimental.make_csv_dataset(
      file_path,
      batch_size=5, # Artificially small to make examples easier to show.
      label_name=LABEL_COLUMN,
      na_value="?",
      num_epochs=1,
      ignore_errors=True, 
      **kwargs)

Więcej informacji można znaleźć w tym samouczku Tensorflow .

Wsparcie Tensorflow
źródło
1
Uważam, że ta odpowiedź (oraz samouczek i dokumentacja) są całkowicie frustrujące. Wciąż staje się „częścią drogi do szkolenia na danych CSV”, jak mówi PO. Tworzy „zbiór danych” (ale jakiego typu - czy to nawet dokumentacja tf.data.Dataset? Niejasna) i wydaje się, że zbiór danych jest zorientowany na kolumny, a nie na wiersz. Większość modeli potrzebuje partii wierszy przekazywanych do nich w celu szkolenia - jak osiągnąć ten krok? Zadałem to pytanie, szukając kompleksowego przykładu.
omatai
Proszę podać końcowy przykład make_csv_dataset zamiast umieszczać tylko abstrakcyjną dokumentację poziomu!
DevLoverUmar