Reading/Writing IPC formats

Arrow defines two types of binary formats for serializing record batches:

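  • Streaming format: for sending an arbitrary number of record batches. The format must be processed from start to end and does not support random access.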

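  • File or Random Access format: for serializing a fixed number of record batches. It supports random access, which makes it very useful with memory maps.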
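One concrete difference between the two formats is their framing. A file starts and ends with the magic string ARROW1 and carries a footer recording where each record batch lives. A stream has neither. The sketch below writes the same batch with both writers and checks for the magic bytes. Because ArrowFileWriter and ArrowStreamWriter both extend ArrowWriter, one code path can drive either writer. The sketch assumes the Arrow Java vector and memory modules are on the classpath. The class name, the column "n", and the helper methods are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.ipc.ArrowWriter;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

class IpcFormatComparison {
  static final byte[] MAGIC = "ARROW1".getBytes(StandardCharsets.US_ASCII);

  // Serializes one three-row batch (hypothetical int column "n") in the chosen format.
  static byte[] serialize(boolean fileFormat) {
    Schema schema = new Schema(Arrays.asList(Field.nullable("n", new ArrowType.Int(32, true))));
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
         VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
      IntVector n = (IntVector) root.getVector("n");
      n.allocateNew(3);
      for (int i = 0; i < 3; i++) {
        n.set(i, i);
      }
      root.setRowCount(3);
      // Both writers share the start / writeBatch / end API inherited from ArrowWriter.
      try (ArrowWriter writer = fileFormat
          ? new ArrowFileWriter(root, null, Channels.newChannel(out))
          : new ArrowStreamWriter(root, null, Channels.newChannel(out))) {
        writer.start();
        writer.writeBatch();
        writer.end();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toByteArray();
  }

  static boolean startsWithMagic(byte[] bytes) {
    return bytes.length >= MAGIC.length
        && Arrays.equals(Arrays.copyOfRange(bytes, 0, MAGIC.length), MAGIC);
  }

  static boolean endsWithMagic(byte[] bytes) {
    return bytes.length >= MAGIC.length
        && Arrays.equals(Arrays.copyOfRange(bytes, bytes.length - MAGIC.length, bytes.length), MAGIC);
  }

  public static void main(String[] args) {
    byte[] file = serialize(true);
    byte[] stream = serialize(false);
    System.out.println("file:   " + file.length + " bytes, magic at start: " + startsWithMagic(file));
    System.out.println("stream: " + stream.length + " bytes, magic at start: " + startsWithMagic(stream));
  }
}
```

The file payload is larger than the stream payload for the same batch because of the magic strings and the footer. That footer is what later allows the file reader to jump straight to any batch.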

Writing and Reading Streaming Format

First, let's populate a VectorSchemaRoot with a small batch of records:

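import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Field;

// assumes an existing BufferAllocator named `allocator` (e.g. a RootAllocator)
BitVector bitVector = new BitVector("boolean", allocator);
VarCharVector varCharVector = new VarCharVector("varchar", allocator);
bitVector.allocateNew(10);
varCharVector.allocateNew(10);
for (int i = 0; i < 10; i++) {
  bitVector.setSafe(i, i % 2 == 0 ? 0 : 1);
  varCharVector.setSafe(i, ("test" + i).getBytes(StandardCharsets.UTF_8));
}
bitVector.setValueCount(10);
varCharVector.setValueCount(10);

List<Field> fields = Arrays.asList(bitVector.getField(), varCharVector.getField());
List<FieldVector> vectors = Arrays.asList(bitVector, varCharVector);
VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);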
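Instead of wrapping vectors you have already built, you can also create a root from a Schema and let VectorSchemaRoot.create allocate the vectors. The sketch below takes that approach. It reproduces the two columns and values from the snippet above. The class and method names (PopulateRootFromSchema, populate, rowAsText) are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

class PopulateRootFromSchema {
  // Same two columns as above, declared up front as a Schema.
  static final Schema SCHEMA = new Schema(Arrays.asList(
      Field.nullable("boolean", new ArrowType.Bool()),
      Field.nullable("varchar", new ArrowType.Utf8())));

  // Fills rowCount rows: boolean = 1 for odd i, varchar = "test" + i.
  static void populate(VectorSchemaRoot root, int rowCount) {
    BitVector bits = (BitVector) root.getVector("boolean");
    VarCharVector strings = (VarCharVector) root.getVector("varchar");
    bits.allocateNew(rowCount);
    strings.allocateNew(rowCount);
    for (int i = 0; i < rowCount; i++) {
      bits.setSafe(i, i % 2 == 0 ? 0 : 1);
      strings.setSafe(i, ("test" + i).getBytes(StandardCharsets.UTF_8));
    }
    // Sets the root's row count and the value count of every vector.
    root.setRowCount(rowCount);
  }

  // Builds and populates a root, then returns one row as "boolean,varchar" text.
  static String rowAsText(int rowCount, int row) {
    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
         VectorSchemaRoot root = VectorSchemaRoot.create(SCHEMA, allocator)) {
      populate(root, rowCount);
      BitVector bits = (BitVector) root.getVector("boolean");
      VarCharVector strings = (VarCharVector) root.getVector("varchar");
      return bits.get(row) + "," + new String(strings.get(row), StandardCharsets.UTF_8);
    }
  }

  public static void main(String[] args) {
    System.out.println(rowAsText(10, 3));
  }
}
```

The schema-first style is convenient when the same root is refilled for many batches, as in the writer loop that follows. The root owns its vectors and releases them when it is closed.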

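Now we can begin writing a stream containing some number of these batches. For this we use ArrowStreamWriter. The DictionaryProvider, which is used for any dictionary-encoded vectors, is optional and can be null: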

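import java.io.ByteArrayOutputStream;
import java.nio.channels.Channels;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;

// declared outside the try block so the written bytes can be read back afterwards
ByteArrayOutputStream out = new ByteArrayOutputStream();
try (ArrowStreamWriter writer = new ArrowStreamWriter(root, /*DictionaryProvider=*/null, Channels.newChannel(out))) {
  // ... do write into the ArrowStreamWriter
}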

Here we used an in-memory stream, but this could also be a socket or some other IO stream. Then we can do:

writer.start();
// write the first batch
writer.writeBatch();

// write another four batches.
for (int i = 0; i < 4; i++) {
  // reuse the same VectorSchemaRoot: clear its vectors, then repopulate them for the next batch
  BitVector childVector1 = (BitVector) root.getVector(0);
  VarCharVector childVector2 = (VarCharVector) root.getVector(1);
  childVector1.reset();
  childVector2.reset();
  // ... do some populate work here, could be different for each batch
  // then set this batch's row count (e.g. 10 rows again); this also sets each vector's value count
  root.setRowCount(10);
  writer.writeBatch();
}

writer.end();

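Note that the writer's VectorSchemaRoot is a single container that holds one batch at a time. Batches flow through it as part of a pipeline, so we must populate its data before each call to writeBatch, and each new batch overwrites the previous one in the root.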

The ByteArrayOutputStream now contains the complete stream with 5 record batches. We can read such a stream with ArrowStreamReader. Note that the reader's VectorSchemaRoot is loaded with new values on every call to loadNextBatch():

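import java.io.ByteArrayInputStream;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.pojo.Schema;

try (ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray()), allocator)) {
  // This will be loaded with new values on every call to loadNextBatch
  VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
  Schema schema = readRoot.getSchema();
  // loadNextBatch returns false at the end of the stream, so the batch count need not be known in advance
  while (reader.loadNextBatch()) {
    // ... do something with readRoot
  }
}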
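Putting the writer and reader together, the sketch below writes five batches through a single reused root and reads them back until loadNextBatch() returns false. For brevity it assumes a single 32-bit integer column "n" instead of the boolean/varchar schema above. The class and method names are hypothetical. Batch b holds b + 1 rows, so the row counts read back make the individual batches easy to tell apart.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

class StreamRoundTrip {
  // Writes `batches` batches through ONE reused root; batch b has b + 1 rows, all equal to b.
  static byte[] writeStream(BufferAllocator allocator, int batches) throws IOException {
    Schema schema = new Schema(Arrays.asList(Field.nullable("n", new ArrowType.Int(32, true))));
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
         ArrowStreamWriter writer = new ArrowStreamWriter(root, null, Channels.newChannel(out))) {
      IntVector n = (IntVector) root.getVector("n");
      n.allocateNew(batches);
      writer.start();
      for (int b = 0; b < batches; b++) {
        // overwrite the previous batch's values in place
        for (int i = 0; i <= b; i++) {
          n.setSafe(i, b);
        }
        root.setRowCount(b + 1);
        writer.writeBatch();
      }
      writer.end();
    }
    return out.toByteArray();
  }

  // Loads batches until the end of the stream and records each batch's row count.
  static List<Integer> readRowCounts(BufferAllocator allocator, byte[] bytes) throws IOException {
    List<Integer> rowCounts = new ArrayList<>();
    try (ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream(bytes), allocator)) {
      VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
      while (reader.loadNextBatch()) {
        rowCounts.add(readRoot.getRowCount());
      }
    }
    return rowCounts;
  }

  static List<Integer> roundTrip(int batches) {
    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
      return readRowCounts(allocator, writeStream(allocator, batches));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(roundTrip(5));
  }
}
```

The reader never needs the batch count up front. That is what makes the streaming format suitable for sockets and other sources whose length is unknown.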

Here is also a simple example with dictionary-encoded vectors:

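import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryEncoder;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
import org.apache.arrow.vector.types.pojo.Field;

// create provider
DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider();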

try (
  final VarCharVector dictVector = new VarCharVector("dict", allocator);
  final VarCharVector vector = new VarCharVector("vector", allocator);
) {
  // create dictionary vector
  dictVector.allocateNewSafe();
  dictVector.setSafe(0, "aa".getBytes(StandardCharsets.UTF_8));
  dictVector.setSafe(1, "bb".getBytes(StandardCharsets.UTF_8));
  dictVector.setSafe(2, "cc".getBytes(StandardCharsets.UTF_8));
  dictVector.setValueCount(3);

  // create dictionary
  Dictionary dictionary =
      new Dictionary(dictVector, new DictionaryEncoding(1L, false, /*indexType=*/null));
  provider.put(dictionary);

  // create original data vector
  vector.allocateNewSafe();
  vector.setSafe(0, "bb".getBytes(StandardCharsets.UTF_8));
  vector.setSafe(1, "bb".getBytes(StandardCharsets.UTF_8));
  vector.setSafe(2, "cc".getBytes(StandardCharsets.UTF_8));
  vector.setSafe(3, "aa".getBytes(StandardCharsets.UTF_8));
  vector.setValueCount(4);

  // get the encoded vector
  IntVector encodedVector = (IntVector) DictionaryEncoder.encode(vector, dictionary);

  ByteArrayOutputStream out = new ByteArrayOutputStream();

  // create VectorSchemaRoot
  List<Field> fields = Arrays.asList(encodedVector.getField());
  List<FieldVector> vectors = Arrays.asList(encodedVector);
  // the root takes ownership of encodedVector and closes it; the writer is closed before the root
  try (
    VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);
    ArrowStreamWriter writer = new ArrowStreamWriter(root, provider, Channels.newChannel(out));
  ) {
    // write data
    writer.start();
    writer.writeBatch();
    writer.end();
  }

  // read data
  try (ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray()), allocator)) {
    reader.loadNextBatch();
    VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
    // get the encoded vector
    IntVector intVector = (IntVector) readRoot.getVector(0);

    // get dictionaries and decode the vector
    Map<Long, Dictionary> dictionaryMap = reader.getDictionaryVectors();
    long dictionaryId = intVector.getField().getDictionary().getId();
    try (VarCharVector varCharVector =
        (VarCharVector) DictionaryEncoder.decode(intVector, dictionaryMap.get(dictionaryId))) {
      // ... use decoded vector
    }
  }
}
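The sketch below shows what the encoded vector in the example above actually stores. It encodes the same values and reads out the raw indices. Each index is the position of the value in the dictionary vector, so encoding ["bb", "bb", "cc", "aa"] against ["aa", "bb", "cc"] should give [1, 1, 2, 0]. It assumes a fresh RootAllocator and a hypothetical helper fill(). With a null index type, DictionaryEncoding defaults to a signed 32-bit index, which is why the cast to IntVector works.

```java
import java.nio.charset.StandardCharsets;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryEncoder;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;

class DictionaryIndices {
  // Hypothetical helper: writes the given strings into a VarCharVector as UTF-8.
  static void fill(VarCharVector v, String... values) {
    v.allocateNew();
    for (int i = 0; i < values.length; i++) {
      v.setSafe(i, values[i].getBytes(StandardCharsets.UTF_8));
    }
    v.setValueCount(values.length);
  }

  // Encodes the same data as the example above and returns the stored indices.
  static int[] encodedIndices() {
    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
         VarCharVector dictVector = new VarCharVector("dict", allocator);
         VarCharVector vector = new VarCharVector("vector", allocator)) {
      fill(dictVector, "aa", "bb", "cc");
      fill(vector, "bb", "bb", "cc", "aa");
      Dictionary dictionary = new Dictionary(dictVector, new DictionaryEncoding(1L, false, null));
      try (IntVector encoded = (IntVector) DictionaryEncoder.encode(vector, dictionary)) {
        int[] indices = new int[encoded.getValueCount()];
        for (int i = 0; i < indices.length; i++) {
          indices[i] = encoded.get(i);
        }
        return indices;
      }
    }
  }

  public static void main(String[] args) {
    System.out.println(java.util.Arrays.toString(encodedIndices()));
  }
}
```

Only these small integer indices travel with each record batch. The dictionary values are written once, which is why the writer needs the DictionaryProvider.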

Writing and Reading Random Access Files

ArrowFileWriter has the same API as ArrowStreamWriter:

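import java.io.ByteArrayOutputStream;
import java.nio.channels.Channels;
import org.apache.arrow.vector.ipc.ArrowFileWriter;

// declared outside the try block so the written bytes can be read back afterwards
ByteArrayOutputStream out = new ByteArrayOutputStream();
try (ArrowFileWriter writer = new ArrowFileWriter(root, /*DictionaryProvider=*/null, Channels.newChannel(out))) {
  writer.start();
  // write the first batch
  writer.writeBatch();
  // write another four batches.
  for (int i = 0; i < 4; i++) {
    // ... do populate work, then update the row count with root.setRowCount(...)
    writer.writeBatch();
  }
  writer.end();
}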

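The difference between ArrowFileReader and ArrowStreamReader is that the input source must have a seek method for random access. Because we have access to the entire payload, we know the number of record batches in the file and can read any of them at random: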

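import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

try (ArrowFileReader reader = new ArrowFileReader(
    new ByteArrayReadableSeekableByteChannel(out.toByteArray()), allocator)) {

  // read the fourth batch (record blocks are zero-indexed)
  ArrowBlock block = reader.getRecordBlocks().get(3);
  reader.loadRecordBatch(block);
  VectorSchemaRoot readBatch = reader.getVectorSchemaRoot();
  // ... do something with readBatch
}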
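You can see the random-access property directly by loading batches out of order. The sketch below writes five one-row batches to an in-memory file. It then loads them from last to first, using the blocks returned by getRecordBlocks(). The class and method names are hypothetical. The single integer column "n" (batch b stores the value b * 10) is an assumption made for brevity.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

class FileRandomAccess {
  // Writes `batches` one-row batches; batch b holds the single value b * 10.
  static byte[] writeFile(BufferAllocator allocator, int batches) throws IOException {
    Schema schema = new Schema(Arrays.asList(Field.nullable("n", new ArrowType.Int(32, true))));
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
         ArrowFileWriter writer = new ArrowFileWriter(root, null, Channels.newChannel(out))) {
      IntVector n = (IntVector) root.getVector("n");
      n.allocateNew(1);
      writer.start();
      for (int b = 0; b < batches; b++) {
        n.set(0, b * 10);
        root.setRowCount(1);
        writer.writeBatch();
      }
      writer.end();
    }
    return out.toByteArray();
  }

  // Seeks to each batch in reverse order via the footer's block list.
  static List<Integer> readBackwards(int batches) {
    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
      byte[] bytes = writeFile(allocator, batches);
      List<Integer> values = new ArrayList<>();
      try (ArrowFileReader reader = new ArrowFileReader(
          new ByteArrayReadableSeekableByteChannel(bytes), allocator)) {
        List<ArrowBlock> blocks = reader.getRecordBlocks();
        VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
        for (int i = blocks.size() - 1; i >= 0; i--) {
          reader.loadRecordBatch(blocks.get(i));
          values.add(((IntVector) readRoot.getVector("n")).get(0));
        }
      }
      return values;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(readBackwards(5));
  }
}
```

A stream reader could only reach the last batch by decoding every batch before it. The file reader uses the block offsets stored in the footer to jump directly to each one.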