Skip to content

Commit bbf6856

Browse files
committed
Fixed JournalTransactionStore#checkpoint,thanks to @platformregister
1 parent 858be79 commit bbf6856

2 files changed

Lines changed: 29 additions & 28 deletions

File tree

metamorphosis-server/src/main/java/com/taobao/metamorphosis/server/transaction/store/JournalTransactionStore.java

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -305,11 +305,11 @@ public void prepare(final TransactionId txid) throws IOException {
305305
}
306306
final TransactionOperation to =
307307
TransactionOperation.newBuilder().setType(TransactionType.XA_PREPARE)
308-
.setTransactionId(txid.getTransactionKey()).setWasPrepared(false).build();
308+
.setTransactionId(txid.getTransactionKey()).setWasPrepared(false).build();
309309
// prepare,必须force
310310
final TxCommand msg =
311311
TxCommand.newBuilder().setCmdType(TxCommandType.TX_OP).setCmdContent(to.toByteString()).setForce(true)
312-
.build();
312+
.build();
313313
this.journalStore.write(msg, null, tx.location, false);
314314

315315
synchronized (this.preparedTransactions) {
@@ -432,25 +432,25 @@ public void appendComplete(final Location location) {
432432
final int attachmentLen = localtionBytes.remaining();
433433
if (txid.isXATransaction()) {
434434
final TransactionOperation to = TransactionOperation.newBuilder() //
435-
.setType(TransactionType.XA_COMMIT) //
436-
.setTransactionId(txid.getTransactionKey()) //
437-
.setWasPrepared(wasPrepared) //
438-
.setDataLength(attachmentLen) // 设置附加数据长度
439-
.build();
435+
.setType(TransactionType.XA_COMMIT) //
436+
.setTransactionId(txid.getTransactionKey()) //
437+
.setWasPrepared(wasPrepared) //
438+
.setDataLength(attachmentLen) // 设置附加数据长度
439+
.build();
440440
msg =
441441
TxCommand.newBuilder().setCmdType(TxCommandType.TX_OP)
442-
.setCmdContent(to.toByteString()).build();
442+
.setCmdContent(to.toByteString()).build();
443443
}
444444
else {
445445
final TransactionOperation to = TransactionOperation.newBuilder() //
446-
.setType(TransactionType.LOCAL_COMMIT) //
447-
.setTransactionId(txid.getTransactionKey()) //
448-
.setWasPrepared(wasPrepared) //
449-
.setDataLength(attachmentLen)// 设置附加数据长度
450-
.build();
446+
.setType(TransactionType.LOCAL_COMMIT) //
447+
.setTransactionId(txid.getTransactionKey()) //
448+
.setWasPrepared(wasPrepared) //
449+
.setDataLength(attachmentLen)// 设置附加数据长度
450+
.build();
451451
msg =
452452
TxCommand.newBuilder().setCmdType(TxCommandType.TX_OP)
453-
.setCmdContent(to.toByteString()).build();
453+
.setCmdContent(to.toByteString()).build();
454454
}
455455
// 记录commit日志,并附加位置信息
456456
try {
@@ -497,20 +497,20 @@ public void rollback(final TransactionId txid) throws IOException {
497497
if (tx != null) {
498498
if (txid.isXATransaction()) {
499499
final TransactionOperation to = TransactionOperation.newBuilder() //
500-
.setType(TransactionType.XA_ROLLBACK) //
501-
.setTransactionId(txid.getTransactionKey()) //
502-
.setWasPrepared(false) //
503-
.build();
500+
.setType(TransactionType.XA_ROLLBACK) //
501+
.setTransactionId(txid.getTransactionKey()) //
502+
.setWasPrepared(false) //
503+
.build();
504504
final TxCommand msg =
505505
TxCommand.newBuilder().setCmdType(TxCommandType.TX_OP).setCmdContent(to.toByteString()).build();
506506
this.journalStore.write(msg, null, tx.location, true);
507507
}
508508
else {
509509
final TransactionOperation to = TransactionOperation.newBuilder() //
510-
.setType(TransactionType.LOCAL_ROLLBACK) //
511-
.setTransactionId(txid.getTransactionKey()) //
512-
.setWasPrepared(false) //
513-
.build();
510+
.setType(TransactionType.LOCAL_ROLLBACK) //
511+
.setTransactionId(txid.getTransactionKey()) //
512+
.setWasPrepared(false) //
513+
.build();
514514
final TxCommand msg =
515515
TxCommand.newBuilder().setCmdType(TxCommandType.TX_OP).setCmdContent(to.toByteString()).build();
516516
this.journalStore.write(msg, null, tx.location, true);
@@ -593,10 +593,10 @@ public void addMessage(final MessageStore store, final long msgId, final PutComm
593593
// 非重放,添加put日志
594594
final AppendMessageCommand appendCmd =
595595
AppendMessageCommand.newBuilder().setMessageId(msgId)
596-
.setPutCommand(ByteString.copyFrom(putCmd.encode().array())).build();
596+
.setPutCommand(ByteString.copyFrom(putCmd.encode().array())).build();
597597
final TxCommand txCommand =
598598
TxCommand.newBuilder().setCmdType(TxCommandType.APPEND_MSG).setCmdContent(appendCmd.toByteString())
599-
.build();
599+
.build();
600600
final Tx tx = this.getInflyTx(putCmd.getTransactionId());
601601
if (tx != null) {
602602
location = this.journalStore.write(txCommand, null, tx.location, false);
@@ -624,7 +624,7 @@ public JournalLocation checkpoint() throws IOException {
624624
for (final Iterator<Tx> iter = this.inflightTransactions.values().iterator(); iter.hasNext();) {
625625
final Tx tx = iter.next();
626626
final JournalLocation location = tx.location;
627-
if (rc == null || rc.compareTo(location) < 0) {
627+
if (rc == null || rc.compareTo(location) > 0) {
628628
rc = location;
629629
}
630630
}
@@ -633,7 +633,7 @@ public JournalLocation checkpoint() throws IOException {
633633
for (final Iterator<Tx> iter = this.preparedTransactions.values().iterator(); iter.hasNext();) {
634634
final Tx tx = iter.next();
635635
final JournalLocation location = tx.location;
636-
if (rc == null || rc.compareTo(location) < 0) {
636+
if (rc == null || rc.compareTo(location) > 0) {
637637
rc = location;
638638
}
639639
}

metamorphosis-server/src/test/java/com/taobao/metamorphosis/server/transaction/store/JournalTransactionStoreUnitTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@ public void recover(final XATransactionId id, final PutCommand[] addedMessages)
300300
}
301301

302302

303+
@Test
303304
public void testCheckpoint() throws Exception {
304305
// 事务1
305306
final LocalTransactionId xid1 = new LocalTransactionId("session1", 1);
@@ -308,13 +309,13 @@ public void testCheckpoint() throws Exception {
308309
null);
309310

310311
// 事务2
311-
final LocalTransactionId xid2 = new LocalTransactionId("session1", 1);
312+
final LocalTransactionId xid2 = new LocalTransactionId("session2", 1);
312313
final MessageStore store2 = this.messageStoreManager.getOrCreateMessageStore("topic1", 3);
313314
this.transactionStore.addMessage(store2, 1, new PutCommand("topic1", 3, ("msg" + 3).getBytes(), xid2, 0, 1),
314315
null);
315316

316317
// 事务3,已经提交
317-
final LocalTransactionId xid3 = new LocalTransactionId("session1", 1);
318+
final LocalTransactionId xid3 = new LocalTransactionId("session3", 1);
318319
final MessageStore store3 = this.messageStoreManager.getOrCreateMessageStore("topic1", 0);
319320
this.transactionStore.addMessage(store3, 1, new PutCommand("topic1", 0, ("msg" + 0).getBytes(), xid3, 0, 1),
320321
null);

0 commit comments

Comments
 (0)