讀取和寫入 CSV 檔案#

Arrow 提供快速的 CSV 讀取器,允許擷取外部資料以建立 Arrow 表格或 Arrow RecordBatch 的串流。

讀取 CSV 檔案#

CSV 檔案中的資料可以使用 TableReader 讀取為單個 Arrow 表格,或使用 StreamingReader 串流為 RecordBatch。 有關這兩種方法之間權衡的討論,請參閱 權衡

這些讀取器都需要一個 arrow::io::InputStream 實例,表示輸入檔案。 它們的行為可以使用 ReadOptionsParseOptionsConvertOptions 的組合來自訂。

TableReader#

#include "arrow/csv/api.h"

{
   // ...
   arrow::io::IOContext io_context = arrow::io::default_io_context();
   std::shared_ptr<arrow::io::InputStream> input = ...;

   auto read_options = arrow::csv::ReadOptions::Defaults();
   auto parse_options = arrow::csv::ParseOptions::Defaults();
   auto convert_options = arrow::csv::ConvertOptions::Defaults();

   // Instantiate TableReader from input stream and options
   auto maybe_reader =
     arrow::csv::TableReader::Make(io_context,
                                   input,
                                   read_options,
                                   parse_options,
                                   convert_options);
   if (!maybe_reader.ok()) {
     // Handle TableReader instantiation error...
   }
   std::shared_ptr<arrow::csv::TableReader> reader = *maybe_reader;

   // Read table from CSV file
   auto maybe_table = reader->Read();
   if (!maybe_table.ok()) {
     // Handle CSV read error
     // (for example a CSV syntax error or failed type conversion)
   }
   std::shared_ptr<arrow::Table> table = *maybe_table;
}

StreamingReader#

#include "arrow/csv/api.h"

{
   // ...
   arrow::io::IOContext io_context = arrow::io::default_io_context();
   std::shared_ptr<arrow::io::InputStream> input = ...;

   auto read_options = arrow::csv::ReadOptions::Defaults();
   auto parse_options = arrow::csv::ParseOptions::Defaults();
   auto convert_options = arrow::csv::ConvertOptions::Defaults();

   // Instantiate StreamingReader from input stream and options
   auto maybe_reader =
     arrow::csv::StreamingReader::Make(io_context,
                                       input,
                                       read_options,
                                       parse_options,
                                       convert_options);
   if (!maybe_reader.ok()) {
     // Handle StreamingReader instantiation error...
   }
   std::shared_ptr<arrow::csv::StreamingReader> reader = *maybe_reader;

   // Set aside a RecordBatch pointer for re-use while streaming
   std::shared_ptr<RecordBatch> batch;

   while (true) {
       // Attempt to read the first RecordBatch
       arrow::Status status = reader->ReadNext(&batch);

       if (!status.ok()) {
         // Handle read error
       }

       if (batch == NULL) {
         // Handle end of file
         break;
       }

       // Do something with the batch
   }
}

權衡#

使用 TableReaderStreamingReader 之間的選擇最終將取決於使用案例,但有一些權衡需要注意

  1. 記憶體使用量: TableReader 一次將所有資料載入記憶體,並且根據資料量,可能需要比 StreamingReader 更多的記憶體,後者一次僅載入一個 RecordBatch。 這很可能是使用者最重要的權衡。

  2. 速度: 在讀取 CSV 的全部內容時,TableReader 通常會比 StreamingReader 更快,因為它可以更好地利用可用的核心。 有關更多詳細資訊,請參閱 效能

  3. 彈性: StreamingReader 可能被認為不如 TableReader 靈活,因為它僅對讀取的第一個區塊執行類型推斷,之後類型會被凍結,並且後續區塊中任何無法轉換為這些類型的資料都會導致錯誤。 請注意,可以透過將 ReadOptions::block_size 設定為足夠大的值,或使用 ConvertOptions::column_types 明確設定所需的資料類型來補救此問題。

寫入 CSV 檔案#

CSV 檔案會寫入到 OutputStream

#include <arrow/csv/api.h>
{
    // Oneshot write
    // ...
    std::shared_ptr<arrow::io::OutputStream> output = ...;
    auto write_options = arrow::csv::WriteOptions::Defaults();
    if (WriteCSV(table, write_options, output.get()).ok()) {
        // Handle writer error...
    }
}
{
    // Write incrementally
    // ...
    std::shared_ptr<arrow::io::OutputStream> output = ...;
    auto write_options = arrow::csv::WriteOptions::Defaults();
    auto maybe_writer = arrow::csv::MakeCSVWriter(output, schema, write_options);
    if (!maybe_writer.ok()) {
        // Handle writer instantiation error...
    }
    std::shared_ptr<arrow::ipc::RecordBatchWriter> writer = *maybe_writer;

    // Write batches...
    if (!writer->WriteRecordBatch(*batch).ok()) {
        // Handle write error...
    }

    if (!writer->Close().ok()) {
        // Handle close error...
    }
    if (!output->Close().ok()) {
        // Handle file close error...
    }
}

注意

寫入器尚不支援所有 Arrow 類型。

欄名#

