Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
Expand Down Expand Up @@ -44,6 +45,7 @@ public class TransactionsMsgHandler implements TronMsgHandler {

private BlockingQueue<Runnable> queue = new LinkedBlockingQueue();

private volatile boolean isClosed = false;
private int threadNum = Args.getInstance().getValidateSignThreadNum();
private final String trxEsName = "trx-msg-handler";
private ExecutorService trxHandlePool = ExecutorServiceManager.newThreadPoolExecutor(
Expand All @@ -58,8 +60,14 @@ public void init() {
}

public void close() {
ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName);
isClosed = true;
// Stop the scheduler first so no new tasks are drained from smartContractQueue.
ExecutorServiceManager.shutdownAndAwaitTermination(smartContractExecutor, smartEsName);
// Then shutdown the worker pool to finish already-submitted tasks.
ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName);
Comment thread
0xbigapple marked this conversation as resolved.
Comment thread
0xbigapple marked this conversation as resolved.
// Discard any remaining items and release references.
smartContractQueue.clear();
Comment thread
0xbigapple marked this conversation as resolved.
queue.clear();
}

public boolean isBusy() {
Expand All @@ -68,6 +76,10 @@ public boolean isBusy() {

@Override
public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException {
if (isClosed) {
logger.info("TransactionsMsgHandler is closed, drop message");
return;
}
TransactionsMessage transactionsMessage = (TransactionsMessage) msg;
Comment thread
0xbigapple marked this conversation as resolved.
check(peer, transactionsMessage);
for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
Expand All @@ -78,6 +90,10 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
int trxHandlePoolQueueSize = 0;
int dropSmartContractCount = 0;
for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
if (isClosed) {
logger.info("TransactionsMsgHandler is closed during processing, stop submit");
break;
}
int type = trx.getRawData().getContract(0).getType().getNumber();
if (type == ContractType.TriggerSmartContract_VALUE
|| type == ContractType.CreateSmartContract_VALUE) {
Expand All @@ -87,8 +103,13 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
dropSmartContractCount++;
}
} else {
ExecutorServiceManager.submit(
trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx)));
try {
ExecutorServiceManager.submit(
trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx)));
} catch (RejectedExecutionException e) {
logger.warn("Submit task to {} failed", trxEsName);
Comment thread
0xbigapple marked this conversation as resolved.
break;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

import lombok.Getter;
import org.joda.time.DateTime;
Expand All @@ -20,7 +23,10 @@
import org.tron.common.TestConstants;
import org.tron.common.runtime.TvmTestUtils;
import org.tron.common.utils.ByteArray;
import org.tron.core.ChainBaseManager;
import org.tron.core.config.args.Args;
import org.tron.core.exception.P2pException;
import org.tron.core.exception.P2pException.TypeEnum;
import org.tron.core.net.TronNetDelegate;
import org.tron.core.net.message.adv.TransactionMessage;
import org.tron.core.net.message.adv.TransactionsMessage;
Expand Down Expand Up @@ -80,7 +86,6 @@ public void testProcessMessage() {
transactionsMsgHandler.processMessage(peer, new TransactionsMessage(transactionList));
Assert.assertNull(advInvRequest.get(item));
//Thread.sleep(10);
transactionsMsgHandler.close();
BlockingQueue<TrxEvent> smartContractQueue =
new LinkedBlockingQueue(2);
smartContractQueue.offer(new TrxEvent(null, null));
Expand Down Expand Up @@ -132,6 +137,159 @@ public void testProcessMessage() {
}
}

@Test
public void testProcessMessageAfterClose() throws Exception {
TransactionsMsgHandler handler = new TransactionsMsgHandler();
handler.init();
handler.close();

PeerConnection peer = Mockito.mock(PeerConnection.class);
TransactionsMessage msg = Mockito.mock(TransactionsMessage.class);

handler.processMessage(peer, msg);

Mockito.verify(msg, Mockito.never()).getTransactions();
Comment thread
0xbigapple marked this conversation as resolved.
Mockito.verifyNoInteractions(peer);
}

@Test
public void testRejectedExecution() throws Exception {
TransactionsMsgHandler handler = new TransactionsMsgHandler();
try {
ExecutorService mockPool = Mockito.mock(ExecutorService.class);
Mockito.when(mockPool.submit(Mockito.any(Runnable.class)))
.thenThrow(new RejectedExecutionException("pool closed"));
Field poolField = TransactionsMsgHandler.class.getDeclaredField("trxHandlePool");
poolField.setAccessible(true);
poolField.set(handler, mockPool);

PeerConnection peer = Mockito.mock(PeerConnection.class);
TransactionsMessage msg = buildTransferMessage(2);
stubAdvInvRequest(peer, msg);
// 2 transfer transactions, submit throws on the first → catch + break, only called once
handler.processMessage(peer, msg);

Mockito.verify(mockPool, Mockito.times(1)).submit(Mockito.any(Runnable.class));
} finally {
handler.close();
}
}

@Test
public void testCloseDuringProcessing() throws Exception {
TransactionsMsgHandler handler = new TransactionsMsgHandler();
try {
Field closedField = TransactionsMsgHandler.class.getDeclaredField("isClosed");
closedField.setAccessible(true);

ExecutorService mockPool = Mockito.mock(ExecutorService.class);
// on the first submit, flip isClosed to true so the second iteration breaks
Mockito.when(mockPool.submit(Mockito.any(Runnable.class))).thenAnswer(inv -> {
closedField.set(handler, true);
return null;
});
Field poolField = TransactionsMsgHandler.class.getDeclaredField("trxHandlePool");
poolField.setAccessible(true);
poolField.set(handler, mockPool);

PeerConnection peer = Mockito.mock(PeerConnection.class);
TransactionsMessage msg = buildTransferMessage(2);
stubAdvInvRequest(peer, msg);
handler.processMessage(peer, msg);

Mockito.verify(mockPool, Mockito.times(1)).submit(Mockito.any(Runnable.class));
} finally {
handler.close();
}
}

private TransactionsMessage buildTransferMessage(int count) {
List<Protocol.Transaction> txs = new ArrayList<>();
for (int i = 0; i < count; i++) {
BalanceContract.TransferContract tc = BalanceContract.TransferContract.newBuilder()
.setAmount(10 + i)
.setOwnerAddress(ByteString.copyFrom(ByteArray.fromHexString("121212a9cf")))
.setToAddress(ByteString.copyFrom(ByteArray.fromHexString("232323a9cf")))
.build();
txs.add(Protocol.Transaction.newBuilder().setRawData(
Protocol.Transaction.raw.newBuilder()
.setTimestamp(1_700_000_000_000L + i)
.setRefBlockNum(1)
.addContract(Protocol.Transaction.Contract.newBuilder()
.setType(Protocol.Transaction.Contract.ContractType.TransferContract)
.setParameter(Any.pack(tc)).build()).build())
.build());
}
return new TransactionsMessage(txs);
}

private void stubAdvInvRequest(PeerConnection peer, TransactionsMessage msg) {
Map<Item, Long> advInvRequest = new ConcurrentHashMap<>();
for (Protocol.Transaction trx : msg.getTransactions().getTransactionsList()) {
Item item = new Item(new TransactionMessage(trx).getMessageId(),
Protocol.Inventory.InventoryType.TRX);
advInvRequest.put(item, 0L);
}
Mockito.when(peer.getAdvInvRequest()).thenReturn(advInvRequest);
}

@Test
public void testHandleTransaction() throws Exception {
TransactionsMsgHandler handler = new TransactionsMsgHandler();
try {
TronNetDelegate tronNetDelegate = Mockito.mock(TronNetDelegate.class);
AdvService advService = Mockito.mock(AdvService.class);
ChainBaseManager chainBaseManager = Mockito.mock(ChainBaseManager.class);

Field f1 = TransactionsMsgHandler.class.getDeclaredField("tronNetDelegate");
f1.setAccessible(true);
f1.set(handler, tronNetDelegate);
Field f2 = TransactionsMsgHandler.class.getDeclaredField("advService");
f2.setAccessible(true);
f2.set(handler, advService);
Field f3 = TransactionsMsgHandler.class.getDeclaredField("chainBaseManager");
f3.setAccessible(true);
f3.set(handler, chainBaseManager);

PeerConnection peer = Mockito.mock(PeerConnection.class);

BalanceContract.TransferContract tc = BalanceContract.TransferContract.newBuilder()
.setAmount(10)
.setOwnerAddress(ByteString.copyFrom(ByteArray.fromHexString("121212a9cf")))
.setToAddress(ByteString.copyFrom(ByteArray.fromHexString("232323a9cf")))
.build();
long now = System.currentTimeMillis();
Protocol.Transaction trx = Protocol.Transaction.newBuilder().setRawData(
Protocol.Transaction.raw.newBuilder()
.setTimestamp(now)
.setExpiration(now + 60_000)
.setRefBlockNum(1)
.addContract(Protocol.Transaction.Contract.newBuilder()
.setType(Protocol.Transaction.Contract.ContractType.TransferContract)
.setParameter(Any.pack(tc)).build()).build())
.build();
TransactionMessage trxMsg = new TransactionMessage(trx);

Method handleTx = TransactionsMsgHandler.class.getDeclaredMethod(
"handleTransaction", PeerConnection.class, TransactionMessage.class);
handleTx.setAccessible(true);

// happy path → push and broadcast
Mockito.when(chainBaseManager.getNextBlockSlotTime()).thenReturn(now);
handleTx.invoke(handler, peer, trxMsg);
Mockito.verify(advService).broadcast(trxMsg);

// P2pException BAD_TRX → disconnect
Mockito.doThrow(new P2pException(TypeEnum.BAD_TRX, "bad"))
.when(tronNetDelegate).pushTransaction(Mockito.any());
handleTx.invoke(handler, peer, trxMsg);
Mockito.verify(peer).setBadPeer(true);
Mockito.verify(peer).disconnect(Protocol.ReasonCode.BAD_TX);
} finally {
handler.close();
}
}

class TrxEvent {

@Getter
Expand Down
Loading