讀取與寫入資料¶
Arrow IPC 格式定義了兩種用於序列化 Arrow 資料的二進位格式:串流格式與檔案格式(又稱隨機存取格式)。此類檔案在讀取時可直接進行記憶體對映。
寫入¶
檔案寫入和串流格式寫入都使用相同的 API。
寫入隨機存取檔案¶
寫入—輸出至檔案¶
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.VectorSchemaRoot;
import static java.util.Arrays.asList;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
// Build a 3-row (name, age) batch and write it as one record batch to a
// random-access (file format) Arrow IPC file.
try (BufferAllocator allocator = new RootAllocator()) {
    Field name = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null);
    Field age = new Field("age", FieldType.nullable(new ArrowType.Int(32, true)), null);
    Schema schemaPerson = new Schema(asList(name, age));
    try (
        VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schemaPerson, allocator)
    ) {
        // Utf8 columns hold UTF-8 bytes: encode explicitly rather than relying on the
        // platform default charset used by the no-argument String.getBytes().
        VarCharVector nameVector = (VarCharVector) vectorSchemaRoot.getVector("name");
        nameVector.allocateNew(3);
        nameVector.set(0, "David".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        nameVector.set(1, "Gladis".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        nameVector.set(2, "Juan".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        IntVector ageVector = (IntVector) vectorSchemaRoot.getVector("age");
        ageVector.allocateNew(3);
        ageVector.set(0, 10);
        ageVector.set(1, 20);
        ageVector.set(2, 30);
        // Mark all three populated rows as part of the batch.
        vectorSchemaRoot.setRowCount(3);
        // NOTE(review): "randon" looks like a typo of "random"; kept as-is in case
        // other code expects this exact file name.
        File file = new File("randon_access_to_file.arrow");
        try (
            FileOutputStream fileOutputStream = new FileOutputStream(file);
            // null dictionary provider: no column in this schema is dictionary-encoded.
            ArrowFileWriter writer = new ArrowFileWriter(vectorSchemaRoot, null, fileOutputStream.getChannel())
        ) {
            writer.start();
            writer.writeBatch();
            // Finish the file (footer) before reporting; the footer is what lets
            // readers access record batches by index.
            writer.end();
            System.out.println("Record batches written: " + writer.getRecordBlocks().size() + ". Number of rows written: " + vectorSchemaRoot.getRowCount());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Record batches written: 1. Number of rows written: 3
寫入—輸出至緩衝區¶
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.VectorSchemaRoot;
import static java.util.Arrays.asList;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
// Build a 3-row (name, age) batch and write it as a random-access (file format)
// Arrow IPC payload into an in-memory byte buffer.
try (BufferAllocator allocator = new RootAllocator()) {
    Field name = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null);
    Field age = new Field("age", FieldType.nullable(new ArrowType.Int(32, true)), null);
    Schema schemaPerson = new Schema(asList(name, age));
    try (
        VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schemaPerson, allocator)
    ) {
        // Utf8 columns hold UTF-8 bytes: encode explicitly rather than relying on the
        // platform default charset used by the no-argument String.getBytes().
        VarCharVector nameVector = (VarCharVector) vectorSchemaRoot.getVector("name");
        nameVector.allocateNew(3);
        nameVector.set(0, "David".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        nameVector.set(1, "Gladis".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        nameVector.set(2, "Juan".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        IntVector ageVector = (IntVector) vectorSchemaRoot.getVector("age");
        ageVector.allocateNew(3);
        ageVector.set(0, 10);
        ageVector.set(1, 20);
        ageVector.set(2, 30);
        // Mark all three populated rows as part of the batch.
        vectorSchemaRoot.setRowCount(3);
        try (
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            // null dictionary provider: no column in this schema is dictionary-encoded.
            ArrowFileWriter writer = new ArrowFileWriter(vectorSchemaRoot, null, Channels.newChannel(out))
        ) {
            writer.start();
            writer.writeBatch();
            // Finish the file (footer) explicitly, as the file-output variant does, so
            // that `out` holds a complete Arrow file while it is still in scope
            // (e.g. for out.toByteArray()).
            writer.end();
            System.out.println("Record batches written: " + writer.getRecordBlocks().size() +
                ". Number of rows written: " + vectorSchemaRoot.getRowCount());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Record batches written: 1. Number of rows written: 3
寫入串流格式¶
寫入—輸出至檔案¶
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.VectorSchemaRoot;
import static java.util.Arrays.asList;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
// Build a 3-row (name, age) batch and write it to a file using the Arrow IPC
// streaming format.
try (BufferAllocator rootAllocator = new RootAllocator()) {
    Field name = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null);
    Field age = new Field("age", FieldType.nullable(new ArrowType.Int(32, true)), null);
    Schema schemaPerson = new Schema(asList(name, age));
    try (
        VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schemaPerson, rootAllocator)
    ) {
        // Utf8 columns hold UTF-8 bytes: encode explicitly rather than relying on the
        // platform default charset used by the no-argument String.getBytes().
        VarCharVector nameVector = (VarCharVector) vectorSchemaRoot.getVector("name");
        nameVector.allocateNew(3);
        nameVector.set(0, "David".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        nameVector.set(1, "Gladis".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        nameVector.set(2, "Juan".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        IntVector ageVector = (IntVector) vectorSchemaRoot.getVector("age");
        ageVector.allocateNew(3);
        ageVector.set(0, 10);
        ageVector.set(1, 20);
        ageVector.set(2, 30);
        // Mark all three populated rows as part of the batch.
        vectorSchemaRoot.setRowCount(3);
        File file = new File("streaming_to_file.arrow");
        try (
            FileOutputStream fileOutputStream = new FileOutputStream(file);
            // null dictionary provider: no column in this schema is dictionary-encoded.
            ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, null, fileOutputStream.getChannel())
        ) {
            writer.start();
            writer.writeBatch();
            // Terminate the stream explicitly (end-of-stream marker), consistent with
            // the random-access examples, instead of relying on close().
            writer.end();
            System.out.println("Number of rows written: " + vectorSchemaRoot.getRowCount());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Number of rows written: 3
寫入—輸出至緩衝區¶
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.VectorSchemaRoot;
import static java.util.Arrays.asList;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
// Build a 3-row (name, age) batch and write it into an in-memory byte buffer
// using the Arrow IPC streaming format.
try (BufferAllocator rootAllocator = new RootAllocator()) {
    Field name = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null);
    Field age = new Field("age", FieldType.nullable(new ArrowType.Int(32, true)), null);
    Schema schemaPerson = new Schema(asList(name, age));
    try (
        VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schemaPerson, rootAllocator)
    ) {
        // Utf8 columns hold UTF-8 bytes: encode explicitly rather than relying on the
        // platform default charset used by the no-argument String.getBytes().
        VarCharVector nameVector = (VarCharVector) vectorSchemaRoot.getVector("name");
        nameVector.allocateNew(3);
        nameVector.set(0, "David".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        nameVector.set(1, "Gladis".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        nameVector.set(2, "Juan".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        IntVector ageVector = (IntVector) vectorSchemaRoot.getVector("age");
        ageVector.allocateNew(3);
        ageVector.set(0, 10);
        ageVector.set(1, 20);
        ageVector.set(2, 30);
        // Mark all three populated rows as part of the batch.
        vectorSchemaRoot.setRowCount(3);
        try (
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            // null dictionary provider: no column in this schema is dictionary-encoded.
            ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, null, Channels.newChannel(out))
        ) {
            writer.start();
            writer.writeBatch();
            // Terminate the stream explicitly so that `out` holds a complete stream
            // while it is still in scope (e.g. for out.toByteArray()).
            writer.end();
            System.out.println("Number of rows written: " + vectorSchemaRoot.getRowCount());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Number of rows written: 3
讀取¶
讀取隨機存取格式和串流格式時都使用相同的 API,不同之處在於隨機存取檔案還能透過索引存取任一記錄批次。
讀取隨機存取檔案¶
讀取—從檔案¶
我們提供了一個包含自動產生之 Arrow 檔案的路徑,供測試使用;您可視需要自行變更該路徑。
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.VectorSchemaRoot;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
// Open a random-access Arrow file, report how many record batches it holds,
// then load each batch by its block and print it as TSV.
File file = new File("./thirdpartydeps/arrowfiles/random_access.arrow");
try (BufferAllocator allocator = new RootAllocator();
     FileInputStream input = new FileInputStream(file);
     ArrowFileReader reader = new ArrowFileReader(input.getChannel(), allocator)) {
    int batchCount = reader.getRecordBlocks().size();
    System.out.println("Record batches in file: " + batchCount);
    for (ArrowBlock block : reader.getRecordBlocks()) {
        // Load the batch described by this block, then print the reader's root.
        reader.loadRecordBatch(block);
        System.out.print(reader.getVectorSchemaRoot().contentToTSVString());
    }
} catch (IOException e) {
    e.printStackTrace();
}
Record batches in file: 3
name age
David 10
Gladis 20
Juan 30
name age
Nidia 15
Alexa 20
Mara 15
name age
Raul 34
Jhon 29
Thomy 33
讀取—從緩衝區¶
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.SeekableReadChannel;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
// Read the whole random-access Arrow file into memory, wrap the bytes in a
// seekable channel, then load and print every record batch by its block.
Path path = Paths.get("./thirdpartydeps/arrowfiles/random_access.arrow");
try (BufferAllocator allocator = new RootAllocator();
     ArrowFileReader reader = new ArrowFileReader(
         new SeekableReadChannel(
             new ByteArrayReadableSeekableByteChannel(Files.readAllBytes(path))),
         allocator)) {
    int batchCount = reader.getRecordBlocks().size();
    System.out.println("Record batches in file: " + batchCount);
    for (ArrowBlock block : reader.getRecordBlocks()) {
        reader.loadRecordBatch(block);
        System.out.print(reader.getVectorSchemaRoot().contentToTSVString());
    }
} catch (IOException e) {
    e.printStackTrace();
}
Record batches in file: 3
name age
David 10
Gladis 20
Juan 30
name age
Nidia 15
Alexa 20
Mara 15
name age
Raul 34
Jhon 29
Thomy 33
讀取串流格式¶
讀取—從檔案¶
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.VectorSchemaRoot;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
// Read an Arrow IPC stream from a file, printing each batch as TSV.
// Stream batches are visited in order; loadNextBatch() returns false when none remain.
File file = new File("./thirdpartydeps/arrowfiles/streaming.arrow");
try (BufferAllocator allocator = new RootAllocator();
     FileInputStream streamInput = new FileInputStream(file);
     ArrowStreamReader reader = new ArrowStreamReader(streamInput, allocator)) {
    while (reader.loadNextBatch()) {
        System.out.print(reader.getVectorSchemaRoot().contentToTSVString());
    }
} catch (IOException e) {
    e.printStackTrace();
}
name age
David 10
Gladis 20
Juan 30
name age
Nidia 15
Alexa 20
Mara 15
name age
Raul 34
Jhon 29
Thomy 33
讀取—從緩衝區¶
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
// Read the Arrow IPC stream file fully into memory, then print each batch as TSV.
Path path = Paths.get("./thirdpartydeps/arrowfiles/streaming.arrow");
try (BufferAllocator allocator = new RootAllocator();
     ArrowStreamReader reader = new ArrowStreamReader(
         new ByteArrayInputStream(Files.readAllBytes(path)), allocator)) {
    // Keep loading until loadNextBatch() reports that the stream is exhausted.
    for (boolean hasBatch = reader.loadNextBatch(); hasBatch; hasBatch = reader.loadNextBatch()) {
        System.out.print(reader.getVectorSchemaRoot().contentToTSVString());
    }
} catch (IOException e) {
    e.printStackTrace();
}
name age
David 10
Gladis 20
Juan 30
name age
Nidia 15
Alexa 20
Mara 15
name age
Raul 34
Jhon 29
Thomy 33
讀取 Parquet 檔案¶
請參閱「資料集」一節。
處理有字典的資料¶
讀取和寫入字典編碼的資料時,需要另外追蹤其使用的字典。
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryEncoder;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
import org.apache.arrow.vector.types.pojo.FieldType;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
// Dictionary-encode a VarChar column, write it (together with its dictionary) to a
// random-access Arrow file, then read the file back and decode the column.
// The encoding uses dictionary id 666, unordered, with signed 8-bit indices.
DictionaryEncoding dictionaryEncoding = new DictionaryEncoding(
    /*id=*/666L, /*ordered=*/false, /*indexType=*/
    new ArrowType.Int(8, true)
);
try (BufferAllocator root = new RootAllocator();
     VarCharVector countries = new VarCharVector("country-dict", root);
     // The unencoded column's field carries the dictionary encoding declared above.
     VarCharVector appUserCountriesUnencoded = new VarCharVector(
         "app-use-country-dict",
         new FieldType(true, Types.MinorType.VARCHAR.getType(), dictionaryEncoding),
         root)
) {
    // Dictionary values: 10 distinct country names.
    countries.allocateNew(10);
    countries.set(0, "Andorra".getBytes(StandardCharsets.UTF_8));
    countries.set(1, "Cuba".getBytes(StandardCharsets.UTF_8));
    countries.set(2, "Grecia".getBytes(StandardCharsets.UTF_8));
    countries.set(3, "Guinea".getBytes(StandardCharsets.UTF_8));
    countries.set(4, "Islandia".getBytes(StandardCharsets.UTF_8));
    countries.set(5, "Malta".getBytes(StandardCharsets.UTF_8));
    countries.set(6, "Tailandia".getBytes(StandardCharsets.UTF_8));
    countries.set(7, "Uganda".getBytes(StandardCharsets.UTF_8));
    countries.set(8, "Yemen".getBytes(StandardCharsets.UTF_8));
    countries.set(9, "Zambia".getBytes(StandardCharsets.UTF_8));
    countries.setValueCount(10);
    Dictionary countriesDictionary = new Dictionary(countries, dictionaryEncoding);
    System.out.println("Dictionary: " + countriesDictionary);

    // Column to encode: every value also appears in the dictionary.
    appUserCountriesUnencoded.allocateNew(5);
    appUserCountriesUnencoded.set(0, "Andorra".getBytes(StandardCharsets.UTF_8));
    appUserCountriesUnencoded.set(1, "Guinea".getBytes(StandardCharsets.UTF_8));
    appUserCountriesUnencoded.set(2, "Islandia".getBytes(StandardCharsets.UTF_8));
    appUserCountriesUnencoded.set(3, "Malta".getBytes(StandardCharsets.UTF_8));
    appUserCountriesUnencoded.set(4, "Uganda".getBytes(StandardCharsets.UTF_8));
    appUserCountriesUnencoded.setValueCount(5);
    System.out.println("Unencoded data: " + appUserCountriesUnencoded);

    File file = new File("random_access_file_with_dictionary.arrow");
    // The writer receives the dictionary through this provider, keyed by its id.
    DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider();
    provider.put(countriesDictionary);
    try (FieldVector appUseCountryDictionaryEncoded = (FieldVector) DictionaryEncoder
             .encode(appUserCountriesUnencoded, countriesDictionary);
         VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.of(appUseCountryDictionaryEncoded);
         FileOutputStream fileOutputStream = new FileOutputStream(file);
         ArrowFileWriter writer = new ArrowFileWriter(vectorSchemaRoot, provider, fileOutputStream.getChannel())
    ) {
        System.out.println("Dictionary-encoded data: " +appUseCountryDictionaryEncoded);
        System.out.println("Dictionary-encoded ID: " +appUseCountryDictionaryEncoded.getField().getDictionary().getId());
        writer.start();
        writer.writeBatch();
        writer.end();
        System.out.println("Record batches written: " + writer.getRecordBlocks().size() + ". Number of rows written: " + vectorSchemaRoot.getRowCount());

        // Read the file back with a separate allocator; the dictionaries are
        // recovered from the file itself, not from `provider`.
        try(
            BufferAllocator rootAllocator = new RootAllocator();
            FileInputStream fileInputStream = new FileInputStream(file);
            ArrowFileReader reader = new ArrowFileReader(fileInputStream.getChannel(), rootAllocator)
        ){
            for (ArrowBlock arrowBlock : reader.getRecordBlocks()) {
                reader.loadRecordBatch(arrowBlock);
                FieldVector appUseCountryDictionaryEncodedRead = reader.getVectorSchemaRoot().getVector("app-use-country-dict");
                // The field's encoding carries the id used to look up the dictionary.
                DictionaryEncoding dictionaryEncodingRead = appUseCountryDictionaryEncodedRead.getField().getDictionary();
                System.out.println("Dictionary-encoded ID recovered: " + dictionaryEncodingRead.getId());
                Dictionary appUseCountryDictionaryRead = reader.getDictionaryVectors().get(dictionaryEncodingRead.getId());
                System.out.println("Dictionary-encoded data recovered: " + appUseCountryDictionaryEncodedRead);
                System.out.println("Dictionary recovered: " + appUseCountryDictionaryRead);
                // decode() returns a new vector that this code owns, so it is closed here.
                try (ValueVector readVector = DictionaryEncoder.decode(appUseCountryDictionaryEncodedRead, appUseCountryDictionaryRead)) {
                    System.out.println("Decoded data: " + readVector);
                }
            }
        }
    } catch (IOException e) {
        // FileNotFoundException is a subclass of IOException, so this one handler
        // covers both; the former separate handler was identical and redundant.
        e.printStackTrace();
    }
}
Dictionary: Dictionary DictionaryEncoding[id=666,ordered=false,indexType=Int(8, true)] [Andorra, Cuba, Grecia, Guinea, Islandia, Malta, Tailandia, Uganda, Yemen, Zambia]
Unencoded data: [Andorra, Guinea, Islandia, Malta, Uganda]
Dictionary-encoded data: [0, 3, 4, 5, 7]
Dictionary-encoded ID: 666
Record batches written: 1. Number of rows written: 5
Dictionary-encoded ID recovered: 666
Dictionary-encoded data recovered: [0, 3, 4, 5, 7]
Dictionary recovered: Dictionary DictionaryEncoding[id=666,ordered=false,indexType=Int(8, true)] [Andorra, Cuba, Grecia, Guinea, Islandia, Malta, Tailandia, Uganda, Yemen, Zambia]
Decoded data: [Andorra, Guinea, Islandia, Malta, Uganda]