-
Notifications
You must be signed in to change notification settings - Fork 125
Expand file tree
/
Copy pathDirectBufferProxy.java
More file actions
196 lines (175 loc) · 6.54 KB
/
DirectBufferProxy.java
File metadata and controls
196 lines (175 loc) · 6.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
/*
* Copyright © 2016-2025 The LmdbJava Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lmdbjava;
import static java.lang.ThreadLocal.withInitial;
import static java.nio.ByteBuffer.allocateDirect;
import static java.nio.ByteOrder.BIG_ENDIAN;
import static java.util.Objects.requireNonNull;
import static org.lmdbjava.UnsafeAccess.UNSAFE;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayDeque;
import java.util.Comparator;
import jnr.ffi.Pointer;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
/**
 * A buffer proxy backed by Agrona's {@link DirectBuffer}.
 *
 * <p>This class requires {@link UnsafeAccess} and Agrona must be in the classpath.
 */
public final class DirectBufferProxy extends BufferProxy<DirectBuffer> {

  /**
   * The {@link MutableDirectBuffer} proxy. Guaranteed to never be null, although a class
   * initialization exception will occur if an attempt is made to access this field when unsafe or
   * Agrona is unavailable.
   */
  public static final BufferProxy<DirectBuffer> PROXY_DB = new DirectBufferProxy();

  /**
   * Per-thread pool of reusable buffers. {@link #allocate()} polls the calling thread's queue and
   * only creates a new buffer when nothing usable is found; {@link #deallocate(DirectBuffer)}
   * offers buffers back to the calling thread's queue.
   */
  private static final ThreadLocal<ArrayDeque<DirectBuffer>> BUFFERS =
      withInitial(() -> new ArrayDeque<>(16));

  private static final ByteOrder NATIVE_ORDER = ByteOrder.nativeOrder();

  private DirectBufferProxy() {}

  /**
   * Lexicographically compare two buffers.
   *
   * <p>Bytes are compared as unsigned values over the full capacity of each buffer. When one
   * buffer is a prefix of the other, the shorter buffer sorts first.
   *
   * @param o1 left operand (required)
   * @param o2 right operand (required)
   * @return as specified by {@link Comparable} interface
   */
  public static int compareLexicographically(final DirectBuffer o1, final DirectBuffer o2) {
    requireNonNull(o1);
    requireNonNull(o2);
    final int len1 = o1.capacity();
    final int len2 = o2.capacity();
    final int minLength = Math.min(len1, len2);
    // Number of leading bytes that can be compared eight at a time.
    final int wordBytes = (minLength / Long.BYTES) * Long.BYTES;

    // Reading big-endian makes an unsigned long comparison equivalent to comparing the
    // eight underlying bytes in order.
    for (int i = 0; i < wordBytes; i += Long.BYTES) {
      final int diff = Long.compareUnsigned(o1.getLong(i, BIG_ENDIAN), o2.getLong(i, BIG_ENDIAN));
      if (diff != 0) {
        return diff;
      }
    }

    // Remaining tail (fewer than eight bytes) is compared one unsigned byte at a time.
    for (int i = wordBytes; i < minLength; i++) {
      final int diff =
          Integer.compare(Byte.toUnsignedInt(o1.getByte(i)), Byte.toUnsignedInt(o2.getByte(i)));
      if (diff != 0) {
        return diff;
      }
    }

    // Common prefix is identical: order by length.
    return Integer.compare(len1, len2);
  }

  /**
   * Buffer comparator specifically for 4/8 byte keys that are unsigned ints/longs, i.e. when using
   * MDB_INTEGER_KEY/MDB_INTEGERDUP. Compares the buffers numerically.
   *
   * <p>Both buffers must have the same capacity. Capacities of 8 and 4 bytes are read in native
   * byte order and compared as unsigned longs/ints respectively; any other (equal) capacity falls
   * back to {@link #compareLexicographically(DirectBuffer, DirectBuffer)}.
   *
   * @param o1 left operand (required)
   * @param o2 right operand (required)
   * @return as specified by {@link Comparable} interface
   * @throws IllegalArgumentException if the two buffers have different capacities
   */
  public static int compareAsIntegerKeys(final DirectBuffer o1, final DirectBuffer o2) {
    requireNonNull(o1);
    requireNonNull(o2);
    // Both buffers should be same len
    final int len1 = o1.capacity();
    final int len2 = o2.capacity();
    if (len1 != len2) {
      throw new IllegalArgumentException(
          "Length mismatch, len1: "
              + len1
              + ", len2: "
              + len2
              + ". Lengths must be identical and either 4 or 8 bytes.");
    }

    if (len1 == Long.BYTES) {
      return Long.compareUnsigned(o1.getLong(0, NATIVE_ORDER), o2.getLong(0, NATIVE_ORDER));
    }
    if (len1 == Integer.BYTES) {
      return Integer.compareUnsigned(o1.getInt(0, NATIVE_ORDER), o2.getInt(0, NATIVE_ORDER));
    }
    // size_t and int are likely to be 8bytes and 4bytes respectively on 64bit.
    // If 32bit then would be 4/2 respectively.
    // Short.compareUnsigned is not available in Java8.
    // For now just fall back to our standard comparator
    return compareLexicographically(o1, o2);
  }

