From fee6364c7a9d19eb779606b190a22185f3e91bab Mon Sep 17 00:00:00 2001 From: Uwe Trottmann Date: Fri, 16 Jun 2023 19:20:29 +0200 Subject: [PATCH] CheckRequestAggregator: remove unused, leaking, out queue. Check requests are already sent when doing a request. Even if it would work on App Engine (no background threads allowed), there is no point in sending check requests again at some later point. Check requests are still cached (and expired). --- .../java/com/google/api/control/Client.java | 50 +------ .../aggregator/CheckAggregationOptions.java | 67 ++------- .../aggregator/CheckRequestAggregator.java | 135 ++---------------- .../CheckAggregationOptionsTest.java | 53 ++----- .../CheckRequestAggregatorTest.java | 48 +++---- 5 files changed, 59 insertions(+), 294 deletions(-) diff --git a/endpoints-control/src/main/java/com/google/api/control/Client.java b/endpoints-control/src/main/java/com/google/api/control/Client.java index b2ccccb..4e44707 100644 --- a/endpoints-control/src/main/java/com/google/api/control/Client.java +++ b/endpoints-control/src/main/java/com/google/api/control/Client.java @@ -1,5 +1,6 @@ /* * Copyright 2016 Google Inc. All Rights Reserved. + * Copyright 2023 Uwe Trottmann * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -91,7 +92,7 @@ public Client(String serviceName, CheckAggregationOptions checkOptions, ServiceControl transport, ThreadFactory threads, SchedulerFactory schedulers, int statsLogFrequency, @Nullable Ticker ticker) { ticker = ticker == null ? Ticker.systemTicker() : ticker; - this.checkAggregator = new CheckRequestAggregator(serviceName, checkOptions, null, ticker); + this.checkAggregator = new CheckRequestAggregator(serviceName, checkOptions, ticker); this.reportAggregator = new ReportRequestAggregator(serviceName, reportOptions, null, ticker); this.quotaAggregator = new QuotaRequestAggregator(serviceName, quotaOptions, ticker); this.serviceName = serviceName; @@ -134,6 +135,7 @@ public void run() { scheduleFlushes(); } }); + // Note: this is not supported on App Engine Standard. schedulerThread.start(); } catch (RuntimeException e) { log.atInfo().log(BACKGROUND_THREAD_ERROR); @@ -305,7 +307,6 @@ private synchronized void initializeFlushing() { this.scheduler = schedulers.create(ticker); this.scheduler.setStatistics(statistics); log.atInfo().log("scheduling the initial check, report, and quota"); - flushAndScheduleChecks(); flushAndScheduleReports(); flushAndScheduleQuota(); } @@ -323,51 +324,6 @@ private synchronized boolean resetIfStopped() { return true; } - private void flushAndScheduleChecks() { - if (resetIfStopped()) { - log.atFine().log("did not schedule check flush: client is stopped"); - return; - } - int interval = checkAggregator.getFlushIntervalMillis(); - if (interval < 0) { - log.atFine().log("did not schedule check flush: caching is disabled"); - return; // cache is disabled, so no flushing it - } - - if (isRunningSchedulerDirectly()) { - log.atFine().log("did not schedule check flush: no scheduler thread is running"); - return; - } - - log.atFine().log("flushing the check aggregator"); - Stopwatch w = Stopwatch.createUnstarted(ticker); - for (CheckRequest req : checkAggregator.flush()) { - try { - statistics.recachedChecks.incrementAndGet(); - w.reset().start(); - CheckResponse resp = transport.services().check(serviceName, req).execute(); - statistics.totalCheckTransportTimeMillis.addAndGet(w.elapsed(TimeUnit.MILLISECONDS)); - w.reset().start(); - checkAggregator.addResponse(req, resp); - statistics.totalCheckCacheUpdateTimeMillis.addAndGet(w.elapsed(TimeUnit.MILLISECONDS)); - } catch (IOException e) { - log.atSevere().withCause(e).log("direct send of a check request %s failed", req); - } - } - // copy scheduler into a local variable to avoid data races beween this method and stop() - Scheduler currentScheduler = scheduler; - if (resetIfStopped()) { - log.atFine().log("did not schedule succeeding check flush: client is stopped"); - return; - } - currentScheduler.enter(new Runnable() { - @Override - public void run() { - flushAndScheduleChecks(); // Do this again after the interval - } - }, interval, 0 /* high priority */); - } - private void flushAndScheduleReports() { if (resetIfStopped()) { log.atFine().log("did not schedule report flush: client is stopped"); diff --git a/endpoints-control/src/main/java/com/google/api/control/aggregator/CheckAggregationOptions.java b/endpoints-control/src/main/java/com/google/api/control/aggregator/CheckAggregationOptions.java index 6e929fe..5b29651 100644 --- a/endpoints-control/src/main/java/com/google/api/control/aggregator/CheckAggregationOptions.java +++ b/endpoints-control/src/main/java/com/google/api/control/aggregator/CheckAggregationOptions.java @@ -1,5 +1,6 @@ /* * Copyright 2016 Google Inc. All Rights Reserved. + * Copyright 2023 Uwe Trottmann * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,10 +21,7 @@ import com.google.common.base.Ticker; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -42,13 +40,7 @@ public class CheckAggregationOptions { */ public static final int DEFAULT_RESPONSE_EXPIRATION_MILLIS = 4000; - /** - * The default flush cache entry interval. - */ - public static final int DEFAULT_FLUSH_CACHE_ENTRY_INTERVAL_MILLIS = 2000; - private final int numEntries; - private final int flushCacheEntryIntervalMillis; private final int expirationMillis; /** @@ -58,21 +50,13 @@ public class CheckAggregationOptions { * is the maximum number of cache entries that can be kept in the * aggregation cache. The cache is disabled if this value is * negative. - * @param flushCacheEntryIntervalMillis - * the maximum interval before an aggregated check request is - * flushed to the server. The cache entry is deleted after the - * flush * @param expirationMillis * is the maximum interval in milliseconds before a cached check - * response is invalidated. This value should be greater than - * {@code flushCacheEntryIntervalMillis}. If not, it is ignored, - * and a value of {@code flushCacheEntryIntervalMillis} is used - * instead. + * response is invalidated. */ - public CheckAggregationOptions(int numEntries, int flushCacheEntryIntervalMillis, int expirationMillis) { + public CheckAggregationOptions(int numEntries, int expirationMillis) { this.numEntries = numEntries; - this.flushCacheEntryIntervalMillis = flushCacheEntryIntervalMillis; - this.expirationMillis = Math.max(expirationMillis, flushCacheEntryIntervalMillis + 1); + this.expirationMillis = expirationMillis; } /** @@ -81,7 +65,7 @@ public CheckAggregationOptions(int numEntries, int flushCacheEntryIntervalMillis * Creates an instance initialized with the default values. */ public CheckAggregationOptions() { - this(DEFAULT_NUM_ENTRIES, DEFAULT_FLUSH_CACHE_ENTRY_INTERVAL_MILLIS, DEFAULT_RESPONSE_EXPIRATION_MILLIS); + this(DEFAULT_NUM_ENTRIES, DEFAULT_RESPONSE_EXPIRATION_MILLIS); } /** @@ -92,18 +76,9 @@ public int getNumEntries() { return numEntries; } - /** - * @return the maximum interval before aggregated report requests are - * flushed to the server - */ - public int getFlushCacheEntryIntervalMillis() { - return flushCacheEntryIntervalMillis; - } - /** * @return the maximum interval before a cached check response should be - * deleted. This value will not be greater than - * {@link #getFlushCacheEntryIntervalMillis()} + * deleted. */ public int getExpirationMillis() { return expirationMillis; @@ -115,45 +90,29 @@ public int getExpirationMillis() { * @param * the type of the instance being cached * - * @param out - * a concurrent {@code Deque} to which previously cached items - * are added as they expire * @return a {@link Cache} corresponding to this instance's values or * {@code null} unless {@link #numEntries} is positive. */ @Nullable - public Cache createCache(ConcurrentLinkedDeque out) { - return createCache(out, Ticker.systemTicker()); + public Cache createCache() { + return createCache(Ticker.systemTicker()); } /** * Creates a {@link Cache} configured by this instance. * - * @param - * the type of the value stored in the Cache - * @param out - * a concurrent {@code Deque} to which the cached values are - * added as they are removed from the cache - * @param ticker - * the time source used to determine expiration + * @param the type of the value stored in the Cache + * @param ticker the time source used to determine expiration * @return a {@link Cache} corresponding to this instance's values or - * {@code null} unless {@code #numEntries} is positive. + * {@code null} unless {@code #numEntries} is positive. */ @Nullable - public Cache createCache(final ConcurrentLinkedDeque out, Ticker ticker) { - Preconditions.checkNotNull(out, "The out deque cannot be null"); + public Cache createCache(Ticker ticker) { Preconditions.checkNotNull(ticker, "The ticker cannot be null"); if (numEntries <= 0) { return null; } - final RemovalListener listener = new RemovalListener() { - @Override - public void onRemoval(RemovalNotification notification) { - out.addFirst(notification.getValue()); - } - }; - CacheBuilder b = CacheBuilder.newBuilder().maximumSize(numEntries).ticker(ticker) - .removalListener(listener); + CacheBuilder b = CacheBuilder.newBuilder().maximumSize(numEntries).ticker(ticker); if (expirationMillis >= 0) { b.expireAfterWrite(expirationMillis, TimeUnit.MILLISECONDS); } diff --git a/endpoints-control/src/main/java/com/google/api/control/aggregator/CheckRequestAggregator.java b/endpoints-control/src/main/java/com/google/api/control/aggregator/CheckRequestAggregator.java index be6cf81..cf0ba4d 100644 --- a/endpoints-control/src/main/java/com/google/api/control/aggregator/CheckRequestAggregator.java +++ b/endpoints-control/src/main/java/com/google/api/control/aggregator/CheckRequestAggregator.java @@ -1,5 +1,6 @@ /* * Copyright 2016 Google Inc. All Rights Reserved. + * Copyright 2023 Uwe Trottmann * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +17,6 @@ package com.google.api.control.aggregator; -import com.google.api.MetricDescriptor.MetricKind; import com.google.api.servicecontrol.v1.CheckRequest; import com.google.api.servicecontrol.v1.CheckResponse; import com.google.api.servicecontrol.v1.MetricValue; @@ -27,35 +27,25 @@ import com.google.common.base.Strings; import com.google.common.base.Ticker; import com.google.common.cache.Cache; -import com.google.common.collect.Lists; -import com.google.common.flogger.FluentLogger; import com.google.common.hash.HashCode; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Map; -import java.util.concurrent.ConcurrentLinkedDeque; + import javax.annotation.Nullable; +import java.nio.charset.StandardCharsets; /** - * Caches and aggregates {@link CheckRequest}s. + * Caches {@link CheckRequest}s. */ public class CheckRequestAggregator { /** - * The flush interval returned by {@link #getFlushIntervalMillis() } when an instance is + * The flush interval returned by {@link #getExpirationMillis() } when an instance is * configured to be non-caching. */ public static final int NON_CACHING = -1; - private static final int NANOS_PER_MILLI = 1000000; - private static final CheckRequest[] NO_REQUESTS = new CheckRequest[] {}; - private static final FluentLogger log = FluentLogger.forEnclosingClass(); - private final String serviceName; private final CheckAggregationOptions options; - private final Map kinds; - private final ConcurrentLinkedDeque out; private final Cache cache; private final Ticker ticker; @@ -64,33 +54,18 @@ public class CheckRequestAggregator { * * @param serviceName the service whose {@code CheckRequest}s are being aggregated * @param options configures this instance's caching behavior - * @param kinds specifies the {@link MetricKind} for specific metric names * @param ticker the time source used to determine expiration. When not specified, this defaults * to {@link Ticker#systemTicker()} */ public CheckRequestAggregator(String serviceName, CheckAggregationOptions options, - @Nullable Map kinds, @Nullable Ticker ticker) { + @Nullable Ticker ticker) { Preconditions.checkArgument(!Strings.isNullOrEmpty(serviceName), "service name cannot be empty"); Preconditions.checkNotNull(options, "options must be non-null"); - this.out = new ConcurrentLinkedDeque(); this.ticker = ticker == null ? Ticker.systemTicker() : ticker; - this.cache = options.createCache(out, this.ticker); + this.cache = options.createCache(this.ticker); this.serviceName = serviceName; this.options = options; - this.kinds = kinds; - } - - /** - * Constructor. - * - * @param serviceName the service whose {@code CheckRequest}s are being aggregated - * @param options configures this instances caching behavior - * @param kinds specifies the {@link MetricKind} for specific metric names - */ - public CheckRequestAggregator(String serviceName, CheckAggregationOptions options, - @Nullable Map kinds) { - this(serviceName, options, kinds, Ticker.systemTicker()); } /** @@ -100,13 +75,13 @@ public CheckRequestAggregator(String serviceName, CheckAggregationOptions option * @param options configures this instances caching behavior */ public CheckRequestAggregator(String serviceName, CheckAggregationOptions options) { - this(serviceName, options, null); + this(serviceName, options, Ticker.systemTicker()); } /** - * @return the interval in milliseconds between calls to {@link #flush} + * @return See {@link CheckAggregationOptions#getExpirationMillis()}. */ - public int getFlushIntervalMillis() { + public int getExpirationMillis() { if (cache == null) { return NON_CACHING; } else { @@ -132,36 +107,6 @@ public void clear() { } synchronized (cache) { cache.invalidateAll(); - out.clear(); - } - } - - /** - * Flushes this instance's cache. - * - * The instance's driver should call the this method every {@link #getFlushIntervalMillis()} - * milliseconds, and send the results to the check service. - * - * @return CheckRequest[] containing the CheckRequests that were pending - */ - public CheckRequest[] flush() { - if (cache == null) { - return NO_REQUESTS; - } - - // Thread safety - the current thread cleans up the cache, which may add multiple cached - // aggregated operations to the output deque. - synchronized (cache) { - cache.cleanUp(); - ArrayList reqs = Lists.newArrayList(); - for (CachedItem item : out) { - CheckRequest req = item.extractRequest(); - if (req != null) { - reqs.add(req); - } - } - out.clear(); - return reqs.toArray(new CheckRequest[reqs.size()]); } } @@ -181,7 +126,7 @@ public void addResponse(CheckRequest req, CheckResponse resp) { synchronized (cache) { CachedItem item = cache.getIfPresent(signature); if (item == null) { - cache.put(signature, new CachedItem(resp, req, kinds, now, quotaScale)); + cache.put(signature, new CachedItem(resp, now, quotaScale)); } else { item.lastCheckTimestamp = now; item.response = resp; @@ -230,37 +175,7 @@ public void addResponse(CheckRequest req, CheckResponse resp) { if (item == null) { return null; // signal caller to send the response } else { - return handleCachedResponse(req, item); - } - } - - private boolean isCurrent(CachedItem item) { - long age = ticker.read() - item.lastCheckTimestamp; - return age < (options.getFlushCacheEntryIntervalMillis() * NANOS_PER_MILLI); - } - - private CheckResponse handleCachedResponse(CheckRequest req, CachedItem item) { - if (item.response.getCheckErrorsCount() > 0) { - if (isCurrent(item)) { - return item.response; - } - - // Not current - item.lastCheckTimestamp = ticker.read(); - return null; // signal the caller to make a new check request - } else { - if (isCurrent(item)) { - return item.response; - } - item.updateRequest(req, kinds); - if (item.isFlushing) { - log.atWarning().log("latest check request has not completed"); - } - - // Not current - item.isFlushing = true; - item.lastCheckTimestamp = ticker.read(); - return null; // signal the caller to make a new check request + return item.response; } } @@ -302,42 +217,18 @@ private static class CachedItem { long lastCheckTimestamp; int quotaScale; CheckResponse response; - private final String serviceName; - - private OperationAggregator aggregator; /** * @param response the cached {@code CheckResponse} - * @param request caused {@code response} - * @param kinds the kinds of metrics * @param lastCheckTimestamp the last time the {@code CheckRequest} for tracked by this item was * checked * @param quotaScale WIP, used to track quota */ - CachedItem(CheckResponse response, CheckRequest request, Map kinds, - long lastCheckTimestamp, int quotaScale) { + CachedItem(CheckResponse response, long lastCheckTimestamp, int quotaScale) { this.response = response; - this.serviceName = request.getServiceName(); this.lastCheckTimestamp = lastCheckTimestamp; this.quotaScale = quotaScale; - this.aggregator = new OperationAggregator(request.getOperation(), kinds); - } - - public synchronized void updateRequest(CheckRequest req, Map kinds) { - if (this.aggregator == null) { - this.aggregator = new OperationAggregator(req.getOperation(), kinds); - } else { - aggregator.add(req.getOperation()); - } } - public synchronized CheckRequest extractRequest() { - if (this.aggregator == null) { - return null; - } - Operation op = this.aggregator.asOperation(); - this.aggregator = null; - return CheckRequest.newBuilder().setServiceName(this.serviceName).setOperation(op).build(); - } } } diff --git a/endpoints-control/src/test/java/com/google/api/control/aggregator/CheckAggregationOptionsTest.java b/endpoints-control/src/test/java/com/google/api/control/aggregator/CheckAggregationOptionsTest.java index b5359eb..8d9add1 100644 --- a/endpoints-control/src/test/java/com/google/api/control/aggregator/CheckAggregationOptionsTest.java +++ b/endpoints-control/src/test/java/com/google/api/control/aggregator/CheckAggregationOptionsTest.java @@ -1,5 +1,6 @@ /* * Copyright 2016 Google Inc. All Rights Reserved. + * Copyright 2023 Uwe Trottmann * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,21 +41,10 @@ public class CheckAggregationOptionsTest { public void defaultConstructorShouldSpecifyTheDefaultValues() { CheckAggregationOptions options = new CheckAggregationOptions(); assertEquals(CheckAggregationOptions.DEFAULT_NUM_ENTRIES, options.getNumEntries()); - assertEquals(CheckAggregationOptions.DEFAULT_FLUSH_CACHE_ENTRY_INTERVAL_MILLIS, - options.getFlushCacheEntryIntervalMillis()); assertEquals(CheckAggregationOptions.DEFAULT_RESPONSE_EXPIRATION_MILLIS, options.getExpirationMillis()); } - @Test - public void constructorShouldIgnoreLowExpirationMillis() { - CheckAggregationOptions options = - new CheckAggregationOptions(-1, 1, 0 /* this is low and will be ignored */); - assertEquals(-1, options.getNumEntries()); - assertEquals(1, options.getFlushCacheEntryIntervalMillis()); - assertEquals(2 /* cache interval + 1 */, options.getExpirationMillis()); - } - @Test public void shouldFailToCreateCacheWithANullOutputDeque() { try { @@ -70,7 +60,7 @@ public void shouldFailToCreateCacheWithANullOutputDeque() { public void shouldFailToCreateACacheWithANullTicker() { try { CheckAggregationOptions options = new CheckAggregationOptions(); - options.createCache(testDeque(), null); + options.createCache(null); fail("should have raised NullPointerException"); } catch (NullPointerException e) { // expected @@ -81,12 +71,11 @@ public void shouldFailToCreateACacheWithANullTicker() { public void shouldNotCreateACacheUnlessMaxSizeIsPositive() { for (int i : new int[] {-1, 0, 1}) { CheckAggregationOptions options = new CheckAggregationOptions(i, - CheckAggregationOptions.DEFAULT_FLUSH_CACHE_ENTRY_INTERVAL_MILLIS, - CheckAggregationOptions.DEFAULT_RESPONSE_EXPIRATION_MILLIS); + CheckAggregationOptions.DEFAULT_RESPONSE_EXPIRATION_MILLIS); if (i > 0) { - assertNotNull(options.createCache(testDeque())); + assertNotNull(options.createCache()); } else { - assertNull(options.createCache(testDeque())); + assertNull(options.createCache()); } } } @@ -95,45 +84,23 @@ public void shouldNotCreateACacheUnlessMaxSizeIsPositive() { public void shouldCreateACacheEvenIfExpirationIsNotPositive() { for (int i : new int[] {-1, 0, 1}) { CheckAggregationOptions options = - new CheckAggregationOptions(CheckAggregationOptions.DEFAULT_NUM_ENTRIES, i - 1, i); - assertNotNull(options.createCache(testDeque())); + new CheckAggregationOptions(CheckAggregationOptions.DEFAULT_NUM_ENTRIES, i); + assertNotNull(options.createCache()); } } @Test - public void shouldCreateACacheThatFlushesToTheOutputDeque() { - CheckAggregationOptions options = new CheckAggregationOptions(1, - CheckAggregationOptions.DEFAULT_FLUSH_CACHE_ENTRY_INTERVAL_MILLIS, - CheckAggregationOptions.DEFAULT_RESPONSE_EXPIRATION_MILLIS); - - ConcurrentLinkedDeque deque = testDeque(); - Cache cache = options.createCache(deque); - cache.put("one", 1L); - assertEquals(cache.size(), 1); - assertEquals(deque.size(), 0); - cache.put("two", 2L); - assertEquals(cache.size(), 1); - assertEquals(deque.size(), 1); - cache.put("three", 3L); - assertEquals(cache.size(), 1); - assertEquals(deque.size(), 2); - } - - @Test - public void shouldCreateACacheThatFlushesToTheOutputDequeAfterExpiration() { + public void shouldCreateACacheThatDeletesEntryAfterExpiration() { CheckAggregationOptions options = - new CheckAggregationOptions(CheckAggregationOptions.DEFAULT_NUM_ENTRIES, 0, 1); + new CheckAggregationOptions(CheckAggregationOptions.DEFAULT_NUM_ENTRIES, 1); - ConcurrentLinkedDeque deque = testDeque(); FakeTicker ticker = new FakeTicker(); - Cache cache = options.createCache(deque, ticker); + Cache cache = options.createCache(ticker); cache.put("one", 1L); assertEquals(1, cache.size()); - assertEquals(0, deque.size()); ticker.tick(1 /* expires the entry */, TimeUnit.MILLISECONDS); cache.cleanUp(); assertEquals(0, cache.size()); - assertEquals(1, deque.size()); } private static ConcurrentLinkedDeque testDeque() { diff --git a/endpoints-control/src/test/java/com/google/api/control/aggregator/CheckRequestAggregatorTest.java b/endpoints-control/src/test/java/com/google/api/control/aggregator/CheckRequestAggregatorTest.java index df87529..a0c3c97 100644 --- a/endpoints-control/src/test/java/com/google/api/control/aggregator/CheckRequestAggregatorTest.java +++ b/endpoints-control/src/test/java/com/google/api/control/aggregator/CheckRequestAggregatorTest.java @@ -1,5 +1,6 @@ /* * Copyright 2016 Google Inc. All Rights Reserved. + * Copyright 2023 Uwe Trottmann * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,7 +54,7 @@ public class CheckRequestAggregatorTest { private static final int TEST_EXPIRATION = TEST_FLUSH_INTERVAL + 1; private static final Timestamp EARLY = Timestamp.newBuilder().setNanos(1).setSeconds(100).build(); private CheckRequestAggregator NO_CACHE = new CheckRequestAggregator(NO_CACHE_NAME, - new CheckAggregationOptions(-1 /* disables cache */, 2, 1)); + new CheckAggregationOptions(-1 /* disables cache */, 1)); private CheckRequestAggregator DEFAULT = new CheckRequestAggregator(DEFAULT_NAME, new CheckAggregationOptions()); private FakeTicker ticker; @@ -102,20 +103,14 @@ public void signShouldChangeAsImportFieldsChange() { assertNotEquals(withMetrics, withLabels); } - @Test - public void whenNonCachingShouldHaveEmptyFlush() { - assertEquals(0, NO_CACHE.flush().length); - } - @Test public void whenNonCachingShouldHaveWellKnownFlushInterval() { - assertEquals(CheckRequestAggregator.NON_CACHING, NO_CACHE.getFlushIntervalMillis()); + assertEquals(CheckRequestAggregator.NON_CACHING, NO_CACHE.getExpirationMillis()); } @Test public void whenNonCachingShouldNotCacheResponse() { CheckRequest req = newTestRequest("service.no_cache"); - assertEquals(0, NO_CACHE.flush().length); assertEquals(null, NO_CACHE.check(req)); CheckResponse fakeResponse = fakeResponse(); @@ -152,9 +147,9 @@ public void whenCachingShouldReturnNullInitiallyAsRequestIsNotCached() { } @Test - public void whenCachingShouldHaveExpirationAsFlushInterval() { + public void whenCachingShouldHaveExpiration() { CheckRequestAggregator agg = newCachingInstance(); - assertEquals(TEST_EXPIRATION, agg.getFlushIntervalMillis()); + assertEquals(TEST_EXPIRATION, agg.getExpirationMillis()); } @Test @@ -255,26 +250,23 @@ public void shouldExtendExpirationOnReceiptOfAResponse() { assertEquals(fakeResponse, agg.check(req)); // not expired yet } - public void shouldNotFlushRequestThatHaveNotBeenUpdated() { + @Test + public void shouldExpireRequestThatHasNotBeenUpdated() { CheckRequest req = newTestRequest(CACHING_NAME); CheckRequestAggregator agg = newCachingInstance(); CheckResponse fakeResponse = fakeResponse(); assertEquals(null, agg.check(req)); agg.addResponse(req, fakeResponse); assertEquals(fakeResponse, agg.check(req)); - ticker.tick(1, TimeUnit.MILLISECONDS); - - // now past the flush interval, nothing to expire - assertEquals(0, agg.flush().length); + ticker.tick(TEST_EXPIRATION, TimeUnit.MILLISECONDS); - // now expired, confirm nothing in the cache, and nothing flushed - ticker.tick(1, TimeUnit.MILLISECONDS); + // now expired, confirm nothing in the cache assertEquals(null, agg.check(req)); assertEquals(null, agg.check(req)); - assertEquals(0, agg.flush().length); } - public void shouldFlushRequestsThatHaveBeenUpdated() { + @Test + public void shouldNotExpireRequestThatHasBeenUpdated() { CheckRequest req = newTestRequest(CACHING_NAME); CheckRequestAggregator agg = newCachingInstance(); CheckResponse fakeResponse = fakeResponse(); @@ -283,18 +275,19 @@ public void shouldFlushRequestsThatHaveBeenUpdated() { assertEquals(fakeResponse, agg.check(req)); ticker.tick(1, TimeUnit.MILLISECONDS); - // now past the flush interval, nothing to expire - assertEquals(0, agg.flush().length); + // update request + agg.addResponse(req, fakeResponse); - // now expired, flush without checking the cache gives - // the cached request + // would be expired if not updated ticker.tick(1, TimeUnit.MILLISECONDS); - assertEquals(1, agg.flush().length); + assertEquals(fakeResponse, agg.check(req)); - // flushing again immediately should result in 0 entries - assertEquals(0, agg.flush().length); + // now expired, confirm nothing in the cache + ticker.tick(TEST_EXPIRATION, TimeUnit.MILLISECONDS); + assertEquals(null, agg.check(req)); } + @Test public void shouldClearRequests() { CheckRequest req = newTestRequest(CACHING_NAME); CheckRequestAggregator agg = newCachingInstance(); @@ -304,12 +297,11 @@ public void shouldClearRequests() { assertEquals(fakeResponse, agg.check(req)); agg.clear(); assertEquals(null, agg.check(req)); - assertEquals(0, agg.flush().length); } private CheckRequestAggregator newCachingInstance() { return new CheckRequestAggregator(CACHING_NAME, - new CheckAggregationOptions(1, TEST_FLUSH_INTERVAL, TEST_EXPIRATION), null, ticker); + new CheckAggregationOptions(1, TEST_EXPIRATION), ticker); } private static CheckResponse fakeResponse() {