Skip to content

Commit a04a0e3

Browse files
committed
part of stateWatcher code is moved to stateInit
1 parent 2b85b8d commit a04a0e3

3 files changed

Lines changed: 62 additions & 27 deletions

File tree

client_stateinit.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2025 RPCPlatform Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rpcplatform
18+
19+
import (
20+
"context"
21+
"strings"
22+
"time"
23+
24+
etcd "go.etcd.io/etcd/client/v3"
25+
)
26+
27+
func (c *Client) stateInit() (map[string]string, int64, error) {
28+
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
29+
resp, err := c.etcd.Get(ctx, c.target, etcd.WithPrefix())
30+
cancel()
31+
32+
if err != nil {
33+
return nil, 0, err
34+
}
35+
36+
serverInfo := make(map[string]string, len(resp.Kvs))
37+
for _, kv := range resp.Kvs {
38+
trimKey := strings.TrimPrefix(string(kv.Key), c.target)
39+
serverInfo[trimKey] = string(kv.Value)
40+
}
41+
42+
c.updateState(true, serverInfo)
43+
return serverInfo, resp.Header.Revision, nil
44+
}

client_statewatcher.go

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,33 +20,20 @@ import (
2020
"context"
2121
"strings"
2222
"sync"
23-
"time"
2423

2524
etcd "go.etcd.io/etcd/client/v3"
2625
)
2726

28-
func (c *Client) stateWatcher() error {
29-
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
30-
resp, err := c.etcd.Get(ctx, c.target, etcd.WithPrefix())
31-
cancel()
32-
33-
if err != nil {
34-
return err
35-
}
36-
37-
serverInfo := make(map[string]string, len(resp.Kvs))
38-
for _, kv := range resp.Kvs {
39-
trimKey := strings.TrimPrefix(string(kv.Key), c.target)
40-
serverInfo[trimKey] = string(kv.Value)
41-
}
42-
43-
c.updateState(true, serverInfo)
44-
27+
func (c *Client) stateWatcher(serverInfo map[string]string, revision int64) error {
4528
watchChan := c.etcd.Watch(context.Background(), c.target,
46-
etcd.WithPrefix(), etcd.WithRev(resp.Header.Revision+1),
29+
etcd.WithPrefix(), etcd.WithRev(revision+1),
4730
)
4831

49-
var wg sync.WaitGroup
32+
var (
33+
cancel = func() {}
34+
ctx context.Context
35+
wg sync.WaitGroup
36+
)
5037

5138
go func() {
5239
for data := range watchChan {

rpcplatform_newclient.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,19 +44,23 @@ func (p *RPCPlatform) NewClient(target string, attributes *ClientAttributes) (*C
4444
return nil, err
4545
}
4646

47-
client, err := grpc.NewClient(target, options...)
48-
if err != nil {
49-
return nil, err
50-
}
51-
5247
c := &Client{
5348
target: p.config.EtcdPrefix + gears.FixPath(target) + "/",
5449
etcd: p.config.EtcdClient,
5550
resolver: resolver,
56-
client: client,
5751
}
5852

59-
if err = c.stateWatcher(); err != nil {
53+
serverInfo, revision, err := c.stateInit()
54+
if err != nil {
55+
return nil, err
56+
}
57+
58+
c.client, err = grpc.NewClient(target, options...)
59+
if err != nil {
60+
return nil, err
61+
}
62+
63+
if err = c.stateWatcher(serverInfo, revision); err != nil {
6064
return nil, err
6165
}
6266

0 commit comments

Comments
 (0)