Skip to content

Commit ca9abc2

Browse files
committed
Add ORC reader and writer
1 parent cc40bce commit ca9abc2

4 files changed

Lines changed: 294 additions & 0 deletions

File tree

big-data/orc-demo/pom.xml

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<!-- Demo project showing how to read and write ORC files from plain Java. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.javahelps.orc</groupId>
    <artifactId>orc-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <!-- Compile for Java 14 (source and bytecode level). -->
                <configuration>
                    <source>14</source>
                    <target>14</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- Core ORC file format implementation (Reader, Writer, TypeDescription). -->
        <dependency>
            <groupId>org.apache.orc</groupId>
            <artifactId>orc-core</artifactId>
            <version>1.6.3</version>
        </dependency>

        <!-- Command-line/utility tooling for ORC files. -->
        <dependency>
            <groupId>org.apache.orc</groupId>
            <artifactId>orc-tools</artifactId>
            <version>1.6.3</version>
        </dependency>

        <!-- MapReduce input/output format support for ORC. -->
        <dependency>
            <groupId>org.apache.orc</groupId>
            <artifactId>orc-mapreduce</artifactId>
            <version>1.6.3</version>
        </dependency>

        <!-- Hadoop Configuration/Path/FileSystem classes used by the ORC API. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.0</version>
        </dependency>

        <!-- NOTE(review): presumably pinned to resolve a transitive-version conflict
             between Hadoop and ORC — confirm before removing. -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>29.0-jre</version>
        </dependency>
    </dependencies>

</project>
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package com.javahelps.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.*;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;

import java.io.IOException;
import java.util.*;
import java.util.function.BiFunction;

/**
 * Reads rows from an ORC file into {@code Map<columnName, value>} records,
 * materializing only a selected subset of top-level columns.
 */
public class OrcFileReader {

    /** Number of rows fetched per vectorized batch. */
    private static final int BATCH_SIZE = 2048;

    public static void main(String[] args) throws IOException {
        List<Map<String, Object>> rows = read(new Configuration(), "orders.orc");
        for (Map<String, Object> row : rows) {
            System.out.println(row);
        }
    }

    /**
     * Reads the selected columns of every row in the given ORC file.
     *
     * @param configuration Hadoop configuration used to open the file
     * @param path          path to the ORC file
     * @return one map per row, keyed by column name; absent values are {@code null}
     * @throws IOException if the file cannot be opened or read
     */
    public static List<Map<String, Object>> read(Configuration configuration, String path)
            throws IOException {
        // Collect rows in insertion order; size is unknown up front.
        List<Map<String, Object>> rows = new LinkedList<>();

        // Create an ORC reader using the Hadoop fileSystem and path.
        try (Reader reader = OrcFile.createReader(new Path(path), OrcFile.readerOptions(configuration))) {
            // Extract the schema and metadata from the reader.
            TypeDescription schema = reader.getSchema();
            List<String> fieldNames = schema.getFieldNames();
            List<TypeDescription> columnTypes = schema.getChildren();

            // Select only the order_id, item_name and price columns.
            List<Integer> selectedColumns = new ArrayList<>();
            boolean[] columnsToRead =
                    createColumnsToRead(schema, Set.of("order_id", "item_name", "price"), selectedColumns);

            // One value-extraction function per selected top-level column.
            int size = fieldNames.size();
            @SuppressWarnings("unchecked") // generic array creation; only indices in selectedColumns are populated
            BiFunction<ColumnVector, Integer, Object>[] mappers = new BiFunction[size];
            for (int i : selectedColumns) {
                mappers[i] = createColumnReader(columnTypes.get(i));
            }

            // Pass columnsToRead to the reader so only the selected columns are decoded.
            try (RecordReader records = reader.rows(reader.options().include(columnsToRead))) {
                // Read rows in batches for better performance.
                VectorizedRowBatch batch = schema.createRowBatch(BATCH_SIZE);
                while (records.nextBatch(batch)) {
                    for (int row = 0; row < batch.size; row++) {
                        Map<String, Object> map = new HashMap<>(selectedColumns.size());
                        for (int col : selectedColumns) {
                            ColumnVector columnVector = batch.cols[col];
                            // When isRepeating is set, entry 0 holds the value for every
                            // row of the batch — indexing by `row` would read garbage.
                            int index = columnVector.isRepeating ? 0 : row;
                            // isNull[] is only meaningful when noNulls is false.
                            if (!columnVector.noNulls && columnVector.isNull[index]) {
                                map.put(fieldNames.get(col), null);
                            } else {
                                map.put(fieldNames.get(col), mappers[col].apply(columnVector, index));
                            }
                        }
                        rows.add(map);
                    }
                }
            }
        }
        return rows;
    }

