Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 14 additions & 28 deletions pkg/p2p/libp2p/libp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,36 +627,29 @@ func (s *Service) handleIncoming(stream network.Stream) {

overlay := i.BzzAddress.Overlay

if err = handshakeStream.FullClose(); err != nil {
s.logger.Debug("stream handler: could not close stream", "peer_address", overlay, "error", err)
s.logger.Error(nil, "stream handler: unable to handshake with peer", "peer_address", overlay)
_ = s.Disconnect(overlay, "could not fully close stream on handshake")
return
}

blocked, err := s.blocklist.Exists(overlay)
if err != nil {
s.logger.Debug("stream handler: blocklisting: exists failed", "peer_address", overlay, "error", err)
s.logger.Error(nil, "stream handler: internal error while connecting with peer", "peer_address", overlay)
_ = handshakeStream.Reset()
_ = stream.Conn().Close()
return
}

if blocked {
s.logger.Error(nil, "stream handler: blocked connection from blocklisted peer", "peer_address", overlay)
_ = handshakeStream.Reset()
_ = s.host.Network().ClosePeer(peerID)
return
}

if exists := s.peers.addIfNotExists(stream.Conn(), overlay, i.FullNode); exists {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider (not sure if this is problematic):
if you close the stream earlier before the peer has been added to this peers struct, the other peer will progress with the handshake protocol (continues execution) and may fire a request before the peer was added (the call on this line). i'm not sure if this can cause downstream problems (maybe a protocol already tries to send a message, and some check on the libp2p abstraction that tries to check if the peer exists on this list happens before the peer was added

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, interesting observation. The question is, is handshake stream keeping this synchronization. I am not sure that I can see that by keeping the handshake stream active, other protocols are blocked from sending messages. Am interested to see if I am missing something. Could you elaborate more on your assumptions, or try to see if there could be problems?

I even cannot see why handshake stream is even created earlier than it is needed. This can also be changed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iirc the FullClose usage was meant as a means of synchronizing both peers and making sure both peers are at the same stage of the handshake. Previously they would be reading on the stream, waiting for something, and when we would close the stream they could then continue execution.

For example on the connecting side, the protocols start firing here.
The receiver here.

The question is whether under some lock contention on that peer registry or under circumstances will we see one peer finishing the handshake on the other before the other one making it to the line where that peer gets onto the peer registry. If yes then there might be some downstream errors or some weird halfway state (peer is "halfway" connected). Whether the current handshake protocol prevents that fully - can't say, need to have a further look.

s.logger.Debug("stream handler: peer already exists", "peer_address", overlay)
if err = handshakeStream.FullClose(); err != nil {
s.logger.Debug("stream handler: could not close stream", "peer_address", overlay, "error", err)
s.logger.Error(nil, "stream handler: unable to handshake with peer", "peer_address", overlay)
_ = stream.Conn().Close()
}
return
}

if err = handshakeStream.FullClose(); err != nil {
s.logger.Debug("stream handler: could not close stream", "peer_address", overlay, "error", err)
s.logger.Error(nil, "stream handler: unable to handshake with peer", "peer_address", overlay)
_ = s.Disconnect(overlay, "could not fully close stream on handshake")
return
}

Expand Down Expand Up @@ -1079,19 +1072,22 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b
return nil, fmt.Errorf("handshake: %w", err)
}

overlay := i.BzzAddress.Overlay

if err := handshakeStream.FullClose(); err != nil {
_ = s.Disconnect(overlay, "could not fully close handshake stream after connect")
return nil, fmt.Errorf("connect full close %w", err)
}

if !i.FullNode {
_ = handshakeStream.Reset()
_ = s.host.Network().ClosePeer(info.ID)
return nil, p2p.ErrDialLightNode
}

overlay := i.BzzAddress.Overlay

blocked, err := s.blocklist.Exists(overlay)
if err != nil {
s.logger.Debug("blocklisting: exists failed", "peer_id", info.ID, "error", err)
s.logger.Error(nil, "internal error while connecting with peer", "peer_id", info.ID)
_ = handshakeStream.Reset()
_ = s.host.Network().ClosePeer(info.ID)
return nil, err
}
Expand All @@ -1104,19 +1100,9 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b
}

if exists := s.peers.addIfNotExists(stream.Conn(), overlay, i.FullNode); exists {
if err := handshakeStream.FullClose(); err != nil {
_ = s.Disconnect(overlay, "failed closing handshake stream after connect")
return nil, fmt.Errorf("peer exists, full close: %w", err)
}

return i.BzzAddress, nil
}

if err := handshakeStream.FullClose(); err != nil {
_ = s.Disconnect(overlay, "could not fully close handshake stream after connect")
return nil, fmt.Errorf("connect full close %w", err)
}

if !s.peers.Exists(overlay) {
return nil, p2p.ErrPeerNotFound
}
Expand Down
Loading