From 9467f61720ef79826aca15c8506e9b94854fe00e Mon Sep 17 00:00:00 2001 From: Min Date: Fri, 23 Sep 2016 21:51:53 +0900 Subject: [PATCH] resetDecoder should be called once per decoding --- .../org/msgpack/core/MessageUnpacker.java | 10 +-- .../buffer/SequenceMessageBufferInput.java | 81 +++++++++++++++++++ .../msgpack/core/MessageUnpackerTest.scala | 45 ++++++++++- 3 files changed, 126 insertions(+), 10 deletions(-) create mode 100644 msgpack-core/src/main/java/org/msgpack/core/buffer/SequenceMessageBufferInput.java diff --git a/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java b/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java index 399e126dc..0696cd54c 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java +++ b/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java @@ -940,12 +940,13 @@ public String unpackString() if (len > stringSizeLimit) { throw new MessageSizeException(String.format("cannot unpack a String of size larger than %,d: %,d", stringSizeLimit, len), len); } + + resetDecoder(); // should be invoked only once per value + if (buffer.size() - position >= len) { return decodeStringFastPath(len); } - resetDecoder(); - try { int rawRemaining = len; while (rawRemaining > 0) { @@ -1039,10 +1040,7 @@ private String decodeStringFastPath(int length) return s; } else { - resetDecoder(); - ByteBuffer bb = buffer.sliceAsByteBuffer(); - bb.limit(position + length); - bb.position(position); + ByteBuffer bb = buffer.sliceAsByteBuffer(position, length); CharBuffer cb; try { cb = decoder.decode(bb); diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/SequenceMessageBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/SequenceMessageBufferInput.java new file mode 100644 index 000000000..59d52acdc --- /dev/null +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/SequenceMessageBufferInput.java @@ -0,0 +1,81 @@ +// +// MessagePack for Java +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package org.msgpack.core.buffer; + +import java.io.IOException; +import java.util.Enumeration; + +import static org.msgpack.core.Preconditions.checkNotNull; + +/** + * {@link MessageBufferInput} adapter for {@link MessageBufferInput} Enumeration + */ +public class SequenceMessageBufferInput + implements MessageBufferInput +{ + private Enumeration sequence; + private MessageBufferInput input; + + public SequenceMessageBufferInput(Enumeration sequence) + { + this.sequence = checkNotNull(sequence, "input sequence is null"); + try { + nextInput(); + } + catch (IOException ignore) { + } + } + + @Override + public MessageBuffer next() throws IOException + { + if (input == null) { + return null; + } + MessageBuffer buffer = input.next(); + if (buffer == null) { + nextInput(); + return next(); + } + + return buffer; + } + + private void nextInput() throws IOException + { + if (input != null) { + input.close(); + } + + if (sequence.hasMoreElements()) { + input = sequence.nextElement(); + if (input == null) { + throw new NullPointerException(); + } + } + else { + input = null; + } + } + + @Override + public void close() throws IOException + { + do { + nextInput(); + } while (input != null); + } +} diff --git a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala index 1c8864c7c..f2ecb8f28 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala @@ -17,11 +17,13 @@ package org.msgpack.core import java.io._ import java.nio.ByteBuffer +import java.util.Collections import org.msgpack.core.buffer._ import org.msgpack.value.ValueType import xerial.core.io.IOUtil._ +import scala.collection.JavaConversions._ import scala.util.Random object MessageUnpackerTest { @@ -205,6 +207,34 @@ class MessageUnpackerTest extends MessagePackSpec { builder.result() } + def sequenceUnpackers(data: Array[Byte], size: Int) : Seq[MessageUnpacker] = { + val seqBytes = Seq.newBuilder[MessageBufferInput] + val seqByteBuffers = Seq.newBuilder[MessageBufferInput] + val seqDirectBuffers = Seq.newBuilder[MessageBufferInput] + var left = data.length + var position = 0 + while (left > 0) { + val length = Math.min(size, left) + seqBytes += new ArrayBufferInput(data, position, length); + val bb = ByteBuffer.allocate(length) + val db = ByteBuffer.allocateDirect(length) + bb.put(data, position, length).flip() + db.put(data, position, length).flip() + seqByteBuffers += new ByteBufferInput(bb); + seqDirectBuffers += new ByteBufferInput(db); + left -= length + position += length + } + val builder = Seq.newBuilder[MessageUnpacker] + builder += MessagePack.newDefaultUnpacker(new SequenceMessageBufferInput(Collections.enumeration(seqBytes.result()))) + builder += MessagePack.newDefaultUnpacker(new SequenceMessageBufferInput(Collections.enumeration(seqByteBuffers.result()))) + if (!universal) { + builder += MessagePack.newDefaultUnpacker(new SequenceMessageBufferInput(Collections.enumeration(seqDirectBuffers.result()))) + } + + builder.result() + } + "MessageUnpacker" should { "parse message packed data" taggedAs ("unpack") in { @@ -330,21 +360,28 @@ class MessageUnpackerTest extends MessagePackSpec { new SplitTest {val data = testData3(30)}.run } - "read numeric data at buffer boundary" taggedAs("boundary2") in { + "read data at buffer boundary" taggedAs("boundary2") in { val packer = MessagePack.newDefaultBufferPacker() (0 until 1170).foreach{i => packer.packLong(0x0011223344556677L) - packer.packString("hello") + packer.packString("hello world") } packer.close val data = packer.toByteArray - val unpacker = MessagePack.newDefaultUnpacker(new InputStreamBufferInput(new ByteArrayInputStream(data), 8192)) + var unpacker = MessagePack.newDefaultUnpacker(new InputStreamBufferInput(new ByteArrayInputStream(data), 8192)) (0 until 1170).foreach { i => unpacker.unpackLong() shouldBe 0x0011223344556677L - unpacker.unpackString() shouldBe "hello" + unpacker.unpackString() shouldBe "hello world" } unpacker.close() + + for (unpacker <- sequenceUnpackers(data, 32)) { + (0 until 1170).foreach { i => + unpacker.unpackLong() shouldBe 0x0011223344556677L + unpacker.unpackString() shouldBe "hello world" + } + } } "be faster then msgpack-v6 skip" taggedAs ("cmp-skip") in {