    /**
     * Builds the {@code include} mask for {@link Reader.Options}.
     *
     * <p>The mask is indexed by type id (0 is the root struct) and must cover every
     * nested sub-type of a selected column, hence the inner id loop.
     *
     * @param schema  root struct schema of the file
     * @param columns names of the top-level columns to read
     * @param indices out-parameter: receives the batch indices of the selected columns
     * @return boolean mask of type ids to decode
     */
    public static boolean[] createColumnsToRead(TypeDescription schema, Set<String> columns, List<Integer> indices) {
        boolean[] columnsToRead = new boolean[schema.getMaximumId() + 1];
        List<String> fieldNames = schema.getFieldNames();
        List<TypeDescription> columnTypes = schema.getChildren();
        for (int i = 0; i < fieldNames.size(); i++) {
            if (columns.contains(fieldNames.get(i))) {
                indices.add(i);
                TypeDescription type = columnTypes.get(i);
                // Mark the column's own id and every nested child id.
                for (int id = type.getId(); id <= type.getMaximumId(); id++) {
                    columnsToRead[id] = true;
                }
            }
        }
        return columnsToRead;
    }

    /**
     * Returns a function that extracts the Java value at a given row index from
     * the {@link ColumnVector} of the given ORC type.
     *
     * <p>Reference: https://orc.apache.org/docs/core-java.html
     *
     * @throws RuntimeException if the ORC category is not supported here
     */
    public static BiFunction<ColumnVector, Integer, Object> createColumnReader(TypeDescription description) {
        String type = description.getCategory().getName();
        BiFunction<ColumnVector, Integer, Object> mapper;
        if ("tinyint".equals(type)) {
            mapper = (columnVector, row) -> (byte) ((LongColumnVector) columnVector).vector[row];
        } else if ("smallint".equals(type)) {
            mapper = (columnVector, row) -> (short) ((LongColumnVector) columnVector).vector[row];
        } else if ("int".equals(type) || "date".equals(type)) {
            // Date is represented as int epoch days.
            mapper = (columnVector, row) -> (int) ((LongColumnVector) columnVector).vector[row];
        } else if ("bigint".equals(type)) {
            mapper = (columnVector, row) -> ((LongColumnVector) columnVector).vector[row];
        } else if ("boolean".equals(type)) {
            // Booleans are stored as 0/1 longs.
            mapper = (columnVector, row) -> ((LongColumnVector) columnVector).vector[row] == 1;
        } else if ("float".equals(type)) {
            mapper = (columnVector, row) -> (float) ((DoubleColumnVector) columnVector).vector[row];
        } else if ("double".equals(type)) {
            mapper = (columnVector, row) -> ((DoubleColumnVector) columnVector).vector[row];
        } else if ("decimal".equals(type)) {
            mapper = (columnVector, row) -> ((DecimalColumnVector) columnVector).vector[row].getHiveDecimal().bigDecimalValue();
        } else if ("string".equals(type) || type.startsWith("varchar")) {
            mapper = (columnVector, row) -> ((BytesColumnVector) columnVector).toString(row);
        } else if ("char".equals(type)) {
            mapper = (columnVector, row) -> ((BytesColumnVector) columnVector).toString(row).charAt(0);
        } else if ("timestamp".equals(type)) {
            mapper = (columnVector, row) -> ((TimestampColumnVector) columnVector).getTimestampAsLong(row);
        } else {
            throw new RuntimeException("Unsupported type " + type);
        }
        return mapper;
    }
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package com.javahelps.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.*;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Writes {@code Map<columnName, value>} records to an ORC file using
 * vectorized row batches.
 */
public class OrcFileWriter {

    public static void main(String[] args) throws IOException {
        List<Map<String, Object>> data = new LinkedList<>();
        data.add(Map.of("order_id", 1, "item_name", "Laptop", "price", 800.0f));
        data.add(Map.of("order_id", 2, "item_name", "Mouse", "price", 150.0f));
        data.add(Map.of("order_id", 3, "item_name", "Keyboard", "price", 250.0f));

        write(new Configuration(), "orders.orc", "struct<order_id:int,item_name:string,price:float>", data);

        System.out.println("Done");
    }

