Skip to content

Commit b48d08e

Browse files
committed
Add new sort order for int96 timestamps
1 parent 65f7ade commit b48d08e

13 files changed

Lines changed: 586 additions & 11 deletions

File tree

parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public class ParquetProperties {
6666
public static final int DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER = 5;
6767
public static final boolean DEFAULT_STATISTICS_ENABLED = true;
6868
public static final boolean DEFAULT_SIZE_STATISTICS_ENABLED = true;
69+
public static final boolean DEFAULT_INT96_TIMESTAMP_STATISTICS_ENABLED = false;
6970

7071
public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;
7172

@@ -120,6 +121,7 @@ public static WriterVersion fromString(String name) {
120121
private final int statisticsTruncateLength;
121122
private final boolean statisticsEnabled;
122123
private final boolean sizeStatisticsEnabled;
124+
private final boolean int96TimestampStatisticsEnabled;
123125

124126
// The expected NDV (number of distinct values) for each columns
125127
private final ColumnProperty<Long> bloomFilterNDVs;
@@ -154,6 +156,7 @@ private ParquetProperties(Builder builder) {
154156
this.statisticsTruncateLength = builder.statisticsTruncateLength;
155157
this.statisticsEnabled = builder.statisticsEnabled;
156158
this.sizeStatisticsEnabled = builder.sizeStatisticsEnabled;
159+
this.int96TimestampStatisticsEnabled = builder.int96TimestampStatisticsEnabled;
157160
this.bloomFilterNDVs = builder.bloomFilterNDVs.build();
158161
this.bloomFilterFPPs = builder.bloomFilterFPPs.build();
159162
this.bloomFilterEnabled = builder.bloomFilterEnabled.build();
@@ -370,6 +373,10 @@ public boolean getSizeStatisticsEnabled(ColumnDescriptor column) {
370373
return sizeStatisticsEnabled;
371374
}
372375

376+
public boolean getInt96TimestampStatisticsEnabled() {
377+
return int96TimestampStatisticsEnabled;
378+
}
379+
373380
@Override
374381
public String toString() {
375382
return "Parquet page size to " + getPageSizeThreshold() + '\n'
@@ -406,6 +413,7 @@ public static class Builder {
406413
private int statisticsTruncateLength = DEFAULT_STATISTICS_TRUNCATE_LENGTH;
407414
private boolean statisticsEnabled = DEFAULT_STATISTICS_ENABLED;
408415
private boolean sizeStatisticsEnabled = DEFAULT_SIZE_STATISTICS_ENABLED;
416+
private boolean int96TimestampStatisticsEnabled = DEFAULT_INT96_TIMESTAMP_STATISTICS_ENABLED;
409417
private final ColumnProperty.Builder<Long> bloomFilterNDVs;
410418
private final ColumnProperty.Builder<Double> bloomFilterFPPs;
411419
private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES;
@@ -756,6 +764,19 @@ public Builder withSizeStatisticsEnabled(String columnPath, boolean enabled) {
756764
return this;
757765
}
758766

767+
/**
768+
* Sets whether min/max statistics are collected and written for INT96 columns using the
769+
* chronological INT96_TIMESTAMP_ORDER column order (disabled by default). When enabled, INT96
770+
* columns are tagged with INT96_TIMESTAMP_ORDER in the file footer.
771+
*
772+
* @param enabled whether to collect and write INT96 timestamp statistics
773+
* @return this builder for method chaining
774+
*/
775+
public Builder withInt96TimestampStatisticsEnabled(boolean enabled) {
776+
this.int96TimestampStatisticsEnabled = enabled;
777+
return this;
778+
}
779+
759780
public ParquetProperties build() {
760781
ParquetProperties properties = new ParquetProperties(this);
761782
// we pass a constructed but uninitialized factory to ParquetProperties above as currently

parquet-column/src/main/java/org/apache/parquet/schema/ColumnOrder.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,18 @@ public enum ColumnOrderName {
3636
/**
3737
* Type defined order meaning that the comparison order of the elements are based on its type.
3838
*/
39-
TYPE_DEFINED_ORDER
39+
TYPE_DEFINED_ORDER,
40+
/**
41+
* Chronological order for INT96 timestamps: values are compared by the Julian day (the last 4
42+
* bytes, as a little-endian signed int32), then by the nanoseconds within the day (the first 8
43+
* bytes, as a little-endian signed int64). Only supported for the INT96 physical type.
44+
*/
45+
INT96_TIMESTAMP_ORDER
4046
}
4147

4248
private static final ColumnOrder UNDEFINED_COLUMN_ORDER = new ColumnOrder(ColumnOrderName.UNDEFINED);
4349
private static final ColumnOrder TYPE_DEFINED_COLUMN_ORDER = new ColumnOrder(ColumnOrderName.TYPE_DEFINED_ORDER);
50+
private static final ColumnOrder INT96_TIMESTAMP_COLUMN_ORDER = new ColumnOrder(ColumnOrderName.INT96_TIMESTAMP_ORDER);
4451

4552
/**
4653
* @return a {@link ColumnOrder} instance representing an undefined order
@@ -58,6 +65,14 @@ public static ColumnOrder typeDefined() {
5865
return TYPE_DEFINED_COLUMN_ORDER;
5966
}
6067

68+
/**
69+
* @return a {@link ColumnOrder} instance representing the chronological order of INT96 timestamps
70+
* @see ColumnOrderName#INT96_TIMESTAMP_ORDER
71+
*/
72+
public static ColumnOrder int96TimestampOrder() {
73+
return INT96_TIMESTAMP_COLUMN_ORDER;
74+
}
75+
6176
private final ColumnOrderName columnOrderName;
6277

6378
private ColumnOrder(ColumnOrderName columnOrderName) {

parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.io.Serializable;
2222
import java.nio.ByteBuffer;
23+
import java.nio.ByteOrder;
2324
import java.util.Comparator;
2425
import org.apache.parquet.io.api.Binary;
2526

@@ -293,4 +294,36 @@ public String toString() {
293294
return "BINARY_AS_FLOAT16_COMPARATOR";
294295
}
295296
};
297+
298+
/*
299+
* Comparator for two timestamps encoded as INT96 (12-byte little-endian) binary.
300+
* Layout: first 8 bytes = nanoseconds within the day, last 4 bytes = Julian day.
301+
*
302+
* Two-level comparison, matching the INT96 timestamp sort order:
303+
* 1. Compare the last 4 bytes (Julian day) as a signed little-endian int32.
304+
* 2. If equal, compare the first 8 bytes (nanos) as a signed little-endian int64.
305+
*/
306+
static final PrimitiveComparator<Binary> BINARY_AS_INT96_TIMESTAMP_COMPARATOR = new BinaryComparator() {
307+
@Override
308+
int compareBinary(Binary b1, Binary b2) {
309+
if (b1.length() != 12 || b2.length() != 12) {
310+
throw new IllegalArgumentException(
311+
"INT96 binary length must be 12, got " + b1.length() + " and " + b2.length());
312+
}
313+
314+
ByteBuffer bb1 = b1.toByteBuffer().slice();
315+
ByteBuffer bb2 = b2.toByteBuffer().slice();
316+
bb1.order(ByteOrder.LITTLE_ENDIAN);
317+
bb2.order(ByteOrder.LITTLE_ENDIAN);
318+
319+
int result = Integer.compare(bb1.getInt(8), bb2.getInt(8));
320+
if (result != 0) return result;
321+
return Long.compare(bb1.getLong(0), bb2.getLong(0));
322+
}
323+
324+
@Override
325+
public String toString() {
326+
return "BINARY_AS_INT96_TIMESTAMP_COMPARATOR";
327+
}
328+
};
296329
}

parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -622,9 +622,15 @@ public PrimitiveType(
622622
private ColumnOrder requireValidColumnOrder(ColumnOrder columnOrder) {
623623
if (primitive == PrimitiveTypeName.INT96) {
624624
Preconditions.checkArgument(
625-
columnOrder.getColumnOrderName() == ColumnOrderName.UNDEFINED,
625+
columnOrder.getColumnOrderName() == ColumnOrderName.UNDEFINED
626+
|| columnOrder.getColumnOrderName() == ColumnOrderName.INT96_TIMESTAMP_ORDER,
626627
"The column order %s is not supported by INT96",
627628
columnOrder);
629+
} else {
630+
Preconditions.checkArgument(
631+
columnOrder.getColumnOrderName() != ColumnOrderName.INT96_TIMESTAMP_ORDER,
632+
"The column order %s is only supported by INT96",
633+
columnOrder);
628634
}
629635
if (getLogicalTypeAnnotation() != null) {
630636
Preconditions.checkArgument(
@@ -655,6 +661,15 @@ public PrimitiveType withLogicalTypeAnnotation(LogicalTypeAnnotation logicalType
655661
return new PrimitiveType(getRepetition(), primitive, length, getName(), logicalType, getId());
656662
}
657663

664+
/**
665+
* @param columnOrder the column order
666+
* @return a new PrimitiveType with the same fields and the given column order
667+
*/
668+
public PrimitiveType withColumnOrder(ColumnOrder columnOrder) {
669+
return new PrimitiveType(
670+
getRepetition(), primitive, length, getName(), getLogicalTypeAnnotation(), getId(), columnOrder);
671+
}
672+
658673
/**
659674
* @return the primitive type
660675
*/
@@ -869,6 +884,9 @@ protected Type union(Type toMerge, boolean strict) {
869884
*/
870885
@SuppressWarnings("unchecked")
871886
public <T> PrimitiveComparator<T> comparator() {
887+
if (columnOrder.getColumnOrderName() == ColumnOrderName.INT96_TIMESTAMP_ORDER) {
888+
return (PrimitiveComparator<T>) PrimitiveComparator.BINARY_AS_INT96_TIMESTAMP_COMPARATOR;
889+
}
872890
return (PrimitiveComparator<T>) getPrimitiveTypeName().comparator(getLogicalTypeAnnotation());
873891
}
874892

parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.parquet.schema;
2020

2121
import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_FLOAT16_COMPARATOR;
22+
import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_INT96_TIMESTAMP_COMPARATOR;
2223
import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR;
2324
import static org.apache.parquet.schema.PrimitiveComparator.BOOLEAN_COMPARATOR;
2425
import static org.apache.parquet.schema.PrimitiveComparator.DOUBLE_COMPARATOR;
@@ -33,8 +34,12 @@
3334

3435
import java.math.BigInteger;
3536
import java.nio.ByteBuffer;
37+
import java.time.LocalDateTime;
3638
import java.util.ArrayList;
39+
import java.util.Arrays;
3740
import java.util.List;
41+
import java.util.function.Function;
42+
import org.apache.parquet.example.data.simple.NanoTime;
3843
import org.apache.parquet.io.api.Binary;
3944
import org.junit.Test;
4045

@@ -297,6 +302,60 @@ public void testBinaryAsSignedIntegerComparatorWithEquals() {
297302
}
298303
}
299304

305+
private static Binary int96(int julianDay, long nanosOfDay) {
306+
return new NanoTime(julianDay, nanosOfDay).toBinary();
307+
}
308+
309+
private static Binary timestampToInt96(String timestamp) {
310+
LocalDateTime dt = LocalDateTime.parse(timestamp);
311+
int julianDay = (int) (dt.toLocalDate().toEpochDay() + 2440588);
312+
return new NanoTime(julianDay, dt.toLocalTime().toNanoOfDay()).toBinary();
313+
}
314+
315+
@Test
316+
public void testInt96TimestampComparator() {
317+
Binary[] valuesInAscendingOrder = {
318+
int96(Integer.MIN_VALUE, 0), // most negative julian day
319+
int96(-1, 86_399_999_999_999L), // negative julian days sort before day 0
320+
int96(0, 0), // start of the julian period
321+
int96(0, 86_399_999_999_999L), // same day, later time of day
322+
timestampToInt96("1968-05-23T00:00:00.000000123"), // pre-epoch but positive julian day
323+
timestampToInt96("2020-01-01T12:00:00"),
324+
timestampToInt96("2020-02-01T11:00:00"), // later day even though earlier time of day
325+
timestampToInt96("2020-02-01T11:00:00.000000001"), // nanos tie-break
326+
int96(Integer.MAX_VALUE, 86_399_999_999_999L)
327+
};
328+
329+
// The same value in different Binary representations must compare identically; the offset
330+
// variant guards against absolute reads not being relative to the value's start
331+
List<Function<Binary, Binary>> representations = List.of(
332+
b -> b,
333+
b -> Binary.fromReusedByteArray(b.getBytes()),
334+
b -> Binary.fromConstantByteArray(b.getBytes()),
335+
b -> {
336+
byte[] bytes = b.getBytes();
337+
byte[] padded = new byte[bytes.length + 20];
338+
Arrays.fill(padded, (byte) 0xAA);
339+
System.arraycopy(bytes, 0, padded, 10, bytes.length);
340+
return Binary.fromReusedByteArray(padded, 10, bytes.length);
341+
});
342+
343+
for (int i = 0; i < valuesInAscendingOrder.length; ++i) {
344+
for (int j = 0; j < valuesInAscendingOrder.length; ++j) {
345+
for (Function<Binary, Binary> fi : representations) {
346+
for (Function<Binary, Binary> fj : representations) {
347+
Binary bi = fi.apply(valuesInAscendingOrder[i]);
348+
Binary bj = fj.apply(valuesInAscendingOrder[j]);
349+
assertEquals(
350+
"comparing value " + i + " to value " + j,
351+
Integer.signum(Integer.compare(i, j)),
352+
Integer.signum(BINARY_AS_INT96_TIMESTAMP_COMPARATOR.compare(bi, bj)));
353+
}
354+
}
355+
}
356+
}
357+
}
358+
300359
@Test
301360
public void testFloat16Comparator() {
302361
Binary[] valuesInAscendingOrder = {

parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
2626
import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_DEFAULT;
2727
import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED;
28+
import static org.apache.parquet.hadoop.ParquetInputFormat.INT96_TIMESTAMP_STATISTICS_READING_ENABLED;
2829
import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED;
2930
import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED;
3031
import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
@@ -291,6 +292,10 @@ public Builder(ParquetConfiguration conf) {
291292
if (badRecordThresh != null) {
292293
set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh);
293294
}
295+
String readInt96TimestampStats = conf.get(INT96_TIMESTAMP_STATISTICS_READING_ENABLED);
296+
if (readInt96TimestampStats != null) {
297+
set(INT96_TIMESTAMP_STATISTICS_READING_ENABLED, readInt96TimestampStats);
298+
}
294299
}
295300

296301
public Builder useSignedStringMinMax(boolean useSignedStringMinMax) {

parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import org.apache.parquet.format.GeographyType;
8787
import org.apache.parquet.format.GeometryType;
8888
import org.apache.parquet.format.GeospatialStatistics;
89+
import org.apache.parquet.format.Int96TimestampOrder;
8990
import org.apache.parquet.format.IntType;
9091
import org.apache.parquet.format.KeyValue;
9192
import org.apache.parquet.format.LogicalType;
@@ -111,6 +112,7 @@
111112
import org.apache.parquet.format.Uncompressed;
112113
import org.apache.parquet.format.VariantType;
113114
import org.apache.parquet.format.XxHash;
115+
import org.apache.parquet.hadoop.ParquetInputFormat;
114116
import org.apache.parquet.hadoop.metadata.BlockMetaData;
115117
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
116118
import org.apache.parquet.hadoop.metadata.ColumnPath;
@@ -143,6 +145,7 @@
143145
public class ParquetMetadataConverter {
144146

145147
private static final TypeDefinedOrder TYPE_DEFINED_ORDER = new TypeDefinedOrder();
148+
private static final Int96TimestampOrder INT96_TIMESTAMP_ORDER = new Int96TimestampOrder();
146149
public static final MetadataFilter NO_FILTER = new NoFilter();
147150
public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
148151
public static final long MAX_STATS_SIZE = 4096; // limit stats to 4k
@@ -278,11 +281,16 @@ public FileMetaData toParquetMetadata(
278281

279282
private List<ColumnOrder> getColumnOrders(MessageType schema) {
280283
List<ColumnOrder> columnOrders = new ArrayList<>();
281-
// Currently, only TypeDefinedOrder is supported, so we create a column order for each columns with
282-
// TypeDefinedOrder even if some types (e.g. INT96) have undefined column orders.
283-
for (int i = 0, n = schema.getPaths().size(); i < n; ++i) {
284+
// Columns with the INT96_TIMESTAMP_ORDER column order are tagged as such; all other columns are
285+
// tagged with TypeDefinedOrder even if some types have undefined column orders.
286+
for (String[] path : schema.getPaths()) {
284287
ColumnOrder columnOrder = new ColumnOrder();
285-
columnOrder.setTYPE_ORDER(TYPE_DEFINED_ORDER);
288+
if (schema.getType(path).asPrimitiveType().columnOrder().getColumnOrderName() ==
289+
ColumnOrderName.INT96_TIMESTAMP_ORDER) {
290+
columnOrder.setINT96_TIMESTAMP_ORDER(INT96_TIMESTAMP_ORDER);
291+
} else {
292+
columnOrder.setTYPE_ORDER(TYPE_DEFINED_ORDER);
293+
}
286294
columnOrders.add(columnOrder);
287295
}
288296
return columnOrders;
@@ -891,7 +899,9 @@ private static byte[] tuncateMax(BinaryTruncator truncator, int truncateLength,
891899
}
892900

893901
private static boolean isMinMaxStatsSupported(PrimitiveType type) {
894-
return type.columnOrder().getColumnOrderName() == ColumnOrderName.TYPE_DEFINED_ORDER;
902+
ColumnOrderName name = type.columnOrder().getColumnOrderName();
903+
return name == ColumnOrderName.TYPE_DEFINED_ORDER
904+
|| name == ColumnOrderName.INT96_TIMESTAMP_ORDER;
895905
}
896906

897907
/**
@@ -2034,6 +2044,11 @@ private void buildChildren(
20342044
|| schemaElement.converted_type == ConvertedType.INTERVAL)) {
20352045
columnOrder = org.apache.parquet.schema.ColumnOrder.undefined();
20362046
}
2047+
// INT96_TIMESTAMP_ORDER is only valid for INT96 columns; ignore it anywhere else
2048+
if (columnOrder.getColumnOrderName() == ColumnOrderName.INT96_TIMESTAMP_ORDER
2049+
&& schemaElement.type != Type.INT96) {
2050+
columnOrder = org.apache.parquet.schema.ColumnOrder.undefined();
2051+
}
20372052
primitiveBuilder.columnOrder(columnOrder);
20382053
}
20392054
childBuilder = primitiveBuilder;
@@ -2086,14 +2101,22 @@ Repetition fromParquetRepetition(FieldRepetitionType repetition) {
20862101
return Repetition.valueOf(repetition.name());
20872102
}
20882103

2089-
private static org.apache.parquet.schema.ColumnOrder fromParquetColumnOrder(ColumnOrder columnOrder) {
2104+
private org.apache.parquet.schema.ColumnOrder fromParquetColumnOrder(ColumnOrder columnOrder) {
20902105
if (columnOrder.isSetTYPE_ORDER()) {
20912106
return org.apache.parquet.schema.ColumnOrder.typeDefined();
20922107
}
2108+
if (columnOrder.isSetINT96_TIMESTAMP_ORDER() && readInt96TimestampStatisticsEnabled()) {
2109+
return org.apache.parquet.schema.ColumnOrder.int96TimestampOrder();
2110+
}
20932111
// The column order is not yet supported by this API
20942112
return org.apache.parquet.schema.ColumnOrder.undefined();
20952113
}
20962114

2115+
private boolean readInt96TimestampStatisticsEnabled() {
2116+
return options == null
2117+
|| options.isEnabled(ParquetInputFormat.INT96_TIMESTAMP_STATISTICS_READING_ENABLED, true);
2118+
}
2119+
20972120
@Deprecated
20982121
public void writeDataPageHeader(
20992122
int uncompressedSize,

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public InternalParquetRecordWriter(
8989
ParquetProperties props) {
9090
this.parquetFileWriter = parquetFileWriter;
9191
this.writeSupport = Objects.requireNonNull(writeSupport, "writeSupport cannot be null");
92-
this.schema = schema;
92+
this.schema = ParquetFileWriter.applyInt96TimestampOrder(schema, props);
9393
this.extraMetaData = extraMetaData;
9494
this.rowGroupSizeThreshold = rowGroupSize;
9595
this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit();

0 commit comments

Comments
 (0)