Skip to content

A bug of removing keys #27

@zhangchuanben

Description

@zhangchuanben

I was trying to remove keys from a group, but i found i can't. So i have to debug source code to find out what happening. The source codes is below

func (g *group) RemoveKeys(ctx context.Context, keys ...string) error {
	if len(keys) == 0 {
		return nil
	}

	g.Stats.RemoveKeysRequests.Add(1)
	g.Stats.RemovedKeys.Add(int64(len(keys)))

	keysByOwner := make(map[peer.Client][]string)
	var localKeys []string

	for _, key := range keys {
		owner, isRemote := g.instance.PickPeer(key)
		if isRemote {
			keysByOwner[owner] = append(keysByOwner[owner], key)
		} else {
			localKeys = append(localKeys, key)
		}
	}

	for _, key := range localKeys {
		g.LocalRemove(key)
	}

	multiErr := &MultiError{}
	errCh := make(chan error)

	// Send removeKeys requests to owners (parallel)
	var wg sync.WaitGroup
	for owner, ownerKeys := range keysByOwner {
		wg.Add(1)
		go func(p peer.Client, k []string) {
			errCh <- p.RemoveKeys(ctx, &pb.RemoveKeysRequest{
				Group: &g.name,
				Keys:  k,
			})
			wg.Done()
		}(owner, ownerKeys)
	}

	allPeers := g.instance.getAllPeers()
	for _, p := range allPeers {
		if p.PeerInfo().IsSelf {
			continue
		}
		if _, isOwner := keysByOwner[p]; isOwner {
			continue
		}

		wg.Add(1)
		go func(peer peer.Client) {
			errCh <- peer.RemoveKeys(ctx, &pb.RemoveKeysRequest{
				Group: &g.name,
				Keys:  keys,
			})
			wg.Done()
		}(p)
	}

	go func() {
		wg.Wait()
		close(errCh)
	}()

	for err := range errCh {
		if err != nil {
			multiErr.Add(err)
		}
	}

	return multiErr.NilOrError()
}

From the above code, it's seem that local cache will not be removed if i request a key that not belong to this Peer. I don't known if i am right, is anyone can help?

When i reference source codes of Remove, everything looks good, because it remove local cache first and then try to request to remove remote cache.

func (g *group) Remove(ctx context.Context, key string) error {
	_, err := g.removeGroup.Do(key, func() (interface{}, error) {

		// Remove from key owner first
		owner, isRemote := g.instance.PickPeer(key)
		if isRemote {
			if err := g.removeFromPeer(ctx, owner, key); err != nil {
				return nil, err
			}
		}
		// Remove from our cache next
		g.LocalRemove(key)
		wg := sync.WaitGroup{}
		errCh := make(chan error)

		// Asynchronously clear the key from all hot and main caches of peers
		for _, p := range g.instance.getAllPeers() {
			// avoid deleting from owner a second time
			if p == owner {
				continue
			}

			wg.Add(1)
			go func(p peer.Client) {
				errCh <- g.removeFromPeer(ctx, p, key)
				wg.Done()
			}(p)
		}
		go func() {
			wg.Wait()
			close(errCh)
		}()

		m := &MultiError{}
		for err := range errCh {
			m.Add(err)
		}

		return nil, m.NilOrError()
	})
	return err
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions