diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java index 8e4342b5419d..751ac6feef6b 100644 --- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java +++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java @@ -100,6 +100,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; @@ -169,10 +170,10 @@ public class ConsumeKinesis extends AbstractProcessor { private static final Duration KINESIS_HTTP_HEALTH_CHECK_PERIOD = Duration.ofMinutes(1); /** - * Using a large enough value to ensure we don't wait infinitely for the initialization. - * Actual initialization shouldn't take that long. + * How long to wait for a Scheduler initialization to complete in the OnScheduled method. + * If the initialization takes longer than this, the processor will continue initialization checks in the onTrigger method. 
*/ - private static final Duration KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT = Duration.ofMinutes(15); + private static final Duration KINESIS_SCHEDULER_ON_SCHEDULED_INITIALIZATION_TIMEOUT = Duration.ofSeconds(30); private static final Duration KINESIS_SCHEDULER_GRACEFUL_SHUTDOWN_TIMEOUT = Duration.ofMinutes(3); static final PropertyDescriptor STREAM_NAME = new PropertyDescriptor.Builder() @@ -339,6 +340,9 @@ Ensure that the credentials provided have access to Kinesis, DynamoDB and (optio private volatile @Nullable ReaderRecordProcessor readerRecordProcessor; + private volatile Future initializationResultFuture; + private final AtomicBoolean initialized = new AtomicBoolean(); + // An instance filed, so that it can be read in getRelationships. private volatile ProcessingStrategy processingStrategy = ProcessingStrategy.from( PROCESSING_STRATEGY.getDefaultValue()); @@ -418,6 +422,8 @@ public void setup(final ProcessContext context) { final RetrievalSpecificConfig retrievalSpecificConfig = configureRetrievalSpecificConfig(context, kinesisClient, streamName, applicationName); final InitializationStateChangeListener initializationListener = new InitializationStateChangeListener(getLogger()); + initialized.set(false); + initializationResultFuture = initializationListener.result(); kinesisScheduler = new Scheduler( configsBuilder.checkpointConfig(), @@ -435,34 +441,20 @@ public void setup(final ProcessContext context) { schedulerThread.start(); // The thread is stopped when kinesisScheduler is shutdown in the onStopped method. 
- final InitializationResult result; try { - result = initializationListener.result().get(KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT.getSeconds(), SECONDS); - } catch (final InterruptedException | ExecutionException | TimeoutException e) { + final InitializationResult result = initializationResultFuture.get( + KINESIS_SCHEDULER_ON_SCHEDULED_INITIALIZATION_TIMEOUT.getSeconds(), SECONDS); + checkInitializationResult(result); + } catch (final TimeoutException e) { + // On a first run the processor will take more time to initialize. We return from OnScheduled and continue waiting in the onTrigger method. + getLogger().warn("Kinesis Scheduler initialization may take up to 10 minutes on a first run, which is caused by AWS resources initialization"); + } catch (final InterruptedException | ExecutionException e) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } cleanUpState(); throw new ProcessException("Initialization failed for stream [%s]".formatted(streamName), e); } - - switch (result) { - case InitializationResult.Success ignored -> - getLogger().info( - "Started Kinesis Scheduler for stream [{}] with application name [{}] and workerId [{}]", - streamName, applicationName, workerId); - case InitializationResult.Failure failure -> { - cleanUpState(); - - final ProcessException ex = failure.error() - .map(err -> new ProcessException("Initialization failed for stream [%s]".formatted(streamName), err)) - // This branch is active only when a scheduler was shutdown, but no initialization error was provided. - // This behavior isn't typical and wasn't observed. 
- .orElseGet(() -> new ProcessException(("Initialization failed for stream [%s]").formatted(streamName))); - - throw ex; - } - } } /** @@ -575,6 +567,9 @@ private static RetrievalSpecificConfig configureRetrievalSpecificConfig( @OnStopped public void onStopped() { cleanUpState(); + + initialized.set(false); + initializationResultFuture = null; } private void cleanUpState() { @@ -633,6 +628,16 @@ private void shutdownScheduler() { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + if (!initialized.get()) { + if (!initializationResultFuture.isDone()) { + getLogger().debug("Waiting for Kinesis Scheduler to finish initialization"); + context.yield(); + return; + } + + checkInitializationResult(initializationResultFuture.resultNow()); + } + final Optional leaseAcquired = recordBuffer.acquireBufferLease(); leaseAcquired.ifPresentOrElse( @@ -641,6 +646,30 @@ public void onTrigger(final ProcessContext context, final ProcessSession session ); } + private void checkInitializationResult(final InitializationResult initializationResult) { + switch (initializationResult) { + case InitializationResult.Success ignored -> { + final boolean wasInitialized = initialized.getAndSet(true); + if (!wasInitialized) { + getLogger().info( + "Started Kinesis Scheduler for stream [{}] with application name [{}] and workerId [{}]", + streamName, kinesisScheduler.applicationName(), kinesisScheduler.leaseManagementConfig().workerIdentifier()); + } + } + case InitializationResult.Failure failure -> { + cleanUpState(); + + final ProcessException ex = failure.error() + .map(err -> new ProcessException("Initialization failed for stream [%s]".formatted(streamName), err)) + // This branch is active only when a scheduler was shutdown, but no initialization error was provided. + // This behavior isn't typical and wasn't observed. 
+ .orElseGet(() -> new ProcessException("Initialization failed for stream [%s]".formatted(streamName))); + + throw ex; + } + } + } + private void processRecordsFromBuffer(final ProcessSession session, final Lease lease) { try { final List records = recordBuffer.consumeRecords(lease); diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java index 0b8323dd1c62..3d105cb0bf19 100644 --- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java +++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java @@ -17,7 +17,6 @@ package org.apache.nifi.processors.aws.kinesis; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; import org.apache.nifi.processors.aws.region.RegionUtil; import org.apache.nifi.reporting.InitializationException; @@ -34,8 +33,6 @@ import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.REL_PARSE_FAILURE; import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.REL_SUCCESS; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; class ConsumeKinesisTest { @@ -64,19 +61,6 @@ void getRelationshipsForRecordProcessingStrategy() { assertEquals(Set.of(REL_SUCCESS, REL_PARSE_FAILURE), relationships); } - @Test - void failInitializationWithInvalidValues() { - // KCL Scheduler initialization will fail, as the runner is configured with placeholder credentials. 
- - // Using the processor object to avoid error wrapping by testRunner. - final ConsumeKinesis consumeKinesis = (ConsumeKinesis) testRunner.getProcessor(); - final ProcessException ex = assertThrows( - ProcessException.class, - () -> consumeKinesis.setup(testRunner.getProcessContext())); - - assertNotNull(ex.getCause(), "The initialization exception is expected to have a cause"); - } - private static TestRunner createTestRunner() { final TestRunner runner = TestRunners.newTestRunner(ConsumeKinesis.class);