From 17d5fe560ad82e1bd918490690d712277a3efcf1 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Mon, 17 Nov 2025 17:13:40 +0100 Subject: [PATCH] MINIFICPP-2680 Move Amazon Kinesis tests to modular docker tests --- .../integration/cluster/ContainerStore.py | 18 ----- .../integration/cluster/DockerTestCluster.py | 22 ------- docker/test/integration/cluster/ImageStore.py | 5 -- .../cluster/checkers/AwsChecker.py | 66 ------------------- .../containers/KinesisServerContainer.py | 41 ------------ .../cluster/containers/S3ServerContainer.py | 40 ----------- .../MiNiFi_integration_test_driver.py | 15 ----- .../test/integration/features/steps/steps.py | 40 ----------- .../minifi/processors/DeleteS3Object.py | 42 ------------ .../minifi/processors/FetchS3Object.py | 41 ------------ .../integration/minifi/processors/ListS3.py | 41 ------------ .../minifi/processors/PutKinesisStream.py | 42 ------------ .../minifi/processors/PutS3Object.py | 43 ------------ .../aws/tests}/features/kinesis.feature | 22 ++++--- .../resources/kinesis-mock/Dockerfile | 0 .../kinesis-mock/consumer/consumer.js | 0 .../kinesis-mock/consumer/package.json | 0 .../resources/kinesis-mock/server.json | 0 .../steps/kinesis_server_container.py | 50 ++++++++++++++ .../features/steps/s3_server_container.py | 4 -- extensions/aws/tests/features/steps/steps.py | 15 ++++- 21 files changed, 77 insertions(+), 470 deletions(-) delete mode 100644 docker/test/integration/cluster/checkers/AwsChecker.py delete mode 100644 docker/test/integration/cluster/containers/KinesisServerContainer.py delete mode 100644 docker/test/integration/cluster/containers/S3ServerContainer.py delete mode 100644 docker/test/integration/minifi/processors/DeleteS3Object.py delete mode 100644 docker/test/integration/minifi/processors/FetchS3Object.py delete mode 100644 docker/test/integration/minifi/processors/ListS3.py delete mode 100644 docker/test/integration/minifi/processors/PutKinesisStream.py delete mode 100644 docker/test/integration/minifi/processors/PutS3Object.py rename {docker/test/integration => extensions/aws/tests}/features/kinesis.feature (59%) rename {docker/test/integration => extensions/aws/tests/features}/resources/kinesis-mock/Dockerfile (100%) rename {docker/test/integration => extensions/aws/tests/features}/resources/kinesis-mock/consumer/consumer.js (100%) rename {docker/test/integration => extensions/aws/tests/features}/resources/kinesis-mock/consumer/package.json (100%) rename {docker/test/integration => extensions/aws/tests/features}/resources/kinesis-mock/server.json (100%) create mode 100644 extensions/aws/tests/features/steps/kinesis_server_container.py diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py index 10b093dee4..6d87739bb5 100644 --- a/docker/test/integration/cluster/ContainerStore.py +++ b/docker/test/integration/cluster/ContainerStore.py @@ -18,8 +18,6 @@ from .containers.MinifiContainer import MinifiContainer from .containers.NifiContainer import NifiContainer from .containers.NifiContainer import NiFiOptions -from .containers.KinesisServerContainer import KinesisServerContainer -from .containers.S3ServerContainer import S3ServerContainer from .containers.AzureStorageServerContainer import AzureStorageServerContainer from .containers.HttpProxyContainer import HttpProxyContainer from .containers.PostgreSQLServerContainer import PostgreSQLServerContainer @@ -115,22 +113,6 @@ def acquire_container(self, context, container_name: str, engine='minifi-cpp', c network=self.network, image_store=self.image_store, command=command)) - elif engine == 's3-server': - return self.containers.setdefault(container_name, - S3ServerContainer(feature_context=feature_context, - name=container_name, - vols=self.vols, - network=self.network, - image_store=self.image_store, - command=command)) - elif engine == 'kinesis-server': - return self.containers.setdefault(container_name, - KinesisServerContainer(feature_context=feature_context, - name=container_name, - vols=self.vols, - network=self.network, - image_store=self.image_store, - command=command)) elif engine == 'azure-storage-server': return self.containers.setdefault(container_name, AzureStorageServerContainer(feature_context=feature_context, diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py index 15714f5eae..93801e83e3 100644 --- a/docker/test/integration/cluster/DockerTestCluster.py +++ b/docker/test/integration/cluster/DockerTestCluster.py @@ -25,7 +25,6 @@ from .ContainerStore import ContainerStore from .DockerCommunicator import DockerCommunicator from .MinifiControllerExecutor import MinifiControllerExecutor -from .checkers.AwsChecker import AwsChecker from .checkers.AzureChecker import AzureChecker from .checkers.PostgresChecker import PostgresChecker from .checkers.PrometheusChecker import PrometheusChecker @@ -40,7 +39,6 @@ def __init__(self, context, feature_id): self.vols = {} self.container_communicator = DockerCommunicator() self.container_store = ContainerStore(self.container_communicator.create_docker_network(feature_id), context.image_store, context.kubernetes_proxy, feature_id=feature_id) - self.aws_checker = AwsChecker(self.container_communicator) self.azure_checker = AzureChecker(self.container_communicator) self.postgres_checker = PostgresChecker(self.container_communicator) self.prometheus_checker = PrometheusChecker() @@ -190,26 +188,6 @@ def check_http_proxy_access(self, container_name, url): and output.count("TCP_MISS") >= output.count("TCP_DENIED")) or output.count("TCP_DENIED") == 0 and "TCP_MISS" in output) - def check_kinesis_server_record_data(self, container_name, record_data): - container_name = self.container_store.get_container_name_with_postfix(container_name) - return self.aws_checker.check_kinesis_server_record_data(container_name, record_data) - - def check_s3_server_object_data(self, container_name, test_data): - container_name = self.container_store.get_container_name_with_postfix(container_name) - return self.aws_checker.check_s3_server_object_data(container_name, test_data) - - def check_s3_server_object_hash(self, container_name: str, expected_file_hash: str): - container_name = self.container_store.get_container_name_with_postfix(container_name) - return self.aws_checker.check_s3_server_object_hash(container_name, expected_file_hash) - - def check_s3_server_object_metadata(self, container_name, content_type="application/octet-stream", metadata=dict()): - container_name = self.container_store.get_container_name_with_postfix(container_name) - return self.aws_checker.check_s3_server_object_metadata(container_name, content_type, metadata) - - def is_s3_bucket_empty(self, container_name): - container_name = self.container_store.get_container_name_with_postfix(container_name) - return self.aws_checker.is_s3_bucket_empty(container_name) - def check_azure_storage_server_data(self, container_name, test_data): container_name = self.container_store.get_container_name_with_postfix(container_name) return self.azure_checker.check_azure_storage_server_data(container_name, test_data) diff --git a/docker/test/integration/cluster/ImageStore.py b/docker/test/integration/cluster/ImageStore.py index 360af8a7cc..50025e37ab 100644 --- a/docker/test/integration/cluster/ImageStore.py +++ b/docker/test/integration/cluster/ImageStore.py @@ -65,8 +65,6 @@ def get_image(self, container_engine): image = self.__build_postgresql_server_image() elif container_engine == "mqtt-broker": image = self.__build_mqtt_broker_image() - elif container_engine == "kinesis-server": - image = self.__build_kinesis_image() else: raise Exception("There is no associated image for " + container_engine) @@ -288,9 +286,6 @@ def __build_mqtt_broker_image(self): return self.__build_image(dockerfile) - def __build_kinesis_image(self): - return self.__build_image_by_path(self.test_dir + "/resources/kinesis-mock", 'kinesis-server') - def __build_image(self, dockerfile, context_files=[]): conf_dockerfile_buffer = BytesIO() docker_context_buffer = BytesIO() diff --git a/docker/test/integration/cluster/checkers/AwsChecker.py b/docker/test/integration/cluster/checkers/AwsChecker.py deleted file mode 100644 index c9e4ea0576..0000000000 --- a/docker/test/integration/cluster/checkers/AwsChecker.py +++ /dev/null @@ -1,66 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import json -from utils import retry_check - - -class AwsChecker: - def __init__(self, container_communicator): - self.container_communicator = container_communicator - - @retry_check() - def check_kinesis_server_record_data(self, container_name, record_data): - (code, output) = self.container_communicator.execute_command(container_name, ["node", "/app/consumer/consumer.js", record_data]) - return code == 0 - - @retry_check() - def check_s3_server_object_data(self, container_name, test_data): - (code, output) = self.container_communicator.execute_command(container_name, ["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"]) - if code != 0: - return False - s3_mock_dir = output.strip() - (code, file_data) = self.container_communicator.execute_command(container_name, ["cat", s3_mock_dir + "/binaryData"]) - return code == 0 and file_data == test_data - - @retry_check() - def check_s3_server_object_hash(self, container_name: str, expected_file_hash: str): - (code, output) = self.container_communicator.execute_command(container_name, ["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"]) - if code != 0: - return False - dir_candidates = output.split("\n") - for candidate in dir_candidates: - if "multiparts" not in candidate: - s3_mock_dir = candidate - break - (code, md5_output) = self.container_communicator.execute_command(container_name, ["md5sum", s3_mock_dir + "/binaryData"]) - if code != 0: - return False - file_hash = md5_output.split(' ')[0].strip() - return file_hash == expected_file_hash - - @retry_check() - def check_s3_server_object_metadata(self, container_name, content_type="application/octet-stream", metadata=dict()): - (code, output) = self.container_communicator.execute_command(container_name, ["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"]) - if code != 0: - return False - s3_mock_dir = output.strip() - (code, output) = self.container_communicator.execute_command(container_name, ["cat", s3_mock_dir + "/objectMetadata.json"]) - server_metadata = json.loads(output) - return code == 0 and server_metadata["contentType"] == content_type and metadata == server_metadata["userMetadata"] - - @retry_check() - def is_s3_bucket_empty(self, container_name): - (code, output) = self.container_communicator.execute_command(container_name, ["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"]) - return code == 0 and not output.strip() diff --git a/docker/test/integration/cluster/containers/KinesisServerContainer.py b/docker/test/integration/cluster/containers/KinesisServerContainer.py deleted file mode 100644 index 0bee46cef7..0000000000 --- a/docker/test/integration/cluster/containers/KinesisServerContainer.py +++ /dev/null @@ -1,41 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import logging -from .Container import Container - - -class KinesisServerContainer(Container): - def __init__(self, feature_context, name, vols, network, image_store, command): - super().__init__(feature_context, name, 'kinesis-server', vols, network, image_store, command) - - def get_startup_finished_log_entry(self): - return "Starting Kinesis Plain Mock Service on port 4568" - - def deploy(self): - if not self.set_deployed(): - return - - logging.info('Creating and running kinesis server docker container...') - self.client.containers.run( - self.image_store.get_image(self.get_engine()), - detach=True, - name=self.name, - network=self.network.name, - environment=["INITIALIZE_STREAMS=test_stream:3", - "LOG_LEVEL=DEBUG"], - entrypoint=self.command) - logging.info('Added container \'%s\'', self.name) diff --git a/docker/test/integration/cluster/containers/S3ServerContainer.py b/docker/test/integration/cluster/containers/S3ServerContainer.py deleted file mode 100644 index 1b46957597..0000000000 --- a/docker/test/integration/cluster/containers/S3ServerContainer.py +++ /dev/null @@ -1,40 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import logging -from .Container import Container - - -class S3ServerContainer(Container): - def __init__(self, feature_context, name, vols, network, image_store, command): - super().__init__(feature_context, name, 's3-server', vols, network, image_store, command) - - def get_startup_finished_log_entry(self): - return "Started S3MockApplication" - - def deploy(self): - if not self.set_deployed(): - return - - logging.info('Creating and running s3 server docker container...') - self.client.containers.run( - "adobe/s3mock:3.12.0", - detach=True, - name=self.name, - network=self.network.name, - environment=["initialBuckets=test_bucket"], - entrypoint=self.command) - logging.info('Added container \'%s\'', self.name) diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py index 30c3d7f721..d3dd8dd5c2 100644 --- a/docker/test/integration/features/MiNiFi_integration_test_driver.py +++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py @@ -275,21 +275,6 @@ def __validate(self, validator): assert not self.cluster.segfault_happened() or self.cluster.log_app_output() assert validator.validate() or self.cluster.log_app_output() - def check_kinesis_server_record_data(self, kinesis_container_name, record_data): - assert self.cluster.check_kinesis_server_record_data(kinesis_container_name, record_data) or self.cluster.log_app_output() - - def check_s3_server_object_data(self, s3_container_name, object_data): - assert self.cluster.check_s3_server_object_data(s3_container_name, object_data) or self.cluster.log_app_output() - - def check_s3_server_large_object_data(self, s3_container_name: str): - assert self.cluster.check_s3_server_object_hash(s3_container_name, self.test_file_hash) or self.cluster.log_app_output() - - def check_s3_server_object_metadata(self, s3_container_name, content_type): - assert self.cluster.check_s3_server_object_metadata(s3_container_name, content_type) or self.cluster.log_app_output() - - def check_empty_s3_bucket(self, s3_container_name): - assert self.cluster.is_s3_bucket_empty(s3_container_name) or self.cluster.log_app_output() - def check_http_proxy_access(self, http_proxy_container_name, url): assert self.cluster.check_http_proxy_access(http_proxy_container_name, url) or self.cluster.log_app_output() diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index 1150a569a9..9757441dd7 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -120,11 +120,8 @@ def step_impl(context, processor_type, minifi_container_name): @given("a {processor_type} processor") -@given("a {processor_type} processor set up to communicate with an s3 server") -@given("a {processor_type} processor set up to communicate with the same s3 server") @given("a {processor_type} processor set up to communicate with an Azure blob storage") @given("a {processor_type} processor set up to communicate with an MQTT broker instance") -@given("a {processor_type} processor set up to communicate with the kinesis server") def step_impl(context, processor_type): __create_processor(context, processor_type, processor_type, None, None, "minifi-cpp-flow") @@ -499,18 +496,6 @@ def step_impl(context): context.test.start('mqtt-broker') -# s3 setup -@given("a s3 server is set up in correspondence with the PutS3Object") -@given("a s3 server is set up in correspondence with the DeleteS3Object") -def step_impl(context): - context.test.acquire_container(context=context, name="s3-server", engine="s3-server") - - -@given("a kinesis server is set up in correspondence with the PutKinesisStream") -def step_impl(context): - context.test.acquire_container(context=context, name="kinesis-server", engine="kinesis-server") - - # azure storage setup @given("an Azure storage server is set up") def step_impl(context): @@ -730,31 +715,6 @@ def step_impl(context, url): context.test.check_http_proxy_access('http-proxy', url) -@then("there is a record on the kinesis server with \"{record_data}\"") -def step_impl(context, record_data): - context.test.check_kinesis_server_record_data("kinesis-server", record_data) - - -@then("the object on the s3 server is \"{object_data}\"") -def step_impl(context, object_data): - context.test.check_s3_server_object_data("s3-server", object_data) - - -@then("the object on the s3 server is present and matches the original hash") -def step_impl(context): - context.test.check_s3_server_large_object_data("s3-server") - - -@then("the object content type on the s3 server is \"{content_type}\" and the object metadata matches use metadata") -def step_impl(context, content_type): - context.test.check_s3_server_object_metadata("s3-server", content_type) - - -@then("the object bucket on the s3 server is empty") -def step_impl(context): - context.test.check_empty_s3_bucket("s3-server") - - # Azure @when("test blob \"{blob_name}\" with the content \"{content}\" is created on Azure blob storage") def step_impl(context, blob_name, content): diff --git a/docker/test/integration/minifi/processors/DeleteS3Object.py b/docker/test/integration/minifi/processors/DeleteS3Object.py deleted file mode 100644 index 0f56c02cf6..0000000000 --- a/docker/test/integration/minifi/processors/DeleteS3Object.py +++ /dev/null @@ -1,42 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ..core.Processor import Processor - - -class DeleteS3Object(Processor): - def __init__( - self, - context, - proxy_host='', - proxy_port='', - proxy_username='', - proxy_password=''): - super(DeleteS3Object, self).__init__( - context=context, - clazz='DeleteS3Object', - properties={ - 'Object Key': 'test_object_key', - 'Bucket': 'test_bucket', - 'Access Key': 'test_access_key', - 'Secret Key': 'test_secret', - 'Endpoint Override URL': f"http://s3-server-{context.feature_id}:9090", - 'Proxy Host': proxy_host, - 'Proxy Port': proxy_port, - 'Proxy Username': proxy_username, - 'Proxy Password': proxy_password, - }, - auto_terminate=['success']) diff --git a/docker/test/integration/minifi/processors/FetchS3Object.py b/docker/test/integration/minifi/processors/FetchS3Object.py deleted file mode 100644 index 0c60096da2..0000000000 --- a/docker/test/integration/minifi/processors/FetchS3Object.py +++ /dev/null @@ -1,41 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ..core.Processor import Processor - - -class FetchS3Object(Processor): - def __init__(self, - context, - proxy_host='', - proxy_port='', - proxy_username='', - proxy_password=''): - super(FetchS3Object, self).__init__( - context=context, - clazz='FetchS3Object', - properties={ - 'Object Key': 'test_object_key', - 'Bucket': 'test_bucket', - 'Access Key': 'test_access_key', - 'Secret Key': 'test_secret', - 'Endpoint Override URL': f"http://s3-server-{context.feature_id}:9090", - 'Proxy Host': proxy_host, - 'Proxy Port': proxy_port, - 'Proxy Username': proxy_username, - 'Proxy Password': proxy_password, - }, - auto_terminate=['success', 'failure']) diff --git a/docker/test/integration/minifi/processors/ListS3.py b/docker/test/integration/minifi/processors/ListS3.py deleted file mode 100644 index 9f54064a28..0000000000 --- a/docker/test/integration/minifi/processors/ListS3.py +++ /dev/null @@ -1,41 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ..core.Processor import Processor - - -class ListS3(Processor): - def __init__(self, - context, - proxy_host='', - proxy_port='', - proxy_username='', - proxy_password=''): - super(ListS3, self).__init__( - context=context, - clazz='ListS3', - properties={ - 'Bucket': 'test_bucket', - 'Access Key': 'test_access_key', - 'Secret Key': 'test_secret', - 'Endpoint Override URL': f"http://s3-server-{context.feature_id}:9090", - 'Proxy Host': proxy_host, - 'Proxy Port': proxy_port, - 'Proxy Username': proxy_username, - 'Proxy Password': proxy_password, - }, - schedule={'scheduling period': '2 sec'}, - auto_terminate=['success']) diff --git a/docker/test/integration/minifi/processors/PutKinesisStream.py b/docker/test/integration/minifi/processors/PutKinesisStream.py deleted file mode 100644 index 40e55e1c2d..0000000000 --- a/docker/test/integration/minifi/processors/PutKinesisStream.py +++ /dev/null @@ -1,42 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ..core.Processor import Processor - - -class PutKinesisStream(Processor): - def __init__( - self, - context, - proxy_host='', - proxy_port='', - proxy_username='', - proxy_password=''): - super(PutKinesisStream, self).__init__( - context=context, - clazz='PutKinesisStream', - properties={ - 'Amazon Kinesis Stream Name': 'test_stream', - 'Access Key': 'test_access_key', - 'Secret Key': 'test_secret', - 'Endpoint Override URL': f"http://kinesis-server-{context.feature_id}:4568", - 'Proxy Host': proxy_host, - 'Proxy Port': proxy_port, - 'Proxy Username': proxy_username, - 'Proxy Password': proxy_password, - 'Region': 'us-east-1' - }, - auto_terminate=["success", "failure"]) diff --git a/docker/test/integration/minifi/processors/PutS3Object.py b/docker/test/integration/minifi/processors/PutS3Object.py deleted file mode 100644 index ac33cb43a9..0000000000 --- a/docker/test/integration/minifi/processors/PutS3Object.py +++ /dev/null @@ -1,43 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ..core.Processor import Processor - - -class PutS3Object(Processor): - def __init__( - self, - context, - object_key='test_object_key', - proxy_host='', - proxy_port='', - proxy_username='', - proxy_password=''): - super(PutS3Object, self).__init__( - context=context, - clazz='PutS3Object', - properties={ - 'Object Key': object_key, - 'Bucket': 'test_bucket', - 'Access Key': 'test_access_key', - 'Secret Key': 'test_secret', - 'Endpoint Override URL': f"http://s3-server-{context.feature_id}:9090", - 'Proxy Host': proxy_host, - 'Proxy Port': proxy_port, - 'Proxy Username': proxy_username, - 'Proxy Password': proxy_password - }, - auto_terminate=["success", "failure"]) diff --git a/docker/test/integration/features/kinesis.feature b/extensions/aws/tests/features/kinesis.feature similarity index 59% rename from docker/test/integration/features/kinesis.feature rename to extensions/aws/tests/features/kinesis.feature index 95f5464fbc..6f903cd561 100644 --- a/docker/test/integration/features/kinesis.feature +++ b/extensions/aws/tests/features/kinesis.feature @@ -19,21 +19,27 @@ Feature: Sending data from MiNiFi-C++ to an AWS Kinesis server As a user of MiNiFi I need to have PutKinesisStream processor - Background: - Given the content of "/tmp/output" is monitored - Scenario: A MiNiFi instance can send data to AWS Kinesis - Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + Given a kinesis server is set up in correspondence with the PutKinesisStream + And a GetFile processor with the "Input Directory" property set to "/tmp/input" And a file with the content "Schnappi, das kleine Krokodil" is present in "/tmp/input" - And a PutKinesisStream processor set up to communicate with the kinesis server + And a PutKinesisStream processor + And these processor properties are set + | processor name | property name | property value | + | PutKinesisStream | Amazon Kinesis Stream Name | test_stream | + | PutKinesisStream | Access Key | test_access_key | + | PutKinesisStream | Secret Key | test_secret | + | PutKinesisStream | Endpoint Override URL | http://kinesis-server-${scenario_id}:4568 | + | PutKinesisStream | Region | us-east-1 | + And PutKinesisStream is EVENT_DRIVEN And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And the "success" relationship of the GetFile processor is connected to the PutKinesisStream And the "success" relationship of the PutKinesisStream processor is connected to the PutFile And the "failure" relationship of the PutKinesisStream processor is connected to the PutKinesisStream - - And a kinesis server is set up in correspondence with the PutKinesisStream + And PutFile's success relationship is auto-terminated When both instances start up - Then a flowfile with the content "Schnappi, das kleine Krokodil" is placed in the monitored directory in less than 60 seconds + Then a single file with the content "Schnappi, das kleine Krokodil" is placed in the "/tmp/output" directory in less than 60 seconds And there is a record on the kinesis server with "Schnappi, das kleine Krokodil" diff --git a/docker/test/integration/resources/kinesis-mock/Dockerfile b/extensions/aws/tests/features/resources/kinesis-mock/Dockerfile similarity index 100% rename from docker/test/integration/resources/kinesis-mock/Dockerfile rename to extensions/aws/tests/features/resources/kinesis-mock/Dockerfile diff --git a/docker/test/integration/resources/kinesis-mock/consumer/consumer.js b/extensions/aws/tests/features/resources/kinesis-mock/consumer/consumer.js similarity index 100% rename from docker/test/integration/resources/kinesis-mock/consumer/consumer.js rename to extensions/aws/tests/features/resources/kinesis-mock/consumer/consumer.js diff --git a/docker/test/integration/resources/kinesis-mock/consumer/package.json b/extensions/aws/tests/features/resources/kinesis-mock/consumer/package.json similarity index 100% rename from docker/test/integration/resources/kinesis-mock/consumer/package.json rename to extensions/aws/tests/features/resources/kinesis-mock/consumer/package.json diff --git a/docker/test/integration/resources/kinesis-mock/server.json b/extensions/aws/tests/features/resources/kinesis-mock/server.json similarity index 100% rename from docker/test/integration/resources/kinesis-mock/server.json rename to extensions/aws/tests/features/resources/kinesis-mock/server.json diff --git a/extensions/aws/tests/features/steps/kinesis_server_container.py b/extensions/aws/tests/features/steps/kinesis_server_container.py new file mode 100644 index 0000000000..e92b33c411 --- /dev/null +++ b/extensions/aws/tests/features/steps/kinesis_server_container.py @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from pathlib import Path +from minifi_test_framework.containers.container import Container +from minifi_test_framework.core.helpers import wait_for_condition, retry_check +from minifi_test_framework.core.minifi_test_context import MinifiTestContext +from minifi_test_framework.containers.docker_image_builder import DockerImageBuilder + + +class KinesisServerContainer(Container): + def __init__(self, test_context: MinifiTestContext): + builder = DockerImageBuilder( + image_tag="minifi-kinesis-mock:latest", + build_context_path=str(Path(__file__).resolve().parent.parent / "resources" / "kinesis-mock") + ) + builder.build() + + super().__init__("minifi-kinesis-mock:latest", f"kinesis-server-{test_context.scenario_id}", test_context.network) + self.environment.append("INITIALIZE_STREAMS=test_stream:3") + self.environment.append("LOG_LEVEL=DEBUG") + + def deploy(self): + super().deploy() + finished_str = "Starting Kinesis Plain Mock Service on port 4568" + return wait_for_condition( + condition=lambda: finished_str in self.get_logs(), + timeout_seconds=300, + bail_condition=lambda: self.exited, + context=None) + + @retry_check() + def check_kinesis_server_record_data(self, record_data): + (code, output) = self.exec_run(["node", "/app/consumer/consumer.js", record_data]) + logging.info(f"Kinesis server returned output: '{output}' with code '{code}'") + return code == 0 diff --git a/extensions/aws/tests/features/steps/s3_server_container.py b/extensions/aws/tests/features/steps/s3_server_container.py index bd1985f938..01db200301 100644 --- a/extensions/aws/tests/features/steps/s3_server_container.py +++ b/extensions/aws/tests/features/steps/s3_server_container.py @@ -35,10 +35,6 @@ def deploy(self): bail_condition=lambda: self.exited, context=None) - def check_kinesis_server_record_data(self, container_name, record_data): - (code, output) = self.exec_run(["node", "/app/consumer/consumer.js", record_data]) - return code == 0 - def check_s3_server_object_data(self, test_data): (code, output) = self.exec_run(["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"]) if code != 0: diff --git a/extensions/aws/tests/features/steps/steps.py b/extensions/aws/tests/features/steps/steps.py index 6f8d1d7579..6826855823 100644 --- a/extensions/aws/tests/features/steps/steps.py +++ b/extensions/aws/tests/features/steps/steps.py @@ -18,7 +18,7 @@ import string import humanfriendly -from behave import step +from behave import step, then from minifi_test_framework.containers.directory import Directory from minifi_test_framework.steps import checking_steps # noqa: F401 @@ -27,9 +27,10 @@ from minifi_test_framework.steps import flow_building_steps # noqa: F401 from minifi_test_framework.core.minifi_test_context import MinifiTestContext from minifi_test_framework.minifi.processor import Processor -from minifi_test_framework.core.helpers import wait_for_condition +from minifi_test_framework.core.helpers import wait_for_condition, log_due_to_failure from s3_server_container import S3ServerContainer +from kinesis_server_container import KinesisServerContainer @step('a {processor_name} processor set up to communicate with an s3 server') @@ -100,3 +101,13 @@ def step_impl(context): new_dir.files["input.txt"] = content context.get_or_create_default_minifi_container().dirs.append(new_dir) context.original_hash = computeMD5hash(content) + + +@step("a kinesis server is set up in correspondence with the PutKinesisStream") +def step_impl(context): + context.containers["kinesis-server"] = KinesisServerContainer(context) + + +@then("there is a record on the kinesis server with \"{record_data}\"") +def step_impl(context, record_data): + assert context.containers["kinesis-server"].check_kinesis_server_record_data(record_data) or log_due_to_failure(context)