    /**
     * Writes the given rows to an ORC file with the given struct schema.
     *
     * <p>A {@code null} (or absent) value for a column is written as an ORC null
     * rather than causing a {@link NullPointerException}.
     *
     * @param configuration Hadoop configuration used to create the file
     * @param path          destination path of the ORC file
     * @param struct        ORC schema string, e.g. {@code struct<id:int,name:string>}
     * @param data          rows to write, keyed by column name
     * @throws IOException if the file cannot be created or written
     */
    public static void write(Configuration configuration, String path, String struct, List<Map<String, Object>> data) throws IOException {
        // Create the schema and extract metadata from it.
        TypeDescription schema = TypeDescription.fromString(struct);
        List<String> fieldNames = schema.getFieldNames();
        List<TypeDescription> columnTypes = schema.getChildren();

        // Create a reusable row batch (default capacity).
        VectorizedRowBatch batch = schema.createRowBatch();

        // One value-writing function per top-level column, bound to its vector.
        List<BiConsumer<Integer, Object>> consumers = new ArrayList<>(columnTypes.size());
        for (int i = 0; i < columnTypes.size(); i++) {
            consumers.add(createColumnWriter(columnTypes.get(i), batch.cols[i]));
        }

        // Open a writer to write the data to an ORC file.
        try (Writer writer = OrcFile.createWriter(new Path(path),
                OrcFile.writerOptions(configuration)
                        .setSchema(schema))) {
            for (Map<String, Object> row : data) {
                // batch.size must be advanced by the caller of the column writers.
                int rowNum = batch.size++;

                // Write each column to its associated column vector.
                for (int i = 0; i < fieldNames.size(); i++) {
                    Object value = row.get(fieldNames.get(i));
                    if (value == null) {
                        // Missing value: record an ORC null instead of throwing NPE.
                        ColumnVector vector = batch.cols[i];
                        vector.noNulls = false;
                        vector.isNull[rowNum] = true;
                    } else {
                        consumers.get(i).accept(rowNum, value);
                    }
                }

                // If the batch is full, flush it to disk.
                if (batch.size == batch.getMaxSize()) {
                    writer.addRowBatch(batch);
                    batch.reset();
                }
            }

            // Flush any remaining buffered rows before closing.
            if (batch.size != 0) {
                writer.addRowBatch(batch);
            }
        }
    }

    /**
     * Returns a function that writes a non-null Java value into the given
     * {@link ColumnVector} at a row index, for the given ORC type.
     *
     * @throws RuntimeException if the ORC category is not supported here
     */
    public static BiConsumer<Integer, Object> createColumnWriter(TypeDescription description, ColumnVector columnVector) {
        String type = description.getCategory().getName();
        BiConsumer<Integer, Object> consumer;
        if ("tinyint".equals(type)) {
            consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = ((Number) val).longValue();
        } else if ("smallint".equals(type)) {
            consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = ((Number) val).longValue();
        } else if ("int".equals(type) || "date".equals(type)) {
            // Date is represented as int epoch days.
            consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = ((Number) val).longValue();
        } else if ("bigint".equals(type)) {
            consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = ((Number) val).longValue();
        } else if ("boolean".equals(type)) {
            // Booleans are stored as 0/1 longs.
            consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = (Boolean) val ? 1 : 0;
        } else if ("float".equals(type)) {
            consumer = (row, val) -> ((DoubleColumnVector) columnVector).vector[row] = ((Number) val).floatValue();
        } else if ("double".equals(type)) {
            consumer = (row, val) -> ((DoubleColumnVector) columnVector).vector[row] = ((Number) val).doubleValue();
        } else if ("decimal".equals(type)) {
            consumer = (row, val) -> ((DecimalColumnVector) columnVector).vector[row].set(HiveDecimal.create((BigDecimal) val));
        } else if ("string".equals(type) || type.startsWith("varchar") || "char".equals(type)) {
            consumer = (row, val) -> {
                // A fresh buffer is allocated per value, so setRef (no copy) is safe.
                byte[] buffer = val.toString().getBytes(StandardCharsets.UTF_8);
                ((BytesColumnVector) columnVector).setRef(row, buffer, 0, buffer.length);
            };
        } else if ("timestamp".equals(type)) {
            consumer = (row, val) -> ((TimestampColumnVector) columnVector).set(row, (Timestamp) val);
        } else {
            throw new RuntimeException("Unsupported type " + type);
        }
        return consumer;
    }
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# Keep demo output readable: log only ERROR and above, to the console appender "CA".
log4j.rootLogger=ERROR, CA

# "CA" writes to stdout/stderr.
log4j.appender.CA=org.apache.log4j.ConsoleAppender

# Pattern: elapsed-millis [thread] LEVEL logger NDC - message\n
log4j.appender.CA.layout=org.apache.log4j.PatternLayout
log4j.appender.CA.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

0 commit comments

Comments
 (0)