有三種可能的方法可以從 CSV 檔案推斷欄名

  • 預設情況下,欄名是從 CSV 檔案的第一列讀取的

  • 如果設定了 ReadOptions::column_names,它會強制表格中的欄名為這些值(CSV 檔案中的第一列會被讀取為資料)

  • 如果 ReadOptions::autogenerate_column_names 為 true,則欄名將會使用模式 "f0"、"f1"... 自動產生(CSV 檔案中的第一列會被讀取為資料)

欄位選擇#

預設情況下,Arrow 會讀取 CSV 檔案中的所有欄位。 您可以使用 ConvertOptions::include_columns 選項來縮小欄位的選擇範圍。 如果 ConvertOptions::include_columns 中的某些欄位在 CSV 檔案中遺失,則會發出錯誤,除非 ConvertOptions::include_missing_columns 為 true,在這種情況下,遺失的欄位會被假定為包含全 null 值。

與欄名的互動#

如果同時指定了 ReadOptions::column_namesConvertOptions::include_columns,則 ReadOptions::column_names 會被假定為對應到 CSV 欄位,而 ConvertOptions::include_columns 是這些欄名的一個子集,將會是 Arrow 表格的一部分。

資料類型#

預設情況下,CSV 讀取器會為每個欄位推斷最合適的資料類型。 類型推斷會依序考量以下資料類型

可以透過設定 ConvertOptions::column_types 選項來覆寫選定欄位的類型推斷。 可以從以下列表中選擇明確的資料類型

  • Null

  • 所有整數類型

  • Float32 和 Float64

  • Decimal128

  • Boolean

  • Date32 和 Date64

  • Time32 和 Time64

  • Timestamp

  • Binary 和 Large Binary

  • String 和 Large String(可選的 UTF8 輸入驗證)

  • 固定大小二進位

  • Dictionary,索引類型為 Int32,值類型為以下之一:Binary、String、LargeBinary、LargeString、Int32、UInt32、Int64、UInt64、Float32、Float64、Decimal128

其他資料類型不支援從 CSV 值轉換,並且會產生錯誤。

字典推斷#

如果啟用類型推斷並且 ConvertOptions::auto_dict_encode 為 true,則 CSV 讀取器首先嘗試將類似字串的欄位轉換為字典編碼的類似字串的陣列。 當達到 ConvertOptions::auto_dict_max_cardinality 中的閾值時,它會切換到純粹的類似字串的陣列。

時間戳記推斷/解析#

如果啟用類型推斷,則 CSV 讀取器首先嘗試將類似字串的欄位解釋為時間戳記。 如果所有列都具有某個時區偏移(例如 Z+0100),即使偏移不一致,則推斷的類型將為 UTC 時間戳記。 如果沒有列具有時區偏移,則推斷的類型將為不帶時區的時間戳記。 混合具有/不具有偏移的列將導致字串欄位。

如果類型明確指定為帶有/不帶有時區的時間戳記,則讀取器將在該欄位中沒有/帶有時區偏移的值上產生錯誤。 請注意,這表示目前無法讓讀取器將不帶時區偏移的時間戳記欄位解析為特定時區的本地時間; 相反地,請將欄位解析為不帶時區的時間戳記,然後使用 assume_timezone 運算函數在之後轉換值。

指定的類型

輸入 CSV

結果類型

(推斷)

2021-01-01T00:00:00

timestamp[s]

2021-01-01T00:00:00Z

timestamp[s, UTC]

2021-01-01T00:00:00+0100

2021-01-01T00:00:00
2021-01-01T00:00:00Z

字串

timestamp[s]

2021-01-01T00:00:00

timestamp[s]

2021-01-01T00:00:00Z

(錯誤)

2021-01-01T00:00:00+0100

2021-01-01T00:00:00
2021-01-01T00:00:00Z

timestamp[s, UTC]

2021-01-01T00:00:00

(錯誤)

2021-01-01T00:00:00Z

timestamp[s, UTC]

2021-01-01T00:00:00+0100

2021-01-01T00:00:00
2021-01-01T00:00:00Z

(錯誤)

timestamp[s, America/New_York]

2021-01-01T00:00:00

(錯誤)

2021-01-01T00:00:00Z

timestamp[s, America/New_York]

2021-01-01T00:00:00+0100

2021-01-01T00:00:00
2021-01-01T00:00:00Z

(錯誤)

Null 值#

Null 值會從儲存在 ConvertOptions::null_values 中的拼字識別。 ConvertOptions::Defaults() 工廠方法將初始化許多傳統的 null 拼字,例如 N/A

字元編碼#

CSV 檔案預期以 UTF8 編碼。 但是,Binary 欄位接受非 UTF8 資料。

寫入選項#

寫入的 CSV 檔案格式可以透過 WriteOptions 自訂。 目前只有少數選項可用; 未來的版本將會新增更多選項。

效能#

預設情況下,TableReader 將會平行化讀取,以便利用您機器上的所有 CPU 核心。 您可以在 ReadOptions::use_threads 中變更此設定。 合理的期望是在高效能桌上型或筆記型電腦上,每個核心至少達到 100 MB/秒(以來源 CSV 位元組測量,而不是目標 Arrow 資料位元組)。