Conversation
There was a problem hiding this comment.
Pull request overview
This PR introduces initial support for Database Change Protocol (DCP) in the codebase, enabling applications to stream database mutations and events in real-time. DCP allows clients to subscribe to data changes at the vbucket level and receive notifications about mutations, deletions, and other database events.
Key changes:
- Added DCP configuration options and event handlers throughout the client architecture
- Implemented DCP stream management with
DcpStreamSetfor opening/closing vbucket streams - Created router abstractions (static and dynamic) for handling DCP events
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| multikvendpointclientmanager.go | Added DCP options and handlers to manager configuration |
| mock_kvclient_test.go | Generated mock implementations for new DCP client methods |
| kvendpointclientmanager.go | Threaded DCP options through endpoint client manager |
| kvclientpool.go | Updated client pool to propagate DCP configuration |
| kvclientbabysitter.go | Added DCP options to client babysitter initialization |
| kvclient_ops.go | Implemented DCP stream request and close stream operations |
| kvclient_dcp.go | Core DCP bootstrapping logic with feature negotiation |
| kvclient.go | Integrated DCP state management and unsolicited packet handling |
| dcpstreamset.go | Manages multiple DCP streams across vbuckets |
| dcpstreamrouter_static.go | Static router for dispatching DCP events to handlers |
| dcpstreamrouter_dyn.go | Dynamic router with stream registration/unregistration |
| dcpevents.go | Defined DCP event handler types |
| dcpcomponent.go | Stream set manager for creating DCP stream sets |
| agent_ops.go | Agent-level API for creating DCP stream sets |
| agent_dcp_int_test.go | Integration test verifying basic DCP functionality |
| agent.go | Initialized DCP component in agent structure |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return memdClient_SimpleCall(ctx, c, memdx.OpsDcp{ | ||
| ExtFramesEnabled: c.HasFeature(memdx.HelloFeatureAltRequests), | ||
| CollectionsEnabled: c.HasFeature(memdx.HelloFeatureCollections), | ||
| StreamIdsEnabled: false, |
There was a problem hiding this comment.
The StreamIdsEnabled field is hardcoded to false, but DCP options include an EnableStreamIds setting. Consider using c.dcpState.streamIdsEnabled or a similar mechanism to dynamically set this value based on the negotiated DCP state, ensuring consistency between the client's DCP configuration and its operational behavior.
| StreamIdsEnabled: false, | |
| StreamIdsEnabled: c.dcpState.streamIdsEnabled, |
| } | ||
|
|
||
| func (r *DcpStreamRouterDyn) getHandlers(streamId uint16) (DcpEventsHandlers, bool) { | ||
| state := r.state.Load() |
There was a problem hiding this comment.
Potential nil pointer dereference if state.Load() returns nil. This can happen when getHandlers is called before any streams are registered and rebuildStateLocked has not been invoked. Add a nil check for state before accessing state.streams.
| state := r.state.Load() | |
| state := r.state.Load() | |
| if state == nil { | |
| return DcpEventsHandlers{}, false | |
| } |
No description provided.