Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 24 additions & 40 deletions src/main/java/org/lmdbjava/ByteBufProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@

import static io.netty.buffer.PooledByteBufAllocator.DEFAULT;
import static java.lang.Class.forName;
import static java.lang.ThreadLocal.withInitial;
import static org.lmdbjava.UnsafeAccess.UNSAFE;

import java.lang.reflect.Field;
import java.util.ArrayDeque;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import jnr.ffi.Pointer;

/**
Expand All @@ -47,37 +46,34 @@ public final class ByteBufProxy extends BufferProxy<ByteBuf> {
*/
public static final BufferProxy<ByteBuf> PROXY_NETTY = new ByteBufProxy();

private static final long ADDRESS_OFFSET;

/**
* A thread-safe pool for a given length. If the buffer found is bigger then
* the buffer in the pool creates a new buffer. If no buffer is found creates
* a new buffer.
*/
private static final ThreadLocal<ArrayDeque<ByteBuf>> BUFFERS = withInitial(()
-> new ArrayDeque<>(16));

private static final int BUFFER_RETRIES = 10;
private static final String FIELD_NAME_ADDRESS = "memoryAddress";
private static final String FIELD_NAME_LENGTH = "length";
private static final long LENGTH_OFFSET;
private static final String NAME = "io.netty.buffer.PooledUnsafeDirectByteBuf";
private final long lengthOffset;
private final long addressOffset;

private final PooledByteBufAllocator nettyAllocator;

private ByteBufProxy() {
this(DEFAULT);
}

public ByteBufProxy(final PooledByteBufAllocator allocator) {
this.nettyAllocator = allocator;

static {
try {
createBuffer();
final ByteBuf initBuf = this.allocate();
initBuf.release();
final Field address = findField(NAME, FIELD_NAME_ADDRESS);
final Field length = findField(NAME, FIELD_NAME_LENGTH);
ADDRESS_OFFSET = UNSAFE.objectFieldOffset(address);
LENGTH_OFFSET = UNSAFE.objectFieldOffset(length);
addressOffset = UNSAFE.objectFieldOffset(address);
lengthOffset = UNSAFE.objectFieldOffset(length);
} catch (final SecurityException e) {
throw new LmdbException("Field access error", e);
}
}

private ByteBufProxy() {
}

static Field findField(final String c, final String name) {
Class<?> clazz;
try {
Expand All @@ -97,39 +93,27 @@ static Field findField(final String c, final String name) {
throw new LmdbException(name + " not found");
}

private static ByteBuf createBuffer() {
@Override
protected ByteBuf allocate() {
for (int i = 0; i < BUFFER_RETRIES; i++) {
final ByteBuf bb = DEFAULT.directBuffer(0);
final ByteBuf bb = nettyAllocator.directBuffer();
if (NAME.equals(bb.getClass().getName())) {
return bb;
} else {
bb.release();
}
}
throw new IllegalStateException("Netty buffer must be " + NAME);
}

@Override
protected ByteBuf allocate() {
final ArrayDeque<ByteBuf> queue = BUFFERS.get();
final ByteBuf buffer = queue.poll();

if (buffer != null && buffer.capacity() >= 0) {
return buffer;
} else {
return createBuffer();
}
}

@Override
protected int compare(final ByteBuf o1, final ByteBuf o2) {
return o1.compareTo(o2);
}

@Override
protected void deallocate(final ByteBuf buff) {
final ArrayDeque<ByteBuf> queue = BUFFERS.get();
if (!queue.offer(buff)) {
buff.release();
}
buff.release();
}

@Override
Expand Down Expand Up @@ -161,8 +145,8 @@ protected ByteBuf out(final ByteBuf buffer, final Pointer ptr,
final long ptrAddr) {
final long addr = UNSAFE.getLong(ptrAddr + STRUCT_FIELD_OFFSET_DATA);
final long size = UNSAFE.getLong(ptrAddr + STRUCT_FIELD_OFFSET_SIZE);
UNSAFE.putLong(buffer, ADDRESS_OFFSET, addr);
UNSAFE.putInt(buffer, LENGTH_OFFSET, (int) size);
UNSAFE.putLong(buffer, addressOffset, addr);
UNSAFE.putInt(buffer, lengthOffset, (int) size);
buffer.writerIndex((int) size).readerIndex(0);
return buffer;
}
Expand Down