From 48291ba17afa98d6316715d1be19301bdd2ea580 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alaksiej=20=C5=A0=C4=8Darbaty?= Date: Tue, 16 Dec 2025 17:04:39 +0100 Subject: [PATCH 1/3] NIFI-15307 ConsumeKinesis. Wait for long initialization in onTrigger --- .../aws/kinesis/ConsumeKinesis.java | 77 +++++++++++++------ .../aws/kinesis/ConsumeKinesisTest.java | 25 +++++- 2 files changed, 77 insertions(+), 25 deletions(-) diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java index 8e4342b5419d..f1ccdbafdcdf 100644 --- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java +++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java @@ -100,6 +100,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; @@ -169,10 +170,10 @@ public class ConsumeKinesis extends AbstractProcessor { private static final Duration KINESIS_HTTP_HEALTH_CHECK_PERIOD = Duration.ofMinutes(1); /** - * Using a large enough value to ensure we don't wait infinitely for the initialization. - * Actual initialization shouldn't take that long. + * How long to wait for a Scheduler initialization to complete in the OnScheduled method. + * If the initialization takes longer than this, the processor will continue initialization checks in the onTrigger method. */ - private static final Duration KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT = Duration.ofMinutes(15); + private static final Duration KINESIS_SCHEDULER_ON_SCHEDULED_INITIALIZATION_TIMEOUT = Duration.ofSeconds(30); private static final Duration KINESIS_SCHEDULER_GRACEFUL_SHUTDOWN_TIMEOUT = Duration.ofMinutes(3); static final PropertyDescriptor STREAM_NAME = new PropertyDescriptor.Builder() @@ -339,6 +340,9 @@ Ensure that the credentials provided have access to Kinesis, DynamoDB and (optio private volatile @Nullable ReaderRecordProcessor readerRecordProcessor; + private volatile Future initializationResultFuture; + private volatile AtomicBoolean initialized; + // An instance filed, so that it can be read in getRelationships. private volatile ProcessingStrategy processingStrategy = ProcessingStrategy.from( PROCESSING_STRATEGY.getDefaultValue()); @@ -418,6 +422,8 @@ public void setup(final ProcessContext context) { final RetrievalSpecificConfig retrievalSpecificConfig = configureRetrievalSpecificConfig(context, kinesisClient, streamName, applicationName); final InitializationStateChangeListener initializationListener = new InitializationStateChangeListener(getLogger()); + initialized = new AtomicBoolean(false); + initializationResultFuture = initializationListener.result(); kinesisScheduler = new Scheduler( configsBuilder.checkpointConfig(), @@ -435,34 +441,20 @@ public void setup(final ProcessContext context) { schedulerThread.start(); // The thread is stopped when kinesisScheduler is shutdown in the onStopped method. - final InitializationResult result; try { - result = initializationListener.result().get(KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT.getSeconds(), SECONDS); - } catch (final InterruptedException | ExecutionException | TimeoutException e) { + final InitializationResult result = initializationResultFuture.get( + KINESIS_SCHEDULER_ON_SCHEDULED_INITIALIZATION_TIMEOUT.getSeconds(), SECONDS); + checkInitializationResult(result); + } catch (final TimeoutException e) { + // During a first run the processor will take more time to initialize. We return from OnSchedule and continue waiting in the onTrigger method. + getLogger().warn("Kinesis Scheduler initialization may take up to 10 minutes on a first run, which is caused by AWS resources initialization"); + } catch (final InterruptedException | ExecutionException e) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } cleanUpState(); throw new ProcessException("Initialization failed for stream [%s]".formatted(streamName), e); } - - switch (result) { - case InitializationResult.Success ignored -> - getLogger().info( - "Started Kinesis Scheduler for stream [{}] with application name [{}] and workerId [{}]", - streamName, applicationName, workerId); - case InitializationResult.Failure failure -> { - cleanUpState(); - - final ProcessException ex = failure.error() - .map(err -> new ProcessException("Initialization failed for stream [%s]".formatted(streamName), err)) - // This branch is active only when a scheduler was shutdown, but no initialization error was provided. - // This behavior isn't typical and wasn't observed. - .orElseGet(() -> new ProcessException(("Initialization failed for stream [%s]").formatted(streamName))); - - throw ex; - } - } } /** @@ -575,6 +567,9 @@ private static RetrievalSpecificConfig configureRetrievalSpecificConfig( @OnStopped public void onStopped() { cleanUpState(); + + initialized = null; + initializationResultFuture = null; } private void cleanUpState() { @@ -633,6 +628,16 @@ private void shutdownScheduler() { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + if (!initialized.get()) { + if (!initializationResultFuture.isDone()) { + getLogger().debug("Waiting for Kinesis Scheduler to finish initialization"); + context.yield(); + return; + } + + checkInitializationResult(initializationResultFuture.resultNow()); + } + final Optional leaseAcquired = recordBuffer.acquireBufferLease(); leaseAcquired.ifPresentOrElse( @@ -641,6 +646,30 @@ public void onTrigger(final ProcessContext context, final ProcessSession session ); } + private void checkInitializationResult(final InitializationResult initializationResult) { + switch (initializationResult) { + case InitializationResult.Success ignored -> { + boolean wasInitialized = initialized.getAndSet(true); + if (!wasInitialized) { + getLogger().info( + "Started Kinesis Scheduler for stream [{}] with application name [{}] and workerId [{}]", + streamName, kinesisScheduler.applicationName(), kinesisScheduler.leaseManagementConfig().workerIdentifier()); + } + } + case InitializationResult.Failure failure -> { + cleanUpState(); + + final ProcessException ex = failure.error() + .map(err -> new ProcessException("Initialization failed for stream [%s]".formatted(streamName), err)) + // This branch is active only when a scheduler was shutdown, but no initialization error was provided. + // This behavior isn't typical and wasn't observed. + .orElseGet(() -> new ProcessException(( "Initialization failed for stream [%s]").formatted(streamName))); + + throw ex; + } + } + } + private void processRecordsFromBuffer(final ProcessSession session, final Lease lease) { try { final List records = recordBuffer.consumeRecords(lease); diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java index 0b8323dd1c62..f788e7b4e22a 100644 --- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java +++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java @@ -16,17 +16,25 @@ */ package org.apache.nifi.processors.aws.kinesis; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; import org.apache.nifi.processors.aws.region.RegionUtil; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockProcessSession; +import org.apache.nifi.util.SharedSessionState; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import java.time.Duration; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.PROCESSING_STRATEGY; import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.ProcessingStrategy.FLOW_FILE; @@ -36,6 +44,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Timeout.ThreadMode.SEPARATE_THREAD; class ConsumeKinesisTest { @@ -65,14 +74,28 @@ void getRelationshipsForRecordProcessingStrategy() { } @Test + // It takes around 30 seconds for a scheduler to fail in this test. + @Timeout(value = 3, unit = TimeUnit.MINUTES, threadMode = SEPARATE_THREAD) void failInitializationWithInvalidValues() { // KCL Scheduler initialization will fail, as the runner is configured with placeholder credentials. // Using the processor object to avoid error wrapping by testRunner. final ConsumeKinesis consumeKinesis = (ConsumeKinesis) testRunner.getProcessor(); + + final ProcessContext context = testRunner.getProcessContext(); + final ProcessSession session = new MockProcessSession(new SharedSessionState(consumeKinesis, new AtomicLong()), consumeKinesis); + final ProcessException ex = assertThrows( ProcessException.class, - () -> consumeKinesis.setup(testRunner.getProcessContext())); + () -> { + // The error might occur either in @OnScheduled... + consumeKinesis.setup(context); + while (true) { + // ... or in onTrigger, if the initialization takes longer. + consumeKinesis.onTrigger(context, session); + Thread.sleep(Duration.ofSeconds(1)); + } + }); assertNotNull(ex.getCause(), "The initialization exception is expected to have a cause"); } From fc1e58e9275c62fb5e3bfbf7e01069d0ed478e84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alaksiej=20=C5=A0=C4=8Darbaty?= Date: Mon, 5 Jan 2026 11:13:00 +0100 Subject: [PATCH 2/3] NIFI-15307 Remove long unit test --- .../aws/kinesis/ConsumeKinesis.java | 6 +-- .../aws/kinesis/ConsumeKinesisTest.java | 39 ------------------- 2 files changed, 3 insertions(+), 42 deletions(-) diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java index f1ccdbafdcdf..ae750585b9b9 100644 --- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java +++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java @@ -341,7 +341,7 @@ Ensure that the credentials provided have access to Kinesis, DynamoDB and (optio private volatile @Nullable ReaderRecordProcessor readerRecordProcessor; private volatile Future initializationResultFuture; - private volatile AtomicBoolean initialized; + private final AtomicBoolean initialized = new AtomicBoolean(); // An instance filed, so that it can be read in getRelationships. private volatile ProcessingStrategy processingStrategy = ProcessingStrategy.from( @@ -422,7 +422,7 @@ public void setup(final ProcessContext context) { final RetrievalSpecificConfig retrievalSpecificConfig = configureRetrievalSpecificConfig(context, kinesisClient, streamName, applicationName); final InitializationStateChangeListener initializationListener = new InitializationStateChangeListener(getLogger()); - initialized = new AtomicBoolean(false); + initialized.set(false); initializationResultFuture = initializationListener.result(); kinesisScheduler = new Scheduler( @@ -568,7 +568,7 @@ private static RetrievalSpecificConfig configureRetrievalSpecificConfig( public void onStopped() { cleanUpState(); - initialized = null; + initialized.set(false); initializationResultFuture = null; } diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java index f788e7b4e22a..3d105cb0bf19 100644 --- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java +++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java @@ -16,25 +16,16 @@ */ package org.apache.nifi.processors.aws.kinesis; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; import org.apache.nifi.processors.aws.region.RegionUtil; import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.util.MockProcessSession; -import org.apache.nifi.util.SharedSessionState; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; -import java.time.Duration; import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.PROCESSING_STRATEGY; import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.ProcessingStrategy.FLOW_FILE; @@ -42,9 +33,6 @@ import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.REL_PARSE_FAILURE; import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.REL_SUCCESS; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Timeout.ThreadMode.SEPARATE_THREAD; class ConsumeKinesisTest { @@ -73,33 +61,6 @@ void getRelationshipsForRecordProcessingStrategy() { assertEquals(Set.of(REL_SUCCESS, REL_PARSE_FAILURE), relationships); } - @Test - // It takes around 30 seconds for a scheduler to fail in this test. - @Timeout(value = 3, unit = TimeUnit.MINUTES, threadMode = SEPARATE_THREAD) - void failInitializationWithInvalidValues() { - // KCL Scheduler initialization will fail, as the runner is configured with placeholder credentials. - - // Using the processor object to avoid error wrapping by testRunner. - final ConsumeKinesis consumeKinesis = (ConsumeKinesis) testRunner.getProcessor(); - - final ProcessContext context = testRunner.getProcessContext(); - final ProcessSession session = new MockProcessSession(new SharedSessionState(consumeKinesis, new AtomicLong()), consumeKinesis); - - final ProcessException ex = assertThrows( - ProcessException.class, - () -> { - // The error might occur either in @OnScheduled... - consumeKinesis.setup(context); - while (true) { - // ... or in onTrigger, if the initialization takes longer. - consumeKinesis.onTrigger(context, session); - Thread.sleep(Duration.ofSeconds(1)); - } - }); - - assertNotNull(ex.getCause(), "The initialization exception is expected to have a cause"); - } - private static TestRunner createTestRunner() { final TestRunner runner = TestRunners.newTestRunner(ConsumeKinesis.class); From b6fcbcff88385cca266a988f24a9280cc1f702f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alaksiej=20=C5=A0=C4=8Darbaty?= Date: Mon, 5 Jan 2026 12:17:59 +0100 Subject: [PATCH 3/3] NIFI-15307 Checkstyle --- .../apache/nifi/processors/aws/kinesis/ConsumeKinesis.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java index ae750585b9b9..751ac6feef6b 100644 --- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java +++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java @@ -649,7 +649,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session private void checkInitializationResult(final InitializationResult initializationResult) { switch (initializationResult) { case InitializationResult.Success ignored -> { - boolean wasInitialized = initialized.getAndSet(true); + final boolean wasInitialized = initialized.getAndSet(true); if (!wasInitialized) { getLogger().info( "Started Kinesis Scheduler for stream [{}] with application name [{}] and workerId [{}]", @@ -663,7 +663,7 @@ private void checkInitializationResult(final InitializationResult initialization .map(err -> new ProcessException("Initialization failed for stream [%s]".formatted(streamName), err)) // This branch is active only when a scheduler was shutdown, but no initialization error was provided. // This behavior isn't typical and wasn't observed. - .orElseGet(() -> new ProcessException(( "Initialization failed for stream [%s]").formatted(streamName))); + .orElseGet(() -> new ProcessException("Initialization failed for stream [%s]".formatted(streamName))); throw ex; }