-
Notifications
You must be signed in to change notification settings - Fork 14
bgpstatus: replace netlink collector with gNMI Get #3674
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
juan-malbeclabs
wants to merge
2
commits into
main
Choose a base branch
from
jo/bgpstatus_gnmi
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,14 +6,11 @@ import ( | |
| "fmt" | ||
| "log/slog" | ||
| "net" | ||
| "strconv" | ||
| "strings" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/gagliardetto/solana-go" | ||
| "github.com/jonboulle/clockwork" | ||
| "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netutil" | ||
| "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" | ||
| ) | ||
|
|
||
|
|
@@ -35,20 +32,18 @@ type ServiceabilityClient interface { | |
| GetProgramData(ctx context.Context) (*serviceability.ProgramData, error) | ||
| } | ||
|
|
||
| // NamespaceCollector collects BGP session state and local network interfaces | ||
| // from a single Linux VRF network namespace. It returns the set of remote IP | ||
| // strings with ESTABLISHED BGP sessions, the local interfaces, and any error. | ||
| // Implement with DefaultCollector for production; use a mock in tests. | ||
| type NamespaceCollector func(ctx context.Context, namespace string) (established map[string]struct{}, ifaces []netutil.Interface, err error) | ||
| // BGPCollector returns the set of remote IP strings with ESTABLISHED BGP | ||
| // sessions, across all network instances visible to the device. It is called | ||
| // once per tick. Implement with GNMICollector for production; use a mock in tests. | ||
| type BGPCollector func(ctx context.Context) (established map[string]struct{}, err error) | ||
|
|
||
| // Config holds all parameters for the BGP status submitter. | ||
| type Config struct { | ||
| Log *slog.Logger | ||
| Executor BGPStatusExecutor | ||
| ServiceabilityClient ServiceabilityClient | ||
| Collector NamespaceCollector | ||
| Collector BGPCollector | ||
| LocalDevicePK solana.PublicKey | ||
| BGPNamespace string | ||
| Interval time.Duration // default: 60s | ||
| PeriodicRefreshInterval time.Duration // default: 6h | ||
| DownGracePeriod time.Duration // default: 0 | ||
|
|
@@ -71,9 +66,6 @@ func (c *Config) validate() error { | |
| if c.LocalDevicePK.IsZero() { | ||
| return errors.New("local device pubkey is required") | ||
| } | ||
| if c.BGPNamespace == "" { | ||
| return errors.New("bgp namespace is required") | ||
| } | ||
| if c.Interval <= 0 { | ||
| c.Interval = defaultInterval | ||
| } | ||
|
|
@@ -99,7 +91,7 @@ type submitTask struct { | |
| status serviceability.BGPStatus | ||
| } | ||
|
|
||
| // Submitter collects BGP socket state on each tick, determines per-user BGP | ||
| // Submitter collects BGP session state on each tick, determines per-user BGP | ||
| // status, and submits SetUserBGPStatus onchain via a non-blocking worker. | ||
| type Submitter struct { | ||
| cfg Config | ||
|
|
@@ -125,8 +117,7 @@ func NewSubmitter(cfg Config) (*Submitter, error) { | |
| } | ||
|
|
||
| // Start launches the submitter in the background and returns a channel that | ||
| // receives a fatal error (or is closed on clean shutdown). It mirrors the | ||
| // state.Collector.Start pattern. | ||
| // receives a fatal error (or is closed on clean shutdown). | ||
| func (s *Submitter) Start(ctx context.Context, cancel context.CancelFunc) <-chan error { | ||
| errCh := make(chan error, 1) | ||
| go func() { | ||
|
|
@@ -141,8 +132,8 @@ func (s *Submitter) Start(ctx context.Context, cancel context.CancelFunc) <-chan | |
| } | ||
|
|
||
| // userStateFor returns or creates the per-user tracking entry (caller must hold s.mu). | ||
| // initialStatus is used only when creating a new entry; it seeds lastOnchainStatus so | ||
| // that a restarted submitter correctly handles users whose onchain state is already Up. | ||
| // initialStatus seeds lastOnchainStatus so a restarted submitter correctly handles | ||
| // users whose onchain state is already Up. | ||
| func (s *Submitter) userStateFor(key string, initialStatus serviceability.BGPStatus) *userState { | ||
| us, ok := s.userState[key] | ||
| if !ok { | ||
|
|
@@ -152,35 +143,27 @@ func (s *Submitter) userStateFor(key string, initialStatus serviceability.BGPSta | |
| return us | ||
| } | ||
|
|
||
| // bgpSocket is the minimal BGP socket representation used by the pure helpers. | ||
| // The Linux-specific submitter.go converts state.BGPSocketState to this type. | ||
| type bgpSocket struct { | ||
| RemoteIP string | ||
| State string | ||
| } | ||
|
|
||
| // --- Pure helpers (no Linux syscalls; fully testable on all platforms) --- | ||
|
|
||
| // buildEstablishedIPSet returns a set of remote IP strings for BGP sessions | ||
| // that are currently in the ESTABLISHED state. | ||
| func buildEstablishedIPSet(sockets []bgpSocket) map[string]struct{} { | ||
| m := make(map[string]struct{}, len(sockets)) | ||
| for _, sock := range sockets { | ||
| if sock.State == "ESTABLISHED" { | ||
| m[sock.RemoteIP] = struct{}{} | ||
| } | ||
| } | ||
| return m | ||
| } | ||
| // --- Pure helpers (no platform-specific code; fully testable on all platforms) --- | ||
|
|
||
| // tunnelNetToIPNet parses the onchain [5]byte tunnel-net encoding into a | ||
| // *net.IPNet. The format is [4 bytes IPv4 prefix | 1 byte CIDR length]. | ||
| // *net.IPNet. The format is [4 bytes IPv4 prefix | 1 byte CIDR length]. | ||
| func tunnelNetToIPNet(b [5]byte) *net.IPNet { | ||
| ip := net.IPv4(b[0], b[1], b[2], b[3]) | ||
| mask := net.CIDRMask(int(b[4]), 32) | ||
| return &net.IPNet{IP: ip.To4(), Mask: mask} | ||
| } | ||
|
|
||
| // peerIPsFor31 returns both host IPs in a /31 network. Since tunnel IPs are | ||
| // globally unique (onchain-allocated), exactly one of the two will be the | ||
| // BGP neighbor address for a given user on this device. | ||
| func peerIPsFor31(tunnelNet *net.IPNet) (net.IP, net.IP) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens if the caller sends something other than a /31? |
||
| ip0 := tunnelNet.IP.To4() | ||
| ip1 := make(net.IP, 4) | ||
| copy(ip1, ip0) | ||
| ip1[3] ^= 1 | ||
| return ip0, ip1 | ||
| } | ||
|
|
||
| // computeEffectiveStatus derives the BGP status to report, applying the down | ||
| // grace period: if observedUp is false but the user was last seen Up within | ||
| // gracePeriod, we still report Up to avoid transient flaps. | ||
|
|
@@ -202,48 +185,6 @@ func computeEffectiveStatus( | |
| return serviceability.BGPStatusDown | ||
| } | ||
|
|
||
| // rootNamespace is the sentinel passed to DefaultCollector / RunInNamespace | ||
| // to indicate the root (global) Linux network namespace. Arista EOS places | ||
| // the default VRF in the root namespace rather than a named namespace under | ||
| // /var/run/netns/, so there is no file to open with netns.GetFromName. | ||
| // RunInNamespace treats "" as "execute in the current namespace" (no switching). | ||
| const rootNamespace = "" | ||
|
|
||
| // vrfNamespaces builds the list of Linux network namespaces to check for BGP | ||
| // sockets and tunnel interfaces. The base namespace is always included first. | ||
| // Additional namespaces are derived from two sources: | ||
| // - Tenant VRF IDs (non-zero): replaces the trailing numeric suffix of base | ||
| // (e.g. "ns-vrf1") with each tenant's VrfId, giving e.g. "ns-vrf2". | ||
| // - Multicast users: GRE tunnels for multicast users live in the global VRF | ||
| // (the root network namespace), not in a per-tenant namespace. rootNamespace | ||
| // is appended if any user in the provided slice has UserTypeMulticast. | ||
| func vrfNamespaces(base string, tenants []serviceability.Tenant, users []serviceability.User) []string { | ||
| prefix := strings.TrimRight(base, "0123456789") | ||
| seen := map[string]struct{}{base: {}} | ||
| nss := []string{base} | ||
| for _, t := range tenants { | ||
| if t.VrfId == 0 { | ||
| continue | ||
| } | ||
| ns := prefix + strconv.FormatUint(uint64(t.VrfId), 10) | ||
| if _, ok := seen[ns]; !ok { | ||
| seen[ns] = struct{}{} | ||
| nss = append(nss, ns) | ||
| } | ||
| } | ||
| // Multicast users' GRE tunnels live in the root network namespace (global VRF). | ||
| if _, ok := seen[rootNamespace]; !ok { | ||
| for _, u := range users { | ||
| if u.UserType == serviceability.UserTypeMulticast { | ||
| seen[rootNamespace] = struct{}{} | ||
| nss = append(nss, rootNamespace) | ||
| break | ||
| } | ||
| } | ||
| } | ||
| return nss | ||
| } | ||
|
|
||
| // shouldSubmit returns true when a submission is warranted: either the status | ||
| // has changed from what was last confirmed onchain, or it is time for a | ||
| // periodic keepalive write. | ||
|
|
||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is also hard-coded on line 549 below. Share the constant?