From cb19f5919d9200ff4cea7696fefe5a3aef094423 Mon Sep 17 00:00:00 2001 From: Maithem Date: Tue, 18 Aug 2020 01:23:15 -0700 Subject: [PATCH] Support User Defined PooledByteBufAllocator This patch allows users to pass a PooledByteBufAllocator instance if they wish to not use the DEFAULT instance. Also, this patch fixes a minor bug where a ByteBuf is created during initialization without ever being released. --- src/main/java/org/lmdbjava/ByteBufProxy.java | 64 ++++++++------------ 1 file changed, 24 insertions(+), 40 deletions(-) diff --git a/src/main/java/org/lmdbjava/ByteBufProxy.java b/src/main/java/org/lmdbjava/ByteBufProxy.java index ca580363..271c4e2f 100644 --- a/src/main/java/org/lmdbjava/ByteBufProxy.java +++ b/src/main/java/org/lmdbjava/ByteBufProxy.java @@ -22,13 +22,12 @@ import static io.netty.buffer.PooledByteBufAllocator.DEFAULT; import static java.lang.Class.forName; -import static java.lang.ThreadLocal.withInitial; import static org.lmdbjava.UnsafeAccess.UNSAFE; import java.lang.reflect.Field; -import java.util.ArrayDeque; import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; import jnr.ffi.Pointer; /** @@ -47,37 +46,34 @@ public final class ByteBufProxy extends BufferProxy { */ public static final BufferProxy PROXY_NETTY = new ByteBufProxy(); - private static final long ADDRESS_OFFSET; - - /** - * A thread-safe pool for a given length. If the buffer found is bigger then - * the buffer in the pool creates a new buffer. If no buffer is found creates - * a new buffer. - */ - private static final ThreadLocal> BUFFERS = withInitial(() - -> new ArrayDeque<>(16)); - private static final int BUFFER_RETRIES = 10; private static final String FIELD_NAME_ADDRESS = "memoryAddress"; private static final String FIELD_NAME_LENGTH = "length"; - private static final long LENGTH_OFFSET; private static final String NAME = "io.netty.buffer.PooledUnsafeDirectByteBuf"; + private final long lengthOffset; + private final long addressOffset; + + private final PooledByteBufAllocator nettyAllocator; + + private ByteBufProxy() { + this(DEFAULT); + } + + public ByteBufProxy(final PooledByteBufAllocator allocator) { + this.nettyAllocator = allocator; - static { try { - createBuffer(); + final ByteBuf initBuf = this.allocate(); + initBuf.release(); final Field address = findField(NAME, FIELD_NAME_ADDRESS); final Field length = findField(NAME, FIELD_NAME_LENGTH); - ADDRESS_OFFSET = UNSAFE.objectFieldOffset(address); - LENGTH_OFFSET = UNSAFE.objectFieldOffset(length); + addressOffset = UNSAFE.objectFieldOffset(address); + lengthOffset = UNSAFE.objectFieldOffset(length); } catch (final SecurityException e) { throw new LmdbException("Field access error", e); } } - private ByteBufProxy() { - } - static Field findField(final String c, final String name) { Class clazz; try { @@ -97,28 +93,19 @@ static Field findField(final String c, final String name) { throw new LmdbException(name + " not found"); } - private static ByteBuf createBuffer() { + @Override + protected ByteBuf allocate() { for (int i = 0; i < BUFFER_RETRIES; i++) { - final ByteBuf bb = DEFAULT.directBuffer(0); + final ByteBuf bb = nettyAllocator.directBuffer(); if (NAME.equals(bb.getClass().getName())) { return bb; + } else { + bb.release(); } } throw new IllegalStateException("Netty buffer must be " + NAME); } - @Override - protected ByteBuf allocate() { - final ArrayDeque queue = BUFFERS.get(); - final ByteBuf buffer = queue.poll(); - - if (buffer != null && buffer.capacity() >= 0) { - return buffer; - } else { - return createBuffer(); - } - } - @Override protected int compare(final ByteBuf o1, final ByteBuf o2) { return o1.compareTo(o2); @@ -126,10 +113,7 @@ protected int compare(final ByteBuf o1, final ByteBuf o2) { @Override protected void deallocate(final ByteBuf buff) { - final ArrayDeque queue = BUFFERS.get(); - if (!queue.offer(buff)) { - buff.release(); - } + buff.release(); } @Override @@ -161,8 +145,8 @@ protected ByteBuf out(final ByteBuf buffer, final Pointer ptr, final long ptrAddr) { final long addr = UNSAFE.getLong(ptrAddr + STRUCT_FIELD_OFFSET_DATA); final long size = UNSAFE.getLong(ptrAddr + STRUCT_FIELD_OFFSET_SIZE); - UNSAFE.putLong(buffer, ADDRESS_OFFSET, addr); - UNSAFE.putInt(buffer, LENGTH_OFFSET, (int) size); + UNSAFE.putLong(buffer, addressOffset, addr); + UNSAFE.putInt(buffer, lengthOffset, (int) size); buffer.writerIndex((int) size).readerIndex(0); return buffer; }