Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -169,10 +170,10 @@ public class ConsumeKinesis extends AbstractProcessor {
private static final Duration KINESIS_HTTP_HEALTH_CHECK_PERIOD = Duration.ofMinutes(1);

/**
* Using a large enough value to ensure we don't wait infinitely for the initialization.
* Actual initialization shouldn't take that long.
* How long to wait for a Scheduler initialization to complete in the OnScheduled method.
* If the initialization takes longer than this, the processor will continue initialization checks in the onTrigger method.
*/
private static final Duration KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT = Duration.ofMinutes(15);
private static final Duration KINESIS_SCHEDULER_ON_SCHEDULED_INITIALIZATION_TIMEOUT = Duration.ofSeconds(30);
private static final Duration KINESIS_SCHEDULER_GRACEFUL_SHUTDOWN_TIMEOUT = Duration.ofMinutes(3);

static final PropertyDescriptor STREAM_NAME = new PropertyDescriptor.Builder()
Expand Down Expand Up @@ -339,6 +340,9 @@ Ensure that the credentials provided have access to Kinesis, DynamoDB and (optio

private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;

private volatile Future<InitializationResult> initializationResultFuture;
private final AtomicBoolean initialized = new AtomicBoolean();

// An instance field, so that it can be read in getRelationships.
private volatile ProcessingStrategy processingStrategy = ProcessingStrategy.from(
PROCESSING_STRATEGY.getDefaultValue());
Expand Down Expand Up @@ -418,6 +422,8 @@ public void setup(final ProcessContext context) {
final RetrievalSpecificConfig retrievalSpecificConfig = configureRetrievalSpecificConfig(context, kinesisClient, streamName, applicationName);

final InitializationStateChangeListener initializationListener = new InitializationStateChangeListener(getLogger());
initialized.set(false);
initializationResultFuture = initializationListener.result();

kinesisScheduler = new Scheduler(
configsBuilder.checkpointConfig(),
Expand All @@ -435,34 +441,20 @@ public void setup(final ProcessContext context) {
schedulerThread.start();
// The thread is stopped when kinesisScheduler is shutdown in the onStopped method.

final InitializationResult result;
try {
result = initializationListener.result().get(KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT.getSeconds(), SECONDS);
} catch (final InterruptedException | ExecutionException | TimeoutException e) {
final InitializationResult result = initializationResultFuture.get(
KINESIS_SCHEDULER_ON_SCHEDULED_INITIALIZATION_TIMEOUT.getSeconds(), SECONDS);
checkInitializationResult(result);
} catch (final TimeoutException e) {
// During the first run the processor takes more time to initialize. We return from the OnScheduled method and continue waiting in the onTrigger method.
getLogger().warn("Kinesis Scheduler initialization may take up to 10 minutes on a first run, which is caused by AWS resources initialization");
} catch (final InterruptedException | ExecutionException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
cleanUpState();
throw new ProcessException("Initialization failed for stream [%s]".formatted(streamName), e);
}

switch (result) {
case InitializationResult.Success ignored ->
getLogger().info(
"Started Kinesis Scheduler for stream [{}] with application name [{}] and workerId [{}]",
streamName, applicationName, workerId);
case InitializationResult.Failure failure -> {
cleanUpState();

final ProcessException ex = failure.error()
.map(err -> new ProcessException("Initialization failed for stream [%s]".formatted(streamName), err))
// This branch is active only when a scheduler was shutdown, but no initialization error was provided.
// This behavior isn't typical and wasn't observed.
.orElseGet(() -> new ProcessException(("Initialization failed for stream [%s]").formatted(streamName)));

throw ex;
}
}
}

/**
Expand Down Expand Up @@ -575,6 +567,9 @@ private static RetrievalSpecificConfig configureRetrievalSpecificConfig(
/**
 * Lifecycle callback annotated with {@code @OnStopped}: cleans up the processor's runtime state
 * and resets the initialization tracking fields.
 */
@OnStopped
public void onStopped() {
cleanUpState();

// Drop the initialization flag and the pending result so that no stale outcome is kept around;
// setup() assigns both fields again before starting a new Scheduler.
initialized.set(false);
initializationResultFuture = null;
}

private void cleanUpState() {
Expand Down Expand Up @@ -633,6 +628,16 @@ private void shutdownScheduler() {

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
if (!initialized.get()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Short-circuit by checking a boolean flag, so we don't have to inspect the result of the future again once initialization has completed.

if (!initializationResultFuture.isDone()) {
getLogger().debug("Waiting for Kinesis Scheduler to finish initialization");
context.yield();
return;
}

checkInitializationResult(initializationResultFuture.resultNow());
}

final Optional<Lease> leaseAcquired = recordBuffer.acquireBufferLease();

leaseAcquired.ifPresentOrElse(
Expand All @@ -641,6 +646,30 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
);
}

/**
 * Reacts to the outcome of the Kinesis Scheduler initialization.
 * <p>
 * On success the processor is marked as initialized and a start message is logged, but only on
 * the transition from "not initialized" to "initialized". On failure the processor state is
 * cleaned up and the failure is reported to the caller.
 *
 * @param initializationResult the result reported for the Scheduler initialization
 * @throws ProcessException if the initialization failed; the reported error, when present,
 *                          is attached as the cause
 */
private void checkInitializationResult(final InitializationResult initializationResult) {
    switch (initializationResult) {
        case InitializationResult.Success ignored -> {
            // compareAndSet succeeds only when the flag flips from false to true,
            // so the start message is logged a single time per initialization.
            if (initialized.compareAndSet(false, true)) {
                getLogger().info(
                        "Started Kinesis Scheduler for stream [{}] with application name [{}] and workerId [{}]",
                        streamName, kinesisScheduler.applicationName(), kinesisScheduler.leaseManagementConfig().workerIdentifier());
            }
        }
        case InitializationResult.Failure failure -> {
            cleanUpState();

            final String message = "Initialization failed for stream [%s]".formatted(streamName);

            // The cause-less variant is used only when a scheduler was shutdown, but no initialization error was provided.
            // This behavior isn't typical and wasn't observed.
            throw failure.error()
                    .map(cause -> new ProcessException(message, cause))
                    .orElseGet(() -> new ProcessException(message));
        }
    }
}

private void processRecordsFromBuffer(final ProcessSession session, final Lease lease) {
try {
final List<KinesisClientRecord> records = recordBuffer.consumeRecords(lease);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.nifi.processors.aws.kinesis;

import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
import org.apache.nifi.processors.aws.region.RegionUtil;
import org.apache.nifi.reporting.InitializationException;
Expand All @@ -34,8 +33,6 @@
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.REL_PARSE_FAILURE;
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.REL_SUCCESS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

class ConsumeKinesisTest {

Expand Down Expand Up @@ -64,19 +61,6 @@ void getRelationshipsForRecordProcessingStrategy() {
assertEquals(Set.of(REL_SUCCESS, REL_PARSE_FAILURE), relationships);
}

@Test
void failInitializationWithInvalidValues() {
    // The runner is configured with placeholder credentials, so the KCL Scheduler initialization is expected to fail.

    // The processor is invoked directly, because testRunner would wrap the thrown error.
    final ConsumeKinesis processor = (ConsumeKinesis) testRunner.getProcessor();

    final ProcessException thrown = assertThrows(
            ProcessException.class,
            () -> processor.setup(testRunner.getProcessContext()));

    assertNotNull(thrown.getCause(), "The initialization exception is expected to have a cause");
}

private static TestRunner createTestRunner() {
final TestRunner runner = TestRunners.newTestRunner(ConsumeKinesis.class);

Expand Down
Loading