  /**
   * Obtains a buffer, reusing one from the calling thread's pool when available.
   *
   * @return a pooled buffer, or otherwise a new {@link UnsafeBuffer} wrapping a zero-capacity
   *     direct {@link ByteBuffer}
   */
  @Override
  protected DirectBuffer allocate() {
    final DirectBuffer pooled = BUFFERS.get().poll();
    if (pooled != null && pooled.capacity() >= 0) {
      return pooled;
    }
    final ByteBuffer bb = allocateDirect(0);
    return new UnsafeBuffer(bb);
  }

  /**
   * Selects the comparator matching the supplied database flags.
   *
   * @param dbiFlagSet flags of the database the comparator will be used with (required)
   * @return {@link #compareAsIntegerKeys(DirectBuffer, DirectBuffer)} when any of {@code
   *     DbiFlagSet.INTEGER_KEY_FLAGS} is set, otherwise {@link
   *     #compareLexicographically(DirectBuffer, DirectBuffer)}
   */
  @Override
  public Comparator<DirectBuffer> getComparator(final DbiFlagSet dbiFlagSet) {
    if (dbiFlagSet.areAnySet(DbiFlagSet.INTEGER_KEY_FLAGS)) {
      return DirectBufferProxy::compareAsIntegerKeys;
    }
    return DirectBufferProxy::compareLexicographically;
  }

  /**
   * Returns a buffer to the calling thread's pool so a later {@link #allocate()} can reuse it.
   *
   * @param buff the buffer to pool
   */
  @Override
  protected void deallocate(final DirectBuffer buff) {
    BUFFERS.get().offer(buff);
  }

  /**
   * Copies the entire content of a buffer into a new byte array.
   *
   * @param buffer the buffer to copy from (required)
   * @return a new array of {@code buffer.capacity()} bytes
   */
  @Override
  protected byte[] getBytes(final DirectBuffer buffer) {
    final int length = buffer.capacity();
    final byte[] dest = new byte[length];
    buffer.getBytes(0, dest, 0, length);
    return dest;
  }

  /**
   * Writes the buffer's address and its full capacity into the native struct at {@code ptr}.
   *
   * @param buffer the buffer whose address and capacity are written (required)
   * @param ptr pointer to the native struct to populate (required)
   * @return always {@code null}
   */
  @Override
  protected Pointer in(final DirectBuffer buffer, final Pointer ptr) {
    return in(buffer, buffer.capacity(), ptr);
  }

  /**
   * Writes the buffer's address and an explicit size into the native struct at {@code ptr}.
   *
   * <p>The address is stored at {@code STRUCT_FIELD_OFFSET_DATA} and the size at {@code
   * STRUCT_FIELD_OFFSET_SIZE}, both relative to {@code ptr.address()}. NOTE(review): presumably
   * these are the data/size fields of LMDB's {@code MDB_val}; confirm against {@code BufferProxy}.
   *
   * @param buffer the buffer whose address is written (required)
   * @param size the size value to write
   * @param ptr pointer to the native struct to populate (required)
   * @return always {@code null}
   */
  @Override
  protected Pointer in(final DirectBuffer buffer, final int size, final Pointer ptr) {
    final long ptrAddr = ptr.address();
    final long addr = buffer.addressOffset();
    UNSAFE.putLong(ptrAddr + STRUCT_FIELD_OFFSET_DATA, addr);
    UNSAFE.putLong(ptrAddr + STRUCT_FIELD_OFFSET_SIZE, size);
    return null;
  }

  /**
   * Re-points {@code buffer} at the memory described by the native struct at {@code ptr}.
   *
   * <p>The address and size are read from {@code STRUCT_FIELD_OFFSET_DATA} and {@code
   * STRUCT_FIELD_OFFSET_SIZE} (relative to {@code ptr.address()}) and the buffer is wrapped over
   * that region; no bytes are copied.
   *
   * @param buffer the buffer to re-wrap (required)
   * @param ptr pointer to the native struct to read (required)
   * @return the same {@code buffer} instance, now wrapping the described region
   * @throws ArithmeticException if the size read from the struct does not fit in an {@code int}
   */
  @Override
  protected DirectBuffer out(final DirectBuffer buffer, final Pointer ptr) {
    final long ptrAddr = ptr.address();
    final long addr = UNSAFE.getLong(ptrAddr + STRUCT_FIELD_OFFSET_DATA);
    final long size = UNSAFE.getLong(ptrAddr + STRUCT_FIELD_OFFSET_SIZE);
    // Fail fast instead of silently truncating a size that exceeds Integer.MAX_VALUE.
    buffer.wrap(addr, Math.toIntExact(size));
    return buffer;
  }
}