|
24 | 24 | import java.util.Map; |
25 | 25 | import java.util.Objects; |
26 | 26 | import java.util.Random; |
| 27 | +import java.util.Timer; |
| 28 | +import java.util.TimerTask; |
27 | 29 | import java.util.concurrent.ConcurrentHashMap; |
28 | 30 |
|
| 31 | +import org.apache.commons.lang3.StringUtils; |
29 | 32 | import org.apache.servicecomb.http.client.task.AbstractTask; |
30 | 33 | import org.apache.servicecomb.http.client.task.Task; |
31 | 34 | import org.apache.servicecomb.service.center.client.DiscoveryEvents.InstanceChangedEvent; |
@@ -104,6 +107,8 @@ public static class SubscriptionValue { |
104 | 107 |
|
105 | 108 | private final Random random = new Random(); |
106 | 109 |
|
| 110 | + private Timer timer; |
| 111 | + |
107 | 112 | public ServiceCenterDiscovery(ServiceCenterClient serviceCenterClient, EventBus eventBus) { |
108 | 113 | super("service-center-discovery-task"); |
109 | 114 | this.serviceCenterClient = serviceCenterClient; |
@@ -153,7 +158,60 @@ public void onPullInstanceEvent(PullInstanceEvent event) { |
153 | 158 | return; |
154 | 159 | } |
155 | 160 | pullInstanceTaskOnceInProgress = true; |
156 | | - startTask(new PullInstanceOnceTask()); |
| 161 | + if (StringUtils.isEmpty(event.getAppId()) || StringUtils.isEmpty(event.getServiceName())) { |
| 162 | + // If the application or service name cannot be resolved, pulled all services. |
| 163 | + startTask(new PullInstanceOnceTask()); |
| 164 | + return; |
| 165 | + } |
| 166 | + try { |
| 167 | + String appId = event.getAppId(); |
| 168 | + String serviceName = event.getServiceName(); |
| 169 | + if (!refreshTargetServiceSuccess(appId, serviceName)) { |
| 170 | + int positive = random.nextInt(300); |
| 171 | + int sign = random.nextBoolean() ? 1 : -1; |
| 172 | + long delayTime = 2000L + sign * positive; |
| 173 | + if (timer == null) { |
| 174 | + timer = new Timer("event-retry-pull-task"); |
| 175 | + } |
| 176 | + timer.schedule(new PullTargetServiceTask(appId, serviceName), delayTime); |
| 177 | + } |
| 178 | + } finally { |
| 179 | + pullInstanceTaskOnceInProgress = false; |
| 180 | + } |
| 181 | + } |
| 182 | + |
| 183 | + class PullTargetServiceTask extends TimerTask { |
| 184 | + private final String appId; |
| 185 | + |
| 186 | + private final String serviceName; |
| 187 | + |
| 188 | + public PullTargetServiceTask(String appId, String serviceName) { |
| 189 | + this.appId = appId; |
| 190 | + this.serviceName = serviceName; |
| 191 | + } |
| 192 | + |
| 193 | + @Override |
| 194 | + public void run() { |
| 195 | + refreshTargetServiceSuccess(appId, serviceName); |
| 196 | + } |
| 197 | + } |
| 198 | + |
| 199 | + private boolean refreshTargetServiceSuccess(String appId, String serviceName) { |
| 200 | + SubscriptionKey currentKey = new SubscriptionKey(appId, serviceName); |
| 201 | + if (instancesCache.get(currentKey) == null) { |
| 202 | + // No pull during the service startup phase. |
| 203 | + return true; |
| 204 | + } |
| 205 | + if (LOGGER.isDebugEnabled()) { |
| 206 | + LOGGER.debug("pull [{}#{}] instances from service center", appId, serviceName); |
| 207 | + } |
| 208 | + String originRev = instancesCache.get(currentKey).revision; |
| 209 | + pullInstance(currentKey, instancesCache.get(currentKey), true); |
| 210 | + String currentRev = instancesCache.get(currentKey).revision; |
| 211 | + if (LOGGER.isDebugEnabled()) { |
| 212 | + LOGGER.debug("current revision: [{}], origin revision: [{}]", currentRev, originRev); |
| 213 | + } |
| 214 | + return !originRev.equals(currentRev); |
157 | 215 | } |
158 | 216 |
|
159 | 217 | private List<SubscriptionKey> pullInstance(SubscriptionKey k, SubscriptionValue v, boolean sendChangedEvent) { |
@@ -265,7 +323,7 @@ private static String instanceToString(List<MicroserviceInstance> instances) { |
265 | 323 | sb.append(endpoint.length() > 64 ? endpoint.substring(0, 64) : endpoint); |
266 | 324 | sb.append("|"); |
267 | 325 | } |
268 | | - sb.append(instance.getServiceName()); |
| 326 | + sb.append(instance.getStatus()); |
269 | 327 | sb.append("|"); |
270 | 328 | } |
271 | 329 | sb.append("#"); |
|
0 commit comments