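Reading/Writing IPC formats
Arrow defines two types of binary formats for serializing record batches:
Streaming format: for sending an arbitrary number of record batches. The format must be processed from start to end and does not support random access.
File or Random Access format: for serializing a fixed number of record batches. It supports random access, which makes it very useful when combined with memory maps.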
Writing and Reading Streaming Format
First, let's populate a VectorSchemaRoot with a small batch of records:
// Every vector needs a BufferAllocator; a RootAllocator is assumed here
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
BitVector bitVector = new BitVector("boolean", allocator);
VarCharVector varCharVector = new VarCharVector("varchar", allocator);
for (int i = 0; i < 10; i++) {
    bitVector.setSafe(i, i % 2 == 0 ? 0 : 1);
    varCharVector.setSafe(i, ("test" + i).getBytes(StandardCharsets.UTF_8));
}
bitVector.setValueCount(10);
varCharVector.setValueCount(10);
List<Field> fields = Arrays.asList(bitVector.getField(), varCharVector.getField());
List<FieldVector> vectors = Arrays.asList(bitVector, varCharVector);
VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);
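Now we can begin writing a stream containing some number of these batches. For this we use ArrowStreamWriter (the DictionaryProvider, used for any dictionary-encoded vectors, is optional and can be null):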
try (
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    ArrowStreamWriter writer = new ArrowStreamWriter(root, /*DictionaryProvider=*/null, Channels.newChannel(out));
) {
    // ... write into the ArrowStreamWriter
}
Here we used an in-memory stream, but this could have been a socket or some other IO stream. Then we can do:
writer.start();
// write the first batch
writer.writeBatch();
// write another four batches.
for (int i = 0; i < 4; i++) {
    // repopulate the VectorSchemaRoot, then write the next batch
    BitVector childVector1 = (BitVector)root.getVector(0);
    VarCharVector childVector2 = (VarCharVector)root.getVector(1);
    childVector1.reset();
    childVector2.reset();
    // ... do some populate work here, could be different for each batch
    writer.writeBatch();
}
writer.end();
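Note that the VectorSchemaRoot in the writer is a container that holds one batch at a time, and batches flow through it as part of a pipeline. We therefore need to populate the data before each call to writeBatch, and each subsequent batch overwrites the previous one.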
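Now the ByteArrayOutputStream contains the complete stream, which holds 5 record batches. We can read such a stream with ArrowStreamReader. Note that the VectorSchemaRoot within the reader is loaded with new values on every call to loadNextBatch():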
try (ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray()), allocator)) {
    // This will be loaded with new values on every call to loadNextBatch
    VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
    Schema schema = readRoot.getSchema();
    for (int i = 0; i < 5; i++) {
        reader.loadNextBatch();
        // ... do something with readRoot
    }
}
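When the number of batches is not known in advance, the boolean returned by loadNextBatch() can drive the loop instead of a fixed count. The following is a minimal sketch of a full write/read round trip under that assumption. The class name StreamRoundTrip, the single "ints" column, and the three-rows-per-batch layout are illustrative choices, not part of the original example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;

public class StreamRoundTrip {

    /** Writes `batches` batches of 3 rows each, then reads until the stream ends.
     *  Returns {number of batches read, total rows read}. */
    static int[] roundTrip(int batches) throws IOException {
        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             VectorSchemaRoot root = VectorSchemaRoot.of(new IntVector("ints", allocator))) {
            IntVector ints = (IntVector) root.getVector(0);
            ints.allocateNew(3);

            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (ArrowStreamWriter writer = new ArrowStreamWriter(
                    root, /*DictionaryProvider=*/null, Channels.newChannel(out))) {
                writer.start();
                for (int b = 0; b < batches; b++) {
                    // Refill the shared root; this overwrites the previous batch
                    ints.reset();
                    for (int row = 0; row < 3; row++) {
                        ints.setSafe(row, b * 10 + row);
                    }
                    root.setRowCount(3);
                    writer.writeBatch();
                }
                writer.end();
            }

            int batchCount = 0;
            int rowCount = 0;
            try (ArrowStreamReader reader = new ArrowStreamReader(
                    new ByteArrayInputStream(out.toByteArray()), allocator)) {
                VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
                // loadNextBatch() returns false once no batches remain
                while (reader.loadNextBatch()) {
                    batchCount++;
                    rowCount += readRoot.getRowCount();
                }
            }
            return new int[] {batchCount, rowCount};
        }
    }

    public static void main(String[] args) throws IOException {
        int[] result = roundTrip(5);
        System.out.println(result[0] + " batches, " + result[1] + " rows");
    }
}
```

Closing the allocator last, after the writer, reader, and root, lets it report any buffers that were not released.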
Here we also give a simple example with dictionary-encoded vectors:
// create provider
DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider();
try (
    final VarCharVector dictVector = new VarCharVector("dict", allocator);
    final VarCharVector vector = new VarCharVector("vector", allocator);
) {
    // create dictionary vector
    dictVector.allocateNewSafe();
    dictVector.setSafe(0, "aa".getBytes(StandardCharsets.UTF_8));
    dictVector.setSafe(1, "bb".getBytes(StandardCharsets.UTF_8));
    dictVector.setSafe(2, "cc".getBytes(StandardCharsets.UTF_8));
    dictVector.setValueCount(3);
    // create dictionary
    Dictionary dictionary =
        new Dictionary(dictVector, new DictionaryEncoding(1L, false, /*indexType=*/null));
    provider.put(dictionary);
    // create original data vector
    vector.allocateNewSafe();
    vector.setSafe(0, "bb".getBytes(StandardCharsets.UTF_8));
    vector.setSafe(1, "bb".getBytes(StandardCharsets.UTF_8));
    vector.setSafe(2, "cc".getBytes(StandardCharsets.UTF_8));
    vector.setSafe(3, "aa".getBytes(StandardCharsets.UTF_8));
    vector.setValueCount(4);
    // get the encoded vector
    IntVector encodedVector = (IntVector) DictionaryEncoder.encode(vector, dictionary);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // create VectorSchemaRoot
    List<Field> fields = Arrays.asList(encodedVector.getField());
    List<FieldVector> vectors = Arrays.asList(encodedVector);
    try (VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors)) {
        // write data
        ArrowStreamWriter writer = new ArrowStreamWriter(root, provider, Channels.newChannel(out));
        writer.start();
        writer.writeBatch();
        writer.end();
    }
    // read data
    try (ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray()), allocator)) {
        reader.loadNextBatch();
        VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
        // get the encoded vector
        IntVector intVector = (IntVector) readRoot.getVector(0);
        // get dictionaries and decode the vector
        Map<Long, Dictionary> dictionaryMap = reader.getDictionaryVectors();
        long dictionaryId = intVector.getField().getDictionary().getId();
        try (VarCharVector varCharVector =
            (VarCharVector) DictionaryEncoder.decode(intVector, dictionaryMap.get(dictionaryId))) {
            // ... use decoded vector
        }
    }
}
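To make the encoding step more concrete, the sketch below encodes the same four values against the same three-entry dictionary and returns the resulting indices, without going through IPC. The class name DictionaryIndicesExample and the helper fill are hypothetical; the sketch also assumes the default index type that the encoder picks when indexType is null, which is a signed 32-bit integer.

```java
import java.nio.charset.StandardCharsets;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryEncoder;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;

public class DictionaryIndicesExample {

    /** Hypothetical helper: copies the strings into the vector as UTF-8 bytes. */
    static void fill(VarCharVector vector, String... values) {
        vector.allocateNewSafe();
        for (int i = 0; i < values.length; i++) {
            vector.setSafe(i, values[i].getBytes(StandardCharsets.UTF_8));
        }
        vector.setValueCount(values.length);
    }

    /** Encodes ["bb", "bb", "cc", "aa"] against the dictionary ["aa", "bb", "cc"]. */
    static int[] encodedIndices() {
        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             VarCharVector dictVector = new VarCharVector("dict", allocator);
             VarCharVector vector = new VarCharVector("vector", allocator)) {
            fill(dictVector, "aa", "bb", "cc");
            fill(vector, "bb", "bb", "cc", "aa");

            Dictionary dictionary =
                new Dictionary(dictVector, new DictionaryEncoding(1L, false, /*indexType=*/null));

            // Each value is replaced by its position in the dictionary vector
            try (IntVector encoded = (IntVector) DictionaryEncoder.encode(vector, dictionary)) {
                int[] indices = new int[encoded.getValueCount()];
                for (int i = 0; i < indices.length; i++) {
                    indices[i] = encoded.get(i);
                }
                return indices; // [1, 1, 2, 0]
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(encodedIndices()));
    }
}
```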
Writing and Reading Random Access Files
ArrowFileWriter has the same API as ArrowStreamWriter:
try (
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    ArrowFileWriter writer = new ArrowFileWriter(root, /*DictionaryProvider=*/null, Channels.newChannel(out));
) {
    writer.start();
    // write the first batch
    writer.writeBatch();
    // write another four batches.
    for (int i = 0; i < 4; i++) {
        // ... repopulate the root for the next batch
        writer.writeBatch();
    }
    writer.end();
}
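The difference between ArrowFileReader and ArrowStreamReader is that the input source must have a seek method for random access. Because we have access to the entire payload, we know the number of record batches in the file and can read any of them at random: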
try (ArrowFileReader reader = new ArrowFileReader(
    new ByteArrayReadableSeekableByteChannel(out.toByteArray()), allocator)) {
    // read the fourth batch (index 3)
    ArrowBlock block = reader.getRecordBlocks().get(3);
    reader.loadRecordBatch(block);
    VectorSchemaRoot readBatch = reader.getVectorSchemaRoot();
}
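The same random-access read also works against a file on disk, since a FileChannel is seekable. The following sketch writes five one-row batches to a temporary file and then loads a single batch by index. The class name FileRandomAccessExample, the "ints" column, and the stored values are assumptions made for illustration.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.message.ArrowBlock;

public class FileRandomAccessExample {

    /** Writes batches holding 0, 100, 200, 300, 400 and returns the value in batch `index`. */
    static int readBatchValue(int index) throws IOException {
        Path path = Files.createTempFile("arrow-example", ".arrow");
        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
            try (VectorSchemaRoot root = VectorSchemaRoot.of(new IntVector("ints", allocator));
                 FileChannel channel = FileChannel.open(path, StandardOpenOption.WRITE);
                 ArrowFileWriter writer =
                     new ArrowFileWriter(root, /*DictionaryProvider=*/null, channel)) {
                IntVector ints = (IntVector) root.getVector(0);
                writer.start();
                for (int b = 0; b < 5; b++) {
                    // Overwrite row 0 of the shared root before each write
                    ints.setSafe(0, b * 100);
                    root.setRowCount(1);
                    writer.writeBatch();
                }
                writer.end();
            }

            try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ);
                 ArrowFileReader reader = new ArrowFileReader(channel, allocator)) {
                // The file footer lists every record batch block, so any one can be loaded directly
                ArrowBlock block = reader.getRecordBlocks().get(index);
                reader.loadRecordBatch(block);
                IntVector ints = (IntVector) reader.getVectorSchemaRoot().getVector(0);
                return ints.get(0);
            }
        } finally {
            Files.deleteIfExists(path);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readBatchValue(3));
    }
}
```

Calling readBatchValue(3) loads only the fourth batch; the earlier batches are never deserialized.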