读取和写入数据

The Arrow IPC 格式 定义了两种用于序列化 Arrow 数据的二进制格式:流格式和文件格式(或随机访问格式)。这些文件在读取时可以直接内存映射。

写入

写入文件和流格式都使用相同的 API。

写入随机访问文件

写入 - 输出到文件

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.VectorSchemaRoot;
import static java.util.Arrays.asList;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

try (BufferAllocator allocator = new RootAllocator()) {
    Field name = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null);
    Field age = new Field("age", FieldType.nullable(new ArrowType.Int(32, true)), null);
    Schema schemaPerson = new Schema(asList(name, age));
    try(
        VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schemaPerson, allocator)
    ){
        VarCharVector nameVector = (VarCharVector) vectorSchemaRoot.getVector("name");
        nameVector.allocateNew(3);
        nameVector.set(0, "David".getBytes());
        nameVector.set(1, "Gladis".getBytes());
        nameVector.set(2, "Juan".getBytes());
        IntVector ageVector = (IntVector) vectorSchemaRoot.getVector("age");
        ageVector.allocateNew(3);
        ageVector.set(0, 10);
        ageVector.set(1, 20);
        ageVector.set(2, 30);
        vectorSchemaRoot.setRowCount(3);
        File file = new File("randon_access_to_file.arrow");
        try (
            FileOutputStream fileOutputStream = new FileOutputStream(file);
            ArrowFileWriter writer = new ArrowFileWriter(vectorSchemaRoot, null, fileOutputStream.getChannel())
        ) {
            writer.start();
            writer.writeBatch();
            writer.end();
            System.out.println("Record batches written: " + writer.getRecordBlocks().size() + ". Number of rows written: " + vectorSchemaRoot.getRowCount());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Record batches written: 1. Number of rows written: 3

写入 - 输出到缓冲区

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.VectorSchemaRoot;
import static java.util.Arrays.asList;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;

try (BufferAllocator allocator = new RootAllocator()) {
    Field name = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null);
    Field age = new Field("age", FieldType.nullable(new ArrowType.Int(32, true)), null);
    Schema schemaPerson = new Schema(asList(name, age));
    try(
        VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schemaPerson, allocator)
    ){
        VarCharVector nameVector = (VarCharVector) vectorSchemaRoot.getVector("name");
        nameVector.allocateNew(3);
        nameVector.set(0, "David".getBytes());
        nameVector.set(1, "Gladis".getBytes());
        nameVector.set(2, "Juan".getBytes());
        IntVector ageVector = (IntVector) vectorSchemaRoot.getVector("age");
        ageVector.allocateNew(3);
        ageVector.set(0, 10);
        ageVector.set(1, 20);
        ageVector.set(2, 30);
        vectorSchemaRoot.setRowCount(3);
        try (
            ByteArrayOutputStream out = new ByteArrayOutputStream();
             ArrowFileWriter writer = new ArrowFileWriter(vectorSchemaRoot, null, Channels.newChannel(out))
        ) {
            writer.start();
            writer.writeBatch();

            System.out.println("Record batches written: " + writer.getRecordBlocks().size() +
                    ". Number of rows written: " + vectorSchemaRoot.getRowCount());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Record batches written: 1. Number of rows written: 3

写入流格式

写入 - 输出到文件

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.VectorSchemaRoot;
import static java.util.Arrays.asList;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

try (BufferAllocator rootAllocator = new RootAllocator()) {
    Field name = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null);
    Field age = new Field("age", FieldType.nullable(new ArrowType.Int(32, true)), null);
    Schema schemaPerson = new Schema(asList(name, age));
    try(
        VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schemaPerson, rootAllocator)
    ){
        VarCharVector nameVector = (VarCharVector) vectorSchemaRoot.getVector("name");
        nameVector.allocateNew(3);
        nameVector.set(0, "David".getBytes());
        nameVector.set(1, "Gladis".getBytes());
        nameVector.set(2, "Juan".getBytes());
        IntVector ageVector = (IntVector) vectorSchemaRoot.getVector("age");
        ageVector.allocateNew(3);
        ageVector.set(0, 10);
        ageVector.set(1, 20);
        ageVector.set(2, 30);
        vectorSchemaRoot.setRowCount(3);
        File file = new File("streaming_to_file.arrow");
        try (
            FileOutputStream fileOutputStream = new FileOutputStream(file);
            ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, null, fileOutputStream.getChannel())
        ){
            writer.start();
            writer.writeBatch();
            System.out.println("Number of rows written: " + vectorSchemaRoot.getRowCount());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Number of rows written: 3

写入 - 输出到缓冲区

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.VectorSchemaRoot;
import static java.util.Arrays.asList;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;

try (BufferAllocator rootAllocator = new RootAllocator()) {
    Field name = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null);
    Field age = new Field("age", FieldType.nullable(new ArrowType.Int(32, true)), null);
    Schema schemaPerson = new Schema(asList(name, age));
    try(
        VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schemaPerson, rootAllocator)
    ){
        VarCharVector nameVector = (VarCharVector) vectorSchemaRoot.getVector("name");
        nameVector.allocateNew(3);
        nameVector.set(0, "David".getBytes());
        nameVector.set(1, "Gladis".getBytes());
        nameVector.set(2, "Juan".getBytes());
        IntVector ageVector = (IntVector) vectorSchemaRoot.getVector("age");
        ageVector.allocateNew(3);
        ageVector.set(0, 10);
        ageVector.set(1, 20);
        ageVector.set(2, 30);
        vectorSchemaRoot.setRowCount(3);
        try (
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, null, Channels.newChannel(out))
        ){
            writer.start();
            writer.writeBatch();
            System.out.println("Number of rows written: " + vectorSchemaRoot.getRowCount());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Number of rows written: 3

读取

读取随机访问格式和流格式都提供相同的 API,不同之处在于随机访问文件还提供按索引访问任何记录批次的访问方式。

读取随机访问文件

读取 - 从文件

我们提供了一个带有自动生成的 Arrow 文件的路径,用于测试目的,您可以根据需要更改它。

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.VectorSchemaRoot;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

File file = new File("./thirdpartydeps/arrowfiles/random_access.arrow");
try(
    BufferAllocator rootAllocator = new RootAllocator();
    FileInputStream fileInputStream = new FileInputStream(file);
    ArrowFileReader reader = new ArrowFileReader(fileInputStream.getChannel(), rootAllocator)
){
    System.out.println("Record batches in file: " + reader.getRecordBlocks().size());
    for (ArrowBlock arrowBlock : reader.getRecordBlocks()) {
        reader.loadRecordBatch(arrowBlock);
        VectorSchemaRoot vectorSchemaRootRecover = reader.getVectorSchemaRoot();
        System.out.print(vectorSchemaRootRecover.contentToTSVString());
    }
} catch (IOException e) {
    e.printStackTrace();
}
Record batches in file: 3
name    age
David    10
Gladis    20
Juan    30
name    age
Nidia    15
Alexa    20
Mara    15
name    age
Raul    34
Jhon    29
Thomy    33

读取 - 从缓冲区

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.SeekableReadChannel;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

Path path = Paths.get("./thirdpartydeps/arrowfiles/random_access.arrow");
try(
    BufferAllocator rootAllocator = new RootAllocator();
    ArrowFileReader reader = new ArrowFileReader(new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel(
                                        Files.readAllBytes(path))), rootAllocator)
) {
    System.out.println("Record batches in file: " + reader.getRecordBlocks().size());
    for (ArrowBlock arrowBlock : reader.getRecordBlocks()) {
        reader.loadRecordBatch(arrowBlock);
        VectorSchemaRoot vectorSchemaRootRecover = reader.getVectorSchemaRoot();
        System.out.print(vectorSchemaRootRecover.contentToTSVString());
    }
} catch (IOException e) {
    e.printStackTrace();
}
Record batches in file: 3
name    age
David    10
Gladis    20
Juan    30
name    age
Nidia    15
Alexa    20
Mara    15
name    age
Raul    34
Jhon    29
Thomy    33

读取流格式

读取 - 从文件

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.VectorSchemaRoot;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

File file = new File("./thirdpartydeps/arrowfiles/streaming.arrow");
try(
    BufferAllocator rootAllocator = new RootAllocator();
    FileInputStream fileInputStreamForStream = new FileInputStream(file);
    ArrowStreamReader reader = new ArrowStreamReader(fileInputStreamForStream, rootAllocator)
) {
    while (reader.loadNextBatch()) {
        VectorSchemaRoot vectorSchemaRootRecover = reader.getVectorSchemaRoot();
        System.out.print(vectorSchemaRootRecover.contentToTSVString());
    }
} catch (IOException e) {
    e.printStackTrace();
}
name    age
David    10
Gladis    20
Juan    30
name    age
Nidia    15
Alexa    20
Mara    15
name    age
Raul    34
Jhon    29
Thomy    33

读取 - 从缓冲区

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

Path path = Paths.get("./thirdpartydeps/arrowfiles/streaming.arrow");
try(
    BufferAllocator rootAllocator = new RootAllocator();
    ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream(
                                    Files.readAllBytes(path)), rootAllocator)
) {
    while(reader.loadNextBatch()){
        System.out.print(reader.getVectorSchemaRoot().contentToTSVString());
    }
} catch (IOException e) {
    e.printStackTrace();
}
name    age
David    10
Gladis    20
Juan    30
name    age
Nidia    15
Alexa    20
Mara    15
name    age
Raul    34
Jhon    29
Thomy    33

读取 Parquet 文件

请查看 数据集

处理使用字典的数据

读取和写入字典编码数据需要单独跟踪字典。

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryEncoder;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
import org.apache.arrow.vector.types.pojo.FieldType;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

DictionaryEncoding dictionaryEncoding = new DictionaryEncoding(
        /*id=*/666L, /*ordered=*/false, /*indexType=*/
        new ArrowType.Int(8, true)
);
try (BufferAllocator root = new RootAllocator();
     VarCharVector countries = new VarCharVector("country-dict", root);
     VarCharVector appUserCountriesUnencoded = new VarCharVector(
             "app-use-country-dict",
             new FieldType(true, Types.MinorType.VARCHAR.getType(), dictionaryEncoding),
             root)
) {
    countries.allocateNew(10);
    countries.set(0, "Andorra".getBytes(StandardCharsets.UTF_8));
    countries.set(1, "Cuba".getBytes(StandardCharsets.UTF_8));
    countries.set(2, "Grecia".getBytes(StandardCharsets.UTF_8));
    countries.set(3, "Guinea".getBytes(StandardCharsets.UTF_8));
    countries.set(4, "Islandia".getBytes(StandardCharsets.UTF_8));
    countries.set(5, "Malta".getBytes(StandardCharsets.UTF_8));
    countries.set(6, "Tailandia".getBytes(StandardCharsets.UTF_8));
    countries.set(7, "Uganda".getBytes(StandardCharsets.UTF_8));
    countries.set(8, "Yemen".getBytes(StandardCharsets.UTF_8));
    countries.set(9, "Zambia".getBytes(StandardCharsets.UTF_8));
    countries.setValueCount(10);

    Dictionary countriesDictionary = new Dictionary(countries, dictionaryEncoding);
    System.out.println("Dictionary: " + countriesDictionary);

    appUserCountriesUnencoded.allocateNew(5);
    appUserCountriesUnencoded.set(0, "Andorra".getBytes(StandardCharsets.UTF_8));
    appUserCountriesUnencoded.set(1, "Guinea".getBytes(StandardCharsets.UTF_8));
    appUserCountriesUnencoded.set(2, "Islandia".getBytes(StandardCharsets.UTF_8));
    appUserCountriesUnencoded.set(3, "Malta".getBytes(StandardCharsets.UTF_8));
    appUserCountriesUnencoded.set(4, "Uganda".getBytes(StandardCharsets.UTF_8));
    appUserCountriesUnencoded.setValueCount(5);
    System.out.println("Unencoded data: " + appUserCountriesUnencoded);

    File file = new File("random_access_file_with_dictionary.arrow");
    DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider();
    provider.put(countriesDictionary);
    try (FieldVector appUseCountryDictionaryEncoded = (FieldVector) DictionaryEncoder
            .encode(appUserCountriesUnencoded, countriesDictionary);
         VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.of(appUseCountryDictionaryEncoded);
         FileOutputStream fileOutputStream = new FileOutputStream(file);
         ArrowFileWriter writer = new ArrowFileWriter(vectorSchemaRoot, provider, fileOutputStream.getChannel())
    ) {
        System.out.println("Dictionary-encoded data: " +appUseCountryDictionaryEncoded);
        System.out.println("Dictionary-encoded ID: " +appUseCountryDictionaryEncoded.getField().getDictionary().getId());
        writer.start();
        writer.writeBatch();
        writer.end();
        System.out.println("Record batches written: " + writer.getRecordBlocks().size() + ". Number of rows written: " + vectorSchemaRoot.getRowCount());
        try(
            BufferAllocator rootAllocator = new RootAllocator();
            FileInputStream fileInputStream = new FileInputStream(file);
            ArrowFileReader reader = new ArrowFileReader(fileInputStream.getChannel(), rootAllocator)
        ){
            for (ArrowBlock arrowBlock : reader.getRecordBlocks()) {
                reader.loadRecordBatch(arrowBlock);
                FieldVector appUseCountryDictionaryEncodedRead = reader.getVectorSchemaRoot().getVector("app-use-country-dict");
                DictionaryEncoding dictionaryEncodingRead = appUseCountryDictionaryEncodedRead.getField().getDictionary();
                System.out.println("Dictionary-encoded ID recovered: " + dictionaryEncodingRead.getId());
                Dictionary appUseCountryDictionaryRead = reader.getDictionaryVectors().get(dictionaryEncodingRead.getId());
                System.out.println("Dictionary-encoded data recovered: " + appUseCountryDictionaryEncodedRead);
                System.out.println("Dictionary recovered: " + appUseCountryDictionaryRead);
                try (ValueVector readVector = DictionaryEncoder.decode(appUseCountryDictionaryEncodedRead, appUseCountryDictionaryRead)) {
                    System.out.println("Decoded data: " + readVector);
                }
            }
        }
    } catch (FileNotFoundException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }
}
Dictionary: Dictionary DictionaryEncoding[id=666,ordered=false,indexType=Int(8, true)] [Andorra, Cuba, Grecia, Guinea, Islandia, Malta, Tailandia, Uganda, Yemen, Zambia]
Unencoded data: [Andorra, Guinea, Islandia, Malta, Uganda]
Dictionary-encoded data: [0, 3, 4, 5, 7]
Dictionary-encoded ID: 666
Record batches written: 1. Number of rows written: 5
Dictionary-encoded ID recovered: 666
Dictionary-encoded data recovered: [0, 3, 4, 5, 7]
Dictionary recovered: Dictionary DictionaryEncoding[id=666,ordered=false,indexType=Int(8, true)] [Andorra, Cuba, Grecia, Guinea, Islandia, Malta, Tailandia, Uganda, Yemen, Zambia]
Decoded data: [Andorra, Guinea, Islandia, Malta, Uganda]