Tabular Datasets#
See also
The Arrow Datasets library provides functionality to efficiently work with tabular, potentially larger-than-memory, and multi-file datasets. This includes:
A unified interface that supports different sources and file formats and different file systems (local, cloud).
Discovery of sources (crawling directories, handling partitioned datasets with various partitioning schemes, basic schema normalization, etc.)
Optimized reading with predicate pushdown (filtering rows), projection (selecting and deriving columns), and optionally parallel reading.
The file formats currently supported are Parquet, Feather / Arrow IPC, CSV, and ORC (note that ORC datasets can currently only be read, not yet written). The goal is to expand support to other file formats and data sources (e.g. database connections) in the future.
Reading Datasets#
For the examples below, let's create a small dataset consisting of a directory with two Parquet files:
// Generate some data for the rest of this example.
arrow::Result<std::shared_ptr<arrow::Table>> CreateTable() {
  auto schema =
      arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
                     arrow::field("c", arrow::int64())});
  std::shared_ptr<arrow::Array> array_a;
  std::shared_ptr<arrow::Array> array_b;
  std::shared_ptr<arrow::Array> array_c;
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_a));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_b));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_c));
  return arrow::Table::Make(schema, {array_a, array_b, array_c});
}

// Set up a dataset by writing two Parquet files.
arrow::Result<std::string> CreateExampleParquetDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
  // Write it into two Parquet files
  ARROW_ASSIGN_OR_RAISE(auto output,
                        filesystem->OpenOutputStream(base_path + "/data1.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(0, 5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  ARROW_ASSIGN_OR_RAISE(output,
                        filesystem->OpenOutputStream(base_path + "/data2.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  return base_path;
}
(See the full example at the bottom: A note on transactions & ACID guarantees.)
Dataset discovery#
An arrow::dataset::Dataset object can be created using the various arrow::dataset::DatasetFactory objects. Here, we'll use the arrow::dataset::FileSystemDatasetFactory, which can create a dataset given a base directory path:
// Read the whole dataset with the given format, without partitioning.
arrow::Result<std::shared_ptr<arrow::Table>> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
We're also passing the filesystem to use and the file format to use for reading. This lets us choose between (for example) reading local files or files in Amazon S3, or between Parquet and CSV.
In addition to searching a base directory, we can also list file paths manually.
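As a minimal sketch of that (the file paths here are hypothetical, and the filesystem and format variables are reused from the surrounding examples), arrow::dataset::FileSystemDatasetFactory::Make has an overload that takes an explicit vector of paths instead of a fs::FileSelector:

std::vector<std::string> paths = {"/tmp/parquet_dataset/data1.parquet",
                                  "/tmp/parquet_dataset/data2.parquet"};
ARROW_ASSIGN_OR_RAISE(
    auto factory, ds::FileSystemDatasetFactory::Make(filesystem, paths, format,
                                                     ds::FileSystemFactoryOptions()));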
Creating an arrow::dataset::Dataset does not begin reading the data itself. It only crawls the directory to find all the files (if needed), which can be retrieved with arrow::dataset::FileSystemDataset::files():
// Print out the files crawled (only for FileSystemDataset)
for (const auto& filename : dataset->files()) {
  std::cout << filename << std::endl;
}
...and infers the dataset's schema (by default from the first file):
std::cout << dataset->schema()->ToString() << std::endl;
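If inferring the schema from the first file alone is too optimistic for your data, the factory can be asked to inspect more (or all) fragments before the schema is finalized. A minimal sketch, assuming the factory from the discovery example above:

// Check the schemas of all fragments instead of only the first one.
ds::InspectOptions inspect_options;
inspect_options.fragments = ds::InspectOptions::kInspectAllFragments;
ARROW_ASSIGN_OR_RAISE(auto schema, factory->Inspect(inspect_options));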
Using the arrow::dataset::Dataset::NewScan() method, we can build an arrow::dataset::Scanner and read the dataset (or a portion of it) into an arrow::Table with the arrow::dataset::Scanner::ToTable() method:
// Read the whole dataset with the given format, without partitioning.
arrow::Result<std::shared_ptr<arrow::Table>> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
Note
Depending on the size of your dataset, this can require a lot of memory; see Filtering data below for information on filtering/projecting.
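If materializing the whole dataset as a single table is too memory-hungry, one option is to consume the scan as a stream of record batches instead. A minimal sketch, assuming the scanner built as in the example above:

ARROW_ASSIGN_OR_RAISE(auto reader, scanner->ToRecordBatchReader());
std::shared_ptr<arrow::RecordBatch> batch;
while (true) {
  ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
  if (!batch) break;  // End of stream.
  // Process one batch at a time instead of holding the full table in memory.
  std::cout << "Read batch with " << batch->num_rows() << " rows" << std::endl;
}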
Reading different file formats#
The above examples use Parquet files on local disk, but the Dataset API provides a consistent interface across multiple file formats and filesystems. (See Reading from cloud storage for more information on the latter.) Currently, the Parquet, ORC, Feather / Arrow IPC, and CSV file formats are supported; more formats are planned in the future.
If we save the table as Feather files instead of Parquet files:
// Set up a dataset by writing two Feather files.
arrow::Result<std::string> CreateExampleFeatherDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/feather_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
  // Write it into two Feather files
  ARROW_ASSIGN_OR_RAISE(auto output,
                        filesystem->OpenOutputStream(base_path + "/data1.feather"));
  ARROW_ASSIGN_OR_RAISE(auto writer,
                        arrow::ipc::MakeFileWriter(output.get(), table->schema()));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(0, 5)));
  ARROW_RETURN_NOT_OK(writer->Close());
  ARROW_ASSIGN_OR_RAISE(output,
                        filesystem->OpenOutputStream(base_path + "/data2.feather"));
  ARROW_ASSIGN_OR_RAISE(writer,
                        arrow::ipc::MakeFileWriter(output.get(), table->schema()));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(5)));
  ARROW_RETURN_NOT_OK(writer->Close());
  return base_path;
}
...then we can read the Feather files by passing an arrow::dataset::IpcFileFormat:
auto format = std::make_shared<ds::IpcFileFormat>();
// ...
auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options)
                   .ValueOrDie();
Customizing file formats#
The arrow::dataset::FileFormat objects have properties that control how files are read. For example:
auto format = std::make_shared<ds::ParquetFileFormat>();
format->reader_options.dict_columns.insert("a");
will configure column "a" to be dictionary-encoded when read. Similarly, setting arrow::dataset::CsvFileFormat::parse_options lets us change things like reading comma-separated or tab-separated data.
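For instance, a minimal sketch of configuring a CSV dataset to parse tab-separated data (the rest of the factory setup is unchanged; ds::CsvFileFormat comes from <arrow/dataset/file_csv.h>):

auto csv_format = std::make_shared<ds::CsvFileFormat>();
// Treat the files as tab-separated rather than comma-separated.
csv_format->parse_options.delimiter = '\t';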
Additionally, passing an arrow::dataset::FragmentScanOptions to arrow::dataset::ScannerBuilder::FragmentScanOptions() gives fine-grained control over data scanning. For example, for CSV files, we can change what values are converted into Boolean true and false at scan time.
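A minimal sketch of that CSV case, assuming a scan_builder built as in the examples below; the strings chosen here ("yes"/"no") are purely illustrative:

auto csv_scan_options = std::make_shared<ds::CsvFragmentScanOptions>();
// Values parsed as Boolean true/false at scan time.
csv_scan_options->convert_options.true_values = {"yes"};
csv_scan_options->convert_options.false_values = {"no"};
ARROW_RETURN_NOT_OK(scan_builder->FragmentScanOptions(csv_scan_options));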
Filtering data#
So far, we've been reading the entire dataset, but if we need only a subset of the data, this can waste time or memory reading data we don't need. The arrow::dataset::Scanner offers control over what data gets read.
In this snippet, we use arrow::dataset::ScannerBuilder::Project() to select which columns to read:
// Read a dataset, but select only column "b" and only rows where b < 4.
//
// This is useful when you only want a few columns from a dataset. Where possible,
// Datasets will push down the column selection such that less work is done.
arrow::Result<std::shared_ptr<arrow::Table>> FilterAndSelectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project({"b"}));
  ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
Some formats, such as Parquet, can reduce I/O costs here by reading only the specified columns from the filesystem.
A filter can be provided with arrow::dataset::ScannerBuilder::Filter(), so that rows which do not match the filter predicate will not be included in the returned table. Again, some formats, such as Parquet, can use this filter to reduce the amount of I/O needed:
// Read a dataset, but select only column "b" and only rows where b < 4.
//
// This is useful when you only want a few columns from a dataset. Where possible,
// Datasets will push down the column selection such that less work is done.
arrow::Result<std::shared_ptr<arrow::Table>> FilterAndSelectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project({"b"}));
  ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
Projecting columns#
In addition to selecting columns, arrow::dataset::ScannerBuilder::Project() can also be used for more complex projections, such as renaming columns, casting them to other types, and even deriving new columns based on evaluating expressions.
In this case, we pass a vector of expressions used to construct column values and a vector of names for the columns:
// Read a dataset, but with column projection.
//
// This is useful to derive new columns from existing data. For example, here we
// demonstrate casting a column to a different type, and turning a numeric column into a
// boolean column based on a predicate. You could also rename columns or perform
// computations involving multiple columns.
arrow::Result<std::shared_ptr<arrow::Table>> ProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project(
      {
          // Leave column "a" as-is.
          cp::field_ref("a"),
          // Cast column "b" to float32.
          cp::call("cast", {cp::field_ref("b")},
                   arrow::compute::CastOptions::Safe(arrow::float32())),
          // Derive a boolean column from "c".
          cp::equal(cp::field_ref("c"), cp::literal(1)),
      },
      {"a_renamed", "b_as_float32", "c_1"}));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
This also determines the column selection; only the given columns will be present in the resulting table. If you want to include a derived column in addition to the existing columns, you can build up the expressions from the dataset schema:
// Read a dataset, but with column projection.
//
// This time, we read all original columns plus one derived column. This simply combines
// the previous two examples: selecting a subset of columns by name, and deriving new
// columns with an expression.
arrow::Result<std::shared_ptr<arrow::Table>> SelectAndProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  std::vector<std::string> names;
  std::vector<cp::Expression> exprs;
  // Read all the original columns.
  for (const auto& field : dataset->schema()->fields()) {
    names.push_back(field->name());
    exprs.push_back(cp::field_ref(field->name()));
  }
  // Also derive a new column.
  names.emplace_back("b_large");
  exprs.push_back(cp::greater(cp::field_ref("b"), cp::literal(1)));
  ARROW_RETURN_NOT_OK(scan_builder->Project(exprs, names));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
Note
When combining filters and projections, Arrow will determine all the columns necessary to read. For instance, if you filter on a column that isn't ultimately selected, Arrow will still read that column in order to evaluate the filter.
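A small illustrative sketch of that situation, reusing the scan_builder pattern from the examples above: only "a" appears in the result, yet "b" is still read from the files to evaluate the filter.

ARROW_RETURN_NOT_OK(scan_builder->Project({"a"}));
ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));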
Reading and writing partitioned data#
So far, we've been working with datasets consisting of a flat directory of files. Oftentimes, a dataset will have one or more columns that are frequently filtered on. Instead of having to read and then filter the data, we can organize the files into a nested directory structure and define a partitioned dataset, where sub-directory names hold information about which subset of the data is stored in that directory. We can then filter the data more efficiently, using that information to avoid loading files that don't match the filter.
For example, a dataset partitioned by year and month may have the following layout:
dataset_name/
  year=2007/
    month=01/
       data0.parquet
       data1.parquet
       ...
    month=02/
       data0.parquet
       data1.parquet
       ...
    month=03/
    ...
  year=2008/
    month=01/
    ...
  ...
The above partitioning scheme uses "/key=value/" directory names, as found in Apache Hive. Under this convention, the file at dataset_name/year=2007/month=01/data0.parquet contains only data for which year == 2007 and month == 01.
Let's create a small partitioned dataset. For this, we'll use Arrow's dataset writing functionality.
// Set up a dataset by writing files with partitioning
arrow::Result<std::string> CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[0]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[1]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ARROW_RETURN_NOT_OK(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ARROW_RETURN_NOT_OK(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}
The above created a directory with two subdirectories ("part=a" and "part=b"), and the Parquet files written into those directories no longer include the "part" column.
When reading this dataset, we now specify that it should use a Hive-like partitioning scheme:
// Read an entire dataset, but with partitioning information.
arrow::Result<std::shared_ptr<arrow::Table>> ScanPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;  // Make sure to search subdirectories
  ds::FileSystemFactoryOptions options;
  // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
  // schema.
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
    std::cout << "Partition expression: "
              << (*fragment)->partition_expression().ToString() << std::endl;
  }
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
Although the partition fields are not included in the actual Parquet files, they will be added back to the resulting table when scanning this dataset:
$ ./debug/dataset_documentation_example file:///tmp parquet_hive partitioned
Found fragment: /tmp/parquet_dataset/part=a/part0.parquet
Partition expression: (part == "a")
Found fragment: /tmp/parquet_dataset/part=b/part1.parquet
Partition expression: (part == "b")
Read 20 rows
a: int64
  -- field metadata --
  PARQUET:field_id: '1'
b: double
  -- field metadata --
  PARQUET:field_id: '2'
c: int64
  -- field metadata --
  PARQUET:field_id: '3'
part: string
----
# snip...
We can now filter on the partition keys, which lets us avoid loading files altogether if they do not match the filter:
// Read an entire dataset, but with partitioning information. Also, filter the dataset on
// the partition values.
arrow::Result<std::shared_ptr<arrow::Table>> FilterPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;
  ds::FileSystemFactoryOptions options;
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  // Filter based on the partition values. This will mean that we won't even read the
  // files whose partition expressions don't match the filter.
  ARROW_RETURN_NOT_OK(
      scan_builder->Filter(cp::equal(cp::field_ref("part"), cp::literal("b"))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
Different partitioning schemes#
The above example uses a Hive-like directory scheme, such as "/year=2009/month=11/day=15". We specified this by passing the Hive partitioning factory; in this case, the types of the partition keys are inferred from the file paths.
It is also possible to directly construct the partitioning and explicitly define the schema of the partition keys. For example:
auto part = std::make_shared<ds::HivePartitioning>(arrow::schema({
    arrow::field("year", arrow::int16()),
    arrow::field("month", arrow::int8()),
    arrow::field("day", arrow::int32())
}));
Arrow supports another partitioning scheme, "directory partitioning", where the segments in the file path represent the values of the partition keys without including the names (the field names are implicit in the segments' indices). For example, given the field names "year", "month", and "day", one path might be "/2019/11/15".
Since the names are not included in the file paths, they must be specified when constructing a directory partitioning:
auto part = ds::DirectoryPartitioning::MakeFactory({"year", "month", "day"});
Directory partitioning also supports providing a full schema rather than inferring types from the file paths.
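A minimal sketch of that, constructing the directory partitioning directly from an explicit schema instead of using a factory:

auto part = std::make_shared<ds::DirectoryPartitioning>(arrow::schema({
    arrow::field("year", arrow::int16()),
    arrow::field("month", arrow::int8()),
    arrow::field("day", arrow::int32())
}));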
Partitioning performance considerations#
Partitioning datasets has two aspects that affect performance: it increases the number of files, and it creates a directory structure around the files. Both of these have benefits as well as costs. Depending on the configuration and the size of your dataset, the costs can outweigh the benefits.
Because partitions split up the dataset into multiple files, partitioned datasets can be read and written with parallelism. However, each additional file adds a little overhead in processing for filesystem interaction. It also increases the overall dataset size, since each file carries some shared metadata; for example, each Parquet file contains the schema and group-level statistics. The number of partitions is a floor on the number of files: if you partition a dataset by date with a year of data, you will have at least 365 files. If you further partition by another dimension with 1,000 unique values, you will have up to 365,000 files. Such fine-grained partitioning often leads to small files that consist mostly of metadata.
Partitioned datasets create nested folder structures, which allow us to prune which files are loaded in a scan. However, this adds overhead to discovering the files in the dataset, as we need to recursively "list directory" to find the data files. Too-fine partitions can cause problems here: partitioning a dataset by date for a year's worth of data will require 365 list calls to find all the files; adding another column with cardinality 1,000 will make that 365,365 calls.
The most optimal partitioning layout will depend on your data, access patterns, and the systems that will be reading the data. Most systems, including Arrow, should work across a range of file sizes and partitioning layouts, but there are extremes you should avoid. These guidelines can help avoid some known worst cases:
Avoid files smaller than 20MB and larger than 2GB.
Avoid partitioning layouts with more than 10,000 distinct partitions.
For file formats that have a notion of groups within a file, such as Parquet, similar guidelines apply. Row groups can provide parallelism when reading and allow data skipping based on statistics, but very small groups can cause metadata to become a significant portion of the file size. In most cases, Arrow's file writer provides sensible defaults for group sizing.
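The dataset writer exposes knobs for this. A minimal sketch (field availability may vary by Arrow version, and the numbers below are illustrative, not recommendations):

ds::FileSystemDatasetWriteOptions write_options;
// Avoid metadata-heavy tiny row groups...
write_options.min_rows_per_group = 4096;
// ...and cap how large a single row group can get.
write_options.max_rows_per_group = 1024 * 1024;
// Also cap the file size (in rows) so individual files stay manageable.
write_options.max_rows_per_file = 16 * 1024 * 1024;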
Reading from other data sources#
Reading in-memory data#
If you already have data in memory that you'd like to use with the Datasets API (e.g. to filter/project it, or to write it out to a filesystem), you can wrap it in an arrow::dataset::InMemoryDataset:
auto table = arrow::Table::FromRecordBatches(...);
auto dataset = std::make_shared<arrow::dataset::InMemoryDataset>(std::move(table));
// Scan the dataset, filter it, etc.
auto scanner_builder = dataset->NewScan();
In the example, we used an InMemoryDataset to write our sample data to local disk, which is then used in the rest of the example:
// Set up a dataset by writing files with partitioning
arrow::Result<std::string> CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[0]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[1]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ARROW_RETURN_NOT_OK(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ARROW_RETURN_NOT_OK(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}
Reading from cloud storage#
In addition to local files, Arrow Datasets also support reading from cloud storage systems, such as Amazon S3, by passing a different filesystem.
See the filesystem documentation for more details on the available filesystems.
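As a minimal sketch (the bucket and path here are hypothetical, and S3 support must be enabled in your Arrow build), fs::FileSystemFromUri picks a filesystem implementation based on the URI scheme and returns the path within that filesystem:

std::string path;
ARROW_ASSIGN_OR_RAISE(auto s3fs, fs::FileSystemFromUri("s3://my-bucket/datasets", &path));
// `s3fs` and `path` can then be passed to FileSystemDatasetFactory as above.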
A note on transactions & ACID guarantees#
The dataset API offers no transaction support and no ACID guarantees. This affects both reading and writing. Concurrent reads are fine. Concurrent writes, or writes concurring with reads, may have unexpected behavior. Various approaches can be used to avoid operating on the same files, such as using a unique basename template for each writer, a temporary directory for new files, or storing the file list separately instead of relying on directory discovery.
Unexpectedly killing the process while a write is in progress can leave the system in an inconsistent state. Write calls generally return as soon as the bytes to be written have been completely delivered to the OS page cache, so even though a write operation has completed, part of the file can still be lost if there is a sudden power failure immediately after the write call.
Most file formats have magic numbers that are written at the end, which means a partial file write can safely be detected and discarded. The CSV file format has no such concept, and a partially written CSV file may be detected as valid.
Full example#
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// This example showcases various ways to work with Datasets. It's
// intended to be paired with the documentation.

#include <arrow/api.h>
#include <arrow/compute/cast.h>
#include <arrow/dataset/dataset.h>
#include <arrow/dataset/discovery.h>
#include <arrow/dataset/file_base.h>
#include <arrow/dataset/file_ipc.h>
#include <arrow/dataset/file_parquet.h>
#include <arrow/dataset/scanner.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/ipc/writer.h>
#include <arrow/util/iterator.h>
#include <parquet/arrow/writer.h>
#include "arrow/compute/expression.h"

#include <iostream>
#include <vector>

namespace ds = arrow::dataset;
namespace fs = arrow::fs;
namespace cp = arrow::compute;

/**
 * \brief Run Example
 *
 * ./debug/dataset-documentation-example file:///<some_path>/<some_directory> parquet
 */

// (Doc section: Reading Datasets)
// Generate some data for the rest of this example.
arrow::Result<std::shared_ptr<arrow::Table>> CreateTable() {
  auto schema =
      arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
                     arrow::field("c", arrow::int64())});
  std::shared_ptr<arrow::Array> array_a;
  std::shared_ptr<arrow::Array> array_b;
  std::shared_ptr<arrow::Array> array_c;
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_a));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_b));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_c));
  return arrow::Table::Make(schema, {array_a, array_b, array_c});
}

// Set up a dataset by writing two Parquet files.
arrow::Result<std::string> CreateExampleParquetDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
  // Write it into two Parquet files
  ARROW_ASSIGN_OR_RAISE(auto output,
                        filesystem->OpenOutputStream(base_path + "/data1.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(0, 5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  ARROW_ASSIGN_OR_RAISE(output,
                        filesystem->OpenOutputStream(base_path + "/data2.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  return base_path;
}
// (Doc section: Reading Datasets)

// (Doc section: Reading different file formats)
// Set up a dataset by writing two Feather files.
arrow::Result<std::string> CreateExampleFeatherDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/feather_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
  // Write it into two Feather files
  ARROW_ASSIGN_OR_RAISE(auto output,
                        filesystem->OpenOutputStream(base_path + "/data1.feather"));
  ARROW_ASSIGN_OR_RAISE(auto writer,
                        arrow::ipc::MakeFileWriter(output.get(), table->schema()));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(0, 5)));
  ARROW_RETURN_NOT_OK(writer->Close());
  ARROW_ASSIGN_OR_RAISE(output,
                        filesystem->OpenOutputStream(base_path + "/data2.feather"));
  ARROW_ASSIGN_OR_RAISE(writer,
                        arrow::ipc::MakeFileWriter(output.get(), table->schema()));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(5)));
  ARROW_RETURN_NOT_OK(writer->Close());
  return base_path;
}
// (Doc section: Reading different file formats)

// (Doc section: Reading and writing partitioned data)
// Set up a dataset by writing files with partitioning
arrow::Result<std::string> CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[0]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[1]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ARROW_RETURN_NOT_OK(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ARROW_RETURN_NOT_OK(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}
// (Doc section: Reading and writing partitioned data)

// (Doc section: Dataset discovery)
// Read the whole dataset with the given format, without partitioning.
arrow::Result<std::shared_ptr<arrow::Table>> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Dataset discovery)

// (Doc section: Filtering data)
// Read a dataset, but select only column "b" and only rows where b < 4.
//
// This is useful when you only want a few columns from a dataset. Where possible,
// Datasets will push down the column selection such that less work is done.
arrow::Result<std::shared_ptr<arrow::Table>> FilterAndSelectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project({"b"}));
  ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Filtering data)

// (Doc section: Projecting columns)
// Read a dataset, but with column projection.
//
// This is useful to derive new columns from existing data. For example, here we
// demonstrate casting a column to a different type, and turning a numeric column into a
// boolean column based on a predicate. You could also rename columns or perform
// computations involving multiple columns.
arrow::Result<std::shared_ptr<arrow::Table>> ProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project(
      {
          // Leave column "a" as-is.
          cp::field_ref("a"),
          // Cast column "b" to float32.
          cp::call("cast", {cp::field_ref("b")},
                   arrow::compute::CastOptions::Safe(arrow::float32())),
          // Derive a boolean column from "c".
          cp::equal(cp::field_ref("c"), cp::literal(1)),
      },
      {"a_renamed", "b_as_float32", "c_1"}));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Projecting columns)

// (Doc section: Projecting columns #2)
// Read a dataset, but with column projection.
//
// This time, we read all original columns plus one derived column. This simply combines
// the previous two examples: selecting a subset of columns by name, and deriving new
// columns with an expression.
arrow::Result<std::shared_ptr<arrow::Table>> SelectAndProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  std::vector<std::string> names;
  std::vector<cp::Expression> exprs;
  // Read all the original columns.
  for (const auto& field : dataset->schema()->fields()) {
    names.push_back(field->name());
    exprs.push_back(cp::field_ref(field->name()));
  }
  // Also derive a new column.
  names.emplace_back("b_large");
  exprs.push_back(cp::greater(cp::field_ref("b"), cp::literal(1)));
  ARROW_RETURN_NOT_OK(scan_builder->Project(exprs, names));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Projecting columns #2)

// (Doc section: Reading and writing partitioned data #2)
// Read an entire dataset, but with partitioning information.
arrow::Result<std::shared_ptr<arrow::Table>> ScanPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;  // Make sure to search subdirectories
  ds::FileSystemFactoryOptions options;
  // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
  // schema.
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
    std::cout << "Partition expression: "
              << (*fragment)->partition_expression().ToString() << std::endl;
  }
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Reading and writing partitioned data #2)

// (Doc section: Reading and writing partitioned data #3)
// Read an entire dataset, but with partitioning information. Also, filter the dataset on
// the partition values.
arrow::Result<std::shared_ptr<arrow::Table>> FilterPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;
  ds::FileSystemFactoryOptions options;
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  // Filter based on the partition values. This will mean that we won't even read the
  // files whose partition expressions don't match the filter.
  ARROW_RETURN_NOT_OK(
      scan_builder->Filter(cp::equal(cp::field_ref("part"), cp::literal("b"))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Reading and writing partitioned data #3)

arrow::Status RunDatasetDocumentation(const std::string& format_name,
                                      const std::string& uri, const std::string& mode) {
  std::string base_path;
  std::shared_ptr<ds::FileFormat> format;
  std::string root_path;
  ARROW_ASSIGN_OR_RAISE(auto fs, fs::FileSystemFromUri(uri, &root_path));

  if (format_name == "feather") {
    format = std::make_shared<ds::IpcFileFormat>();
    ARROW_ASSIGN_OR_RAISE(base_path, CreateExampleFeatherDataset(fs, root_path));
  } else if (format_name == "parquet") {
    format = std::make_shared<ds::ParquetFileFormat>();
    ARROW_ASSIGN_OR_RAISE(base_path, CreateExampleParquetDataset(fs, root_path));
  } else if (format_name == "parquet_hive") {
    format = std::make_shared<ds::ParquetFileFormat>();
    ARROW_ASSIGN_OR_RAISE(base_path,
                          CreateExampleParquetHivePartitionedDataset(fs, root_path));
  } else {
    std::cerr << "Unknown format: " << format_name << std::endl;
    std::cerr << "Supported formats: feather, parquet, parquet_hive" << std::endl;
    return arrow::Status::ExecutionError("Dataset creation failed.");
  }

  std::shared_ptr<arrow::Table> table;
  if (mode == "no_filter") {
    ARROW_ASSIGN_OR_RAISE(table, ScanWholeDataset(fs, format, base_path));
  } else if (mode == "filter") {
    ARROW_ASSIGN_OR_RAISE(table, FilterAndSelectDataset(fs, format, base_path));
  } else if (mode == "project") {
    ARROW_ASSIGN_OR_RAISE(table, ProjectDataset(fs, format, base_path));
  } else if (mode == "select_project") {
    ARROW_ASSIGN_OR_RAISE(table, SelectAndProjectDataset(fs, format, base_path));
  } else if (mode == "partitioned") {
    ARROW_ASSIGN_OR_RAISE(table, ScanPartitionedDataset(fs, format, base_path));
  } else if (mode == "filter_partitioned") {
    ARROW_ASSIGN_OR_RAISE(table, FilterPartitionedDataset(fs, format, base_path));
  } else {
    std::cerr << "Unknown mode: " << mode << std::endl;
    std::cerr << "Supported modes: no_filter, filter, project, select_project, "
                 "partitioned, filter_partitioned"
              << std::endl;
    return arrow::Status::ExecutionError("Dataset reading failed.");
  }
  std::cout << "Read " << table->num_rows() << " rows" << std::endl;
  std::cout << table->ToString() << std::endl;
  return arrow::Status::OK();
}

int main(int argc, char** argv) {
  if (argc < 3) {
    // Fake success for CI purposes.
    return EXIT_SUCCESS;
  }

  std::string uri = argv[1];
  std::string format_name = argv[2];
  std::string mode = argc > 3 ? argv[3] : "no_filter";

  auto status = RunDatasetDocumentation(format_name, uri, mode);
  if (!status.ok()) {
    std::cerr << status.ToString() << std::endl;
    return EXIT_FAILURE;
  }
  return EXIT_SUCCESS;
}