diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/BlockMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/BlockMsgHandler.java index dc886517476..3b9e86d4791 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/BlockMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/BlockMsgHandler.java @@ -150,6 +150,7 @@ private void processBlock(PeerConnection peer, BlockCapsule block) throws P2pExc try { tronNetDelegate.processBlock(block, false); + peer.setBlockRcvTime(System.currentTimeMillis()); witnessProductBlockService.validWitnessProductTwoBlock(block); Item item = new Item(blockId, InventoryType.BLOCK); diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java index c92d53584a3..59232a8d258 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java @@ -6,6 +6,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.tron.common.utils.Sha256Hash; +import org.tron.core.capsule.BlockCapsule.BlockId; import org.tron.core.config.args.Args; import org.tron.core.exception.P2pException; import org.tron.core.exception.P2pException.TypeEnum; @@ -44,7 +45,10 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep peer.getAdvInvReceive().put(item, System.currentTimeMillis()); advService.addInv(item); if (type.equals(InventoryType.BLOCK) && peer.getAdvInvSpread().getIfPresent(item) == null) { - peer.setLastInteractiveTime(System.currentTimeMillis()); + long headNum = tronNetDelegate.getHeadBlockId().getNum(); + if (new BlockId(id).getNum() > headNum) { + peer.setLastInteractiveTime(System.currentTimeMillis()); + } } } } diff --git a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java index 253502bc3a1..8d7818d1608 100644 --- a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java +++ b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java @@ -88,6 +88,10 @@ public class PeerConnection { @Setter private volatile long lastInteractiveTime; + @Setter + @Getter + private volatile long blockRcvTime; + @Getter @Setter private volatile TronState tronState = TronState.INIT; diff --git a/framework/src/main/java/org/tron/core/net/service/effective/ResilienceService.java b/framework/src/main/java/org/tron/core/net/service/effective/ResilienceService.java index b99b5b52bad..8abb8404cf3 100644 --- a/framework/src/main/java/org/tron/core/net/service/effective/ResilienceService.java +++ b/framework/src/main/java/org/tron/core/net/service/effective/ResilienceService.java @@ -3,8 +3,10 @@ import static org.tron.common.math.Maths.ceil; import static org.tron.common.math.Maths.max; +import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -44,7 +46,7 @@ public class ResilienceService { @Autowired private ChainBaseManager chainBaseManager; - + public void init() { if (Args.getInstance().isOpenFullTcpDisconnect) { executor.scheduleWithFixedDelay(() -> { @@ -86,6 +88,7 @@ private void disconnectRandom() { .collect(Collectors.toList()); if (peers.size() >= minBroadcastPeerSize) { + peers = getRandomDisconnectionPeers(peers); long now = System.currentTimeMillis(); Map weights = new HashMap<>(); peers.forEach(peer -> { @@ -121,6 +124,14 @@ private void disconnectRandom() { } + private List getRandomDisconnectionPeers(List peers) { + Map snapshot = new IdentityHashMap<>(peers.size()); + peers.forEach(p -> snapshot.put(p, p.getBlockRcvTime())); + List sorted = new ArrayList<>(peers); + sorted.sort(Comparator.comparingLong(snapshot::get)); + return sorted.subList(0, sorted.size() / 2); + } + private void disconnectLan() { if (!isLanNode()) { return; diff --git a/framework/src/main/java/org/tron/core/net/service/sync/SyncService.java b/framework/src/main/java/org/tron/core/net/service/sync/SyncService.java index 32230612743..0ffe69db097 100644 --- a/framework/src/main/java/org/tron/core/net/service/sync/SyncService.java +++ b/framework/src/main/java/org/tron/core/net/service/sync/SyncService.java @@ -337,6 +337,7 @@ private void processSyncBlock(BlockCapsule block, PeerConnection peerConnection) try { tronNetDelegate.validSignature(block); tronNetDelegate.processBlock(block, true); + peerConnection.setBlockRcvTime(System.currentTimeMillis()); pbftDataSyncHandler.processPBFTCommitData(block); } catch (P2pException p2pException) { logger.error("Process sync block {} failed, type: {}", diff --git a/framework/src/test/java/org/tron/core/net/services/ResilienceServiceTest.java b/framework/src/test/java/org/tron/core/net/services/ResilienceServiceTest.java index 792fb82c2c6..c8c4d974d8e 100644 --- a/framework/src/test/java/org/tron/core/net/services/ResilienceServiceTest.java +++ b/framework/src/test/java/org/tron/core/net/services/ResilienceServiceTest.java @@ -7,6 +7,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.HashSet; +import java.util.List; import java.util.Set; import javax.annotation.Resource; import org.junit.After; @@ -97,6 +98,57 @@ public void testDisconnectRandom() { Assert.assertEquals(maxConnection - 1, PeerManager.getPeers().size()); } + @Test + public void testDisconnectRandomPreservesRecentBlockRcvTimePeer() { + int maxConnection = 30; + Assert.assertEquals(0, PeerManager.getPeers().size()); + + ApplicationContext ctx = (ApplicationContext) ReflectUtils.getFieldObject(p2pEventHandler, + "ctx"); + + // Create maxConnection + 1 peers (triggers disconnectRandom) + for (int i = 0; i < maxConnection + 1; i++) { + InetSocketAddress inetSocketAddress = new InetSocketAddress("202.0.0." + i, 10001); + Channel c1 = spy(Channel.class); + ReflectUtils.setFieldValue(c1, "inetSocketAddress", inetSocketAddress); + ReflectUtils.setFieldValue(c1, "inetAddress", inetSocketAddress.getAddress()); + ReflectUtils.setFieldValue(c1, "ctx", spy(ChannelHandlerContext.class)); + Mockito.doNothing().when(c1).send((byte[]) any()); + PeerManager.add(ctx, c1); + } + + // Set first minBroadcastPeerSize peers as broadcast-state + List peers = PeerManager.getPeers(); + for (PeerConnection peer : peers.subList(0, ResilienceService.minBroadcastPeerSize)) { + peer.setNeedSyncFromPeer(false); + peer.setNeedSyncFromUs(false); + peer.setLastInteractiveTime(System.currentTimeMillis() - 1000); + } + for (PeerConnection peer : peers.subList(ResilienceService.minBroadcastPeerSize, + maxConnection + 1)) { + peer.setNeedSyncFromPeer(false); + peer.setNeedSyncFromUs(true); + } + + // Give the LAST broadcast peer a very recent blockRcvTime — it must NOT be disconnected + PeerConnection bestPeer = peers.stream() + .filter(p -> !p.isNeedSyncFromUs() && !p.isNeedSyncFromPeer()) + .reduce((a, b) -> b) // last broadcast peer + .orElseThrow(() -> new AssertionError("no broadcast peer")); + bestPeer.setBlockRcvTime(System.currentTimeMillis()); + + InetSocketAddress bestPeerAddress = bestPeer.getChannel().getInetSocketAddress(); + + // With minBroadcastPeerSize=3 broadcast peers, getRandomDisconnectionPeers returns + // the 1 peer with oldest blockRcvTime (0). bestPeer has most recent time → exempt. + ReflectUtils.invokeMethod(service, "disconnectRandom"); + + boolean bestPeerStillConnected = PeerManager.getPeers().stream() + .anyMatch(p -> p.getChannel().getInetSocketAddress().equals(bestPeerAddress)); + Assert.assertTrue("Peer with most recent blockRcvTime should not be disconnected", + bestPeerStillConnected); + } + @Test public void testDisconnectLan() { int minConnection = 8;