diff --git a/.github/workflows/smoke-nifi-services.yml b/.github/workflows/smoke-nifi-services.yml index 782a7c8d..39ef3445 100644 --- a/.github/workflows/smoke-nifi-services.yml +++ b/.github/workflows/smoke-nifi-services.yml @@ -33,9 +33,7 @@ jobs: - name: Start NiFi services (build) run: | set -euo pipefail - source deploy/export_env_vars.sh - set -euo pipefail - docker compose -f deploy/services.dev.yml up -d --build nifi nifi-nginx nifi-registry-flow + make -C deploy start-nifi-dev-build - name: Smoke tests run: | @@ -44,7 +42,7 @@ jobs: retries=30 delay=15 for attempt in $(seq 1 $retries); do - if ./scripts/smoke_nifi_services.sh; then + if ./scripts/tests/smoke_nifi_services.sh; then exit 0 fi echo "Attempt ${attempt}/${retries} failed. Sleeping ${delay}s..." diff --git a/deploy/Makefile b/deploy/Makefile index 367f1fe4..9d50c3dc 100644 --- a/deploy/Makefile +++ b/deploy/Makefile @@ -25,17 +25,23 @@ load-env: show-env: ${WITH_ENV} >/dev/null 2>&1; printenv | sort + +fix-nifi-registry-perms: + $(WITH_ENV) SKIP_EXPORT_ENV=1 ../nifi/fix_nifi_registry_perms.sh $(COMPOSE_FILE) # start services start-nifi: - $(WITH_ENV) docker compose -f services.yml $(DC_START_CMD) nifi nifi-nginx nifi-registry-flow + $(WITH_ENV) SKIP_EXPORT_ENV=1 ../nifi/fix_nifi_registry_perms.sh services.yml; \ + docker compose -f services.yml $(DC_START_CMD) nifi nifi-nginx nifi-registry-flow start-nifi-dev: - $(WITH_ENV) docker compose -f services.dev.yml $(DC_START_CMD) nifi nifi-nginx nifi-registry-flow + $(WITH_ENV) SKIP_EXPORT_ENV=1 ../nifi/fix_nifi_registry_perms.sh services.dev.yml; \ + docker compose -f services.dev.yml $(DC_START_CMD) nifi nifi-nginx nifi-registry-flow start-nifi-dev-build: - $(WITH_ENV) docker compose -f services.dev.yml up -d --build nifi nifi-nginx nifi-registry-flow + $(WITH_ENV) SKIP_EXPORT_ENV=1 ../nifi/fix_nifi_registry_perms.sh services.dev.yml; \ + docker compose -f services.dev.yml up -d --build nifi nifi-nginx nifi-registry-flow start-elastic: $(WITH_ENV) docker compose -f services.yml $(DC_START_CMD) elasticsearch-1 elasticsearch-2 kibana @@ -101,7 +107,7 @@ start-data-infra: start-nifi start-elastic start-samples start-all: start-data-infra start-jupyter start-medcat-service start-ocr-services -.PHONY: start-all start-data-infra start-nifi start-nifi-dev start-nifi-dev-build start-elastic start-samples start-jupyter +.PHONY: start-all start-data-infra start-nifi start-nifi-dev start-nifi-dev-build start-elastic start-samples start-jupyter fix-nifi-registry-perms # stop services @@ -112,6 +118,28 @@ stop-nifi: stop-nifi-dev: $(WITH_ENV) docker compose -f services.dev.yml $(DC_STOP_CMD) nifi nifi-nginx nifi-registry-flow +delete-nifi-containers: + $(WITH_ENV) docker compose -f services.yml rm -f -s nifi nifi-nginx nifi-registry-flow + +delete-nifi-dev-containers: + $(WITH_ENV) docker compose -f services.dev.yml rm -f -s nifi nifi-nginx nifi-registry-flow + +delete-nifi-images: + $(WITH_ENV) images="$$(docker compose -f services.yml config --images nifi nifi-nginx nifi-registry-flow | sort -u)"; \ + if [ -n "$$images" ]; then \ + docker image rm -f $$images; \ + else \ + echo "No NiFi images found in services.yml"; \ + fi + +delete-nifi-dev-images: + $(WITH_ENV) images="$$(docker compose -f services.dev.yml config --images nifi nifi-nginx nifi-registry-flow | sort -u)"; \ + if [ -n "$$images" ]; then \ + docker image rm -f $$images; \ + else \ + echo "No NiFi images found in services.dev.yml"; \ + fi + stop-elastic: $(WITH_ENV) docker compose -f services.yml $(DC_STOP_CMD) 
elasticsearch-1 elasticsearch-2 kibana @@ -173,7 +201,7 @@ stop-data-infra: stop-nifi stop-elastic stop-samples stop-all: stop-data-infra stop-jupyter stop-medcat-service stop-ocr-services -.PHONY: stop-data-infra stop-nifi stop-nifi-dev stop-elastic stop-samples stop-jupyter +.PHONY: stop-data-infra stop-nifi stop-nifi-dev delete-nifi-containers delete-nifi-dev-containers delete-nifi-images delete-nifi-dev-images stop-elastic stop-samples stop-jupyter # cleanup diff --git a/deploy/elasticsearch.env b/deploy/elasticsearch.env index 9b3616f5..0c7519e0 100644 --- a/deploy/elasticsearch.env +++ b/deploy/elasticsearch.env @@ -7,7 +7,7 @@ ELASTICSEARCH_VERSION=opensearch # possible values : -# - elasticsearch : docker.elastic.co/elasticsearch/elasticsearch:8.18.2 +# - elasticsearch : docker.elastic.co/elasticsearch/elasticsearch:8.19.11 # - elasticsearch (custom cogstack image) : cogstacksystems/cogstack-elasticsearch:latest # - opensearch : opensearchproject/opensearch:3.4.0 # the custom cogstack image is always based on the last image of ES native @@ -89,6 +89,8 @@ ELASTICSEARCH_SECURITY_DIR=../security/certificates/elastic/ # MEMORY CONFIG ELASTICSEARCH_JAVA_OPTS="-Xms512m -Xmx512m -Des.failure_store_feature_flag_enabled=true" +ES_JAVA_OPTS=$ELASTICSEARCH_JAVA_OPTS +OPENSEARCH_JAVA_OPTS=$ELASTICSEARCH_JAVA_OPTS ELASTICSEARCH_DOCKER_CPU_MIN=1 ELASTICSEARCH_DOCKER_CPU_MAX=1 @@ -163,7 +165,7 @@ KIBANA_VERSION=opensearch-dashboards KIBANA_CONFIG_FILE_VERSION=opensearch_dashboards # possible values: -# - elasticsearch : docker.elastic.co/kibana/kibana:8.18.2 +# - elasticsearch : docker.elastic.co/kibana/kibana:8.19.11 # - elasticsearch (custom cogstack image) : cogstacksystems/cogstack-kibana:latest # - opensearch : opensearchproject/opensearch-dashboards:3.4.0 # the custom cogstack image is always based on the last image of ES native @@ -205,7 +207,7 @@ ELASTICSEARCH_XPACK_SECURITY_REPORTING_ENCRYPTION_KEY="e0Y1gTxHWOopIWMTtpjQsDS6K ######################################################################### METRICBEAT Env vars ########################################################################## -METRICBEAT_IMAGE="docker.elastic.co/beats/metricbeat:8.18.2" +METRICBEAT_IMAGE="docker.elastic.co/beats/metricbeat:8.19.11" METRICBEAT_DOCKER_SHM=512m METRICBEAT_DOCKER_CPU_MIN=1 @@ -222,7 +224,7 @@ FILEBEAT_STARTUP_COMMAND="-e --strict.perms=false" FILEBEAT_HOST="https://elasticsearch-1:9200" -FILEBEAT_IMAGE="docker.elastic.co/beats/filebeat:8.18.2" +FILEBEAT_IMAGE="docker.elastic.co/beats/filebeat:8.19.11" FILEBEAT_DOCKER_SHM=512m diff --git a/deploy/export_env_vars.sh b/deploy/export_env_vars.sh index 2ee8a95c..c68b4f1b 100755 --- a/deploy/export_env_vars.sh +++ b/deploy/export_env_vars.sh @@ -41,22 +41,6 @@ env_files=( "$SERVICES_DIR/cogstack-nlp/medcat-service/env/medcat.env" ) -LINT_SCRIPT="$SCRIPT_DIR/../nifi/user_scripts/utils/lint_env.py" - -if [ -e "$LINT_SCRIPT" ]; then - chmod +x $LINT_SCRIPT -fi - -if [ -x "$LINT_SCRIPT" ]; then - echo "🔍 Validating env files..." - if ! python3 "$LINT_SCRIPT" "${env_files[@]}"; then - echo "❌ Env validation failed. Fix the errors above before continuing." - exit 1 - fi -else - echo "⚠️ Skipping env validation; $LINT_SCRIPT not found or not executable." 
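The workflow and Makefile hunks above replace the hand-rolled "source the env script, then run docker compose" step with Makefile targets, and add a registry-permissions helper plus container/image cleanup targets. A minimal usage sketch from the repository root — the target names and the compose command are taken from this patch, the rest is illustrative:

```bash
# Build and start the dev NiFi stack; the target loads the env files and runs
# the registry permissions fix before `docker compose up --build`.
make -C deploy start-nifi-dev-build

# Fix ownership of the NiFi Registry volumes on its own, e.g. if the registry
# fails to start with permission errors.
make -C deploy fix-nifi-registry-perms

# Tear down the dev NiFi containers and remove their images.
make -C deploy delete-nifi-dev-containers
make -C deploy delete-nifi-dev-images

# Manual equivalent without make (per the deployment docs note in this patch):
source ./deploy/export_env_vars.sh
docker compose -f deploy/services.dev.yml up -d --build nifi nifi-nginx nifi-registry-flow
```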
-fi - for env_file in "${env_files[@]}"; do if [ -f "$env_file" ]; then echo "✅ Sourcing $env_file" diff --git a/deploy/network_settings.env b/deploy/network_settings.env index ee235796..6b19bab5 100644 --- a/deploy/network_settings.env +++ b/deploy/network_settings.env @@ -18,4 +18,4 @@ HTTP_PROXY="" NO_PROXY="" no_proxy="" http_proxy="" -https_proxy="" \ No newline at end of file +https_proxy="" diff --git a/deploy/nifi.env b/deploy/nifi.env index c9d007d7..190f65bf 100644 --- a/deploy/nifi.env +++ b/deploy/nifi.env @@ -41,6 +41,9 @@ NIFI_DATA_PATH="../data/" NIFI_TOOLKIT_VERSION=$NIFI_VERSION +# this is to mount medcat models (optional) +NIFI_MEDCAT_SERVICE_MODEL_PRODUCTION_PATH="../services/cogstack-nlp/medcat-service/models/" + #### Port and network settings NIFI_WEB_PROXY_CONTEXT_PATH="/nifi" diff --git a/deploy/services.dev.yml b/deploy/services.dev.yml index e1970b44..b8797418 100644 --- a/deploy/services.dev.yml +++ b/deploy/services.dev.yml @@ -63,11 +63,11 @@ x-nifi-common: &nifi-common x-nifi-volumes: &nifi-volumes # Drivers - - ../nifi/drivers:/opt/nifi/drivers + - ../nifi/drivers:/opt/nifi/drivers:ro # User overrides bundled in the image - ../nifi/user_scripts:/opt/nifi/user_scripts:rw - - ../nifi/user_schemas:/opt/nifi/user_schemas:rw + - ../nifi/user_schemas:/opt/nifi/user_schemas:ro # Python processors (NiFi 2.x) - ../nifi/user_python_extensions:/opt/nifi/nifi-current/python_extensions:rw @@ -84,9 +84,6 @@ x-nifi-volumes: &nifi-volumes # Ingest data directory - ./${NIFI_DATA_PATH:-../data/}:/data/:rw - # DB schemas - - ../services/cogstack-db/:/opt/cogstack-db/:rw - # MedCAT models - ./${RES_MEDCAT_SERVICE_MODEL_PRODUCTION_PATH:-../services/cogstack-nlp/medcat-service/models/}:/opt/models:rw @@ -127,21 +124,10 @@ services: build: context: .. 
dockerfile: nifi/Dockerfile - args: - HTTP_PROXY: $HTTP_PROXY - HTTPS_PROXY: $HTTPS_PROXY - no_proxy: $no_proxy container_name: cogstack-nifi hostname: nifi shm_size: ${NIFI_DOCKER_SHM_SIZE:-"1g"} environment: - - USER_ID=${NIFI_UID:-1000} - - GROUP_ID=${NIFI_GID:-1000} - - NIFI_WEB_PROXY_HOST=${NIFI_WEB_PROXY_HOST:-"localhost:8443"} - - NIFI_WEB_PROXY_CONTEXT_PATH=${NIFI_WEB_PROXY_CONTEXT_PATH:-"/nifi"} - - NIFI_INTERNAL_PORT=${NIFI_INTERNAL_PORT:-8443} - - NIFI_OUTPUT_PORT=${NIFI_OUTPUT_PORT:-8082} - - NIFI_INPUT_SOCKET_PORT=${NIFI_INPUT_SOCKET_PORT:-10000} - PYTHONPATH=${NIFI_PYTHONPATH:-/opt/nifi/nifi-current/python/framework} - JVM_OPTS="${NIFI_JVM_OPTS:--XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+ParallelRefProcEnabled -Djava.security.egd=file:/dev/./urandom}" deploy: @@ -170,13 +156,7 @@ services: hostname: nifi-registry container_name: cogstack-nifi-registry-flow shm_size: ${NIFI_DOCKER_REGISTRY_SHM_SIZE:-1g} - user: root environment: - - http_proxy=$HTTP_PROXY - - https_proxy=$HTTPS_PROXY - - no_proxy=$no_proxy - - USER_ID=${NIFI_UID:-1000} - - GROUP_ID=${NIFI_GID:-1000} - KEYSTORE_PATH=${NIFI_REGISTRY_KEYSTORE_PATH:-/security/certificates/nifi/nifi-keystore.jks} - KEYSTORE_TYPE=${NIFI_KEYSTORE_TYPE:-jks} - KEYSTORE_PASSWORD=${NIFI_KEYSTORE_PASSWORD:-"cogstackNifi"} @@ -186,9 +166,6 @@ services: - TRUSTSTORE_TYPE=${NIFI_TRUSTSTORE_TYPE:-jks} - INITIAL_ADMIN_IDENTITY=${NIFI_INITIAL_ADMIN_IDENTITY:-"cogstack"} - AUTH=${NIFI_AUTH:-"tls"} - - NIFI_REGISTRY_DB_DIR=${NIFI_REGISTRY_DB_DIR:-/opt/nifi-registry/nifi-registry-current/database} - #- NIFI_REGISTRY_FLOW_PROVIDER=${NIFI_REGISTRY_FLOW_PROVIDER:-file} - - NIFI_REGISTRY_FLOW_STORAGE_DIR=${NIFI_REGISTRY_FLOW_STORAGE_DIR:-/opt/nifi-registry/nifi-registry-current/flow_storage} deploy: resources: limits: @@ -203,11 +180,6 @@ services: ports: - "${NIFI_REGISTRY_FLOW_OUTPUT_PORT:-8083}:${NIFI_REGISTRY_FLOW_INPUT_PORT:-18443}" - entrypoint: bash -c "chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/database && \ - chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/flow_storage && \ - chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/work && \ - chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/logs && \ - bash /opt/nifi-registry/scripts/start.sh" logging: *nifi-logging-common nifi-nginx: diff --git a/deploy/services.yml b/deploy/services.yml index a0901553..d85f2b2b 100644 --- a/deploy/services.yml +++ b/deploy/services.yml @@ -74,11 +74,11 @@ x-nifi-common: &nifi-common x-nifi-volumes: &nifi-volumes # Drivers - - ../nifi/drivers:/opt/nifi/drivers + - ../nifi/drivers:/opt/nifi/drivers:ro # User overrides bundled in the image - ../nifi/user_scripts:/opt/nifi/user_scripts:rw - - ../nifi/user_schemas:/opt/nifi/user_schemas:rw + - ../nifi/user_schemas:/opt/nifi/user_schemas:ro # Python processors (NiFi 2.x) - ../nifi/user_python_extensions:/opt/nifi/nifi-current/python_extensions:rw @@ -95,11 +95,8 @@ x-nifi-volumes: &nifi-volumes # Ingest data directory - ./${NIFI_DATA_PATH:-../data/}:/data/:rw - # DB schemas - - ../services/cogstack-db/:/opt/cogstack-db/:rw - # MedCAT models - - ./${RES_MEDCAT_SERVICE_MODEL_PRODUCTION_PATH:-../services/cogstack-nlp/medcat-service/models/}:/opt/models:rw + - ./${NIFI_MEDCAT_SERVICE_MODEL_PRODUCTION_PATH:-../services/cogstack-nlp/medcat-service/models/}:/opt/models:rw # NiFi repositories/state - nifi-vol-logs:/opt/nifi/nifi-current/logs @@ -122,8 +119,8 @@ x-nifi-registry-volumes: &nifi-registry-volumes # Registry persistence - 
nifi-registry-vol-database:/opt/nifi-registry/nifi-registry-current/database - nifi-registry-vol-flow-storage:/opt/nifi-registry/nifi-registry-current/flow_storage - - nifi-registry-vol-work:/opt/nifi-registry/nifi-registry-current/work - nifi-registry-vol-logs:/opt/nifi-registry/nifi-registry-current/logs + - nifi-registry-vol-work:/opt/nifi-registry/nifi-registry-current/work x-db-common: &db-common <<: *common-ulimits @@ -145,7 +142,6 @@ x-es-common-volumes: &es-common-volumes - ../services/elasticsearch/config/log4j2_${ELASTICSEARCH_VERSION:-opensearch}.properties:/usr/share/${ELASTICSEARCH_VERSION:-opensearch}/config/log4j2.properties:ro # Shared root CA + admin certs - ../security/certificates/elastic/${ELASTICSEARCH_VERSION:-opensearch}/elastic-stack-ca.crt.pem:/usr/share/${ELASTICSEARCH_VERSION:-opensearch}/config/root-ca.crt:ro - - ../security/certificates/elastic/${ELASTICSEARCH_VERSION:-opensearch}/elastic-stack-ca.key.pem:/usr/share/${ELASTICSEARCH_VERSION:-opensearch}/config/root-ca.key:ro # OPENSEARCH specific (always mounted even if unused) - ../security/certificates/elastic/opensearch/admin.crt:/usr/share/${ELASTICSEARCH_VERSION:-opensearch}/config/admin.crt:ro - ../security/certificates/elastic/opensearch/admin.key.pem:/usr/share/${ELASTICSEARCH_VERSION:-opensearch}/config/admin.key.pem:ro @@ -171,9 +167,6 @@ x-es-common: &es-common networks: - cognet extra_hosts: *common-hosts - environment: - ES_JAVA_OPTS: ${ELASTICSEARCH_JAVA_OPTS:--Xms2048m -Xmx2048m -Des.failure_store_feature_flag_enabled=true} - OPENSEARCH_JAVA_OPTS: ${ELASTICSEARCH_JAVA_OPTS:--Xms2048m -Xmx2048m -Des.failure_store_feature_flag_enabled=true} logging: *es-logging-common deploy: resources: @@ -204,7 +197,6 @@ x-metricbeat-common: &metricbeat-common volumes: - ../services/metricbeat/metricbeat.yml:/usr/share/metricbeat/metricbeat.yml:ro - ../security/certificates/elastic/elasticsearch/elastic-stack-ca.crt.pem:/usr/share/metricbeat/root-ca.crt:ro - - ../security/certificates/elastic/elasticsearch/elastic-stack-ca.key.pem:/usr/share/metricbeat/root-ca.key:ro networks: - cognet extra_hosts: *common-hosts @@ -219,11 +211,6 @@ x-filebeat-common: &filebeat-common env_file: - ./elasticsearch.env - ../security/env/users_elasticsearch.env - environment: - - ELASTICSEARCH_HOSTS=${ELASTICSEARCH_HOSTS:-["https://elasticsearch-1:9200","https://elasticsearch-2:9200"]} - - FILEBEAT_USER=${FILEBEAT_USER:-elastic} - - FILEBEAT_PASSWORD=${FILEBEAT_PASSWORD:-kibanaserver} - - KIBANA_HOST=${KIBANA_HOST:-"https://kibana:5601"} deploy: resources: limits: @@ -233,9 +220,8 @@ x-filebeat-common: &filebeat-common cpus: "${FILEBEAT_DOCKER_CPU_MIN}" memory: "${FILEBEAT_DOCKER_RAM}" volumes: - - ../services/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:rw + - ../services/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro - ../security/certificates/elastic/elasticsearch/elastic-stack-ca.crt.pem:/etc/pki/root/root-ca.crt:ro - - ../security/certificates/elastic/elasticsearch/elastic-stack-ca.key.pem:/etc/pki/root/root-ca.key:ro networks: - cognet extra_hosts: *common-hosts @@ -251,7 +237,7 @@ services: #---------------------------------------------------------------------------# samples-db: <<: *db-common - image: postgres:17.7-alpine + image: postgres:18.1-trixie container_name: cogstack-samples-db platform: linux/amd64 environment: @@ -264,7 +250,7 @@ services: - ../services/pgsamples/schemas:/data/schemas:rw - ../services/pgsamples/init_db.sh:/docker-entrypoint-initdb.d/init_db.sh:ro # data persistence - - 
samples-vol:/var/lib/postgresql/data + - samples-vol:/var/lib/postgresql command: postgres -c "max_connections=${POSTGRES_DB_MAX_CONNECTIONS:-100}" ports: - 5554:5432 @@ -278,7 +264,7 @@ services: #---------------------------------------------------------------------------# cogstack-databank-db: <<: *db-common - image: postgres:17.7-alpine + image: postgres:18.1-trixie container_name: cogstack-production-databank-db platform: linux/amd64 environment: @@ -290,7 +276,7 @@ services: - ../services/cogstack-db/pgsql/schemas:/data/:ro - ../services/cogstack-db/pgsql/init_db.sh:/docker-entrypoint-initdb.d/init_db.sh:ro # data persistence - - databank-vol:/var/lib/postgresql/data + - databank-vol:/var/lib/postgresql command: postgres -c "max_connections=${POSTGRES_DB_MAX_CONNECTIONS:-100}" ports: - 5558:5432 @@ -298,15 +284,13 @@ services: - 5432 networks: - cognet - + cogstack-databank-db-mssql: <<: *db-common image: mcr.microsoft.com/mssql/server:2019-latest container_name: cogstack-production-databank-db-mssql environment: - ACCEPT_EULA=y - - MSSQL_SA_USER=${MSSQL_SA_USER:-sa} - - MSSQL_SA_PASSWORD=${MSSQL_SA_PASSWORD:-admin!COGSTACK2022} volumes: # mapping postgres data dump and initialization - ../services/cogstack-db/mssql/schemas:/data/:ro @@ -481,7 +465,6 @@ services: # Security certificates, general - ../security/certificates/elastic/${ELASTICSEARCH_VERSION:-opensearch}/elastic-stack-ca.crt.pem:/usr/share/${KIBANA_VERSION:-opensearch-dashboards}/config/root-ca.crt:ro - - ../security/certificates/elastic/${ELASTICSEARCH_VERSION:-opensearch}/elastic-stack-ca.key.pem:/usr/share/${KIBANA_VERSION:-opensearch-dashboards}/config/root-ca.key:ro - ../security/certificates/elastic/${ELASTICSEARCH_VERSION:-opensearch}/elastic-stack-ca.p12:/usr/share/${KIBANA_VERSION:-opensearch-dashboards}/config/root-ca.p12:ro - ../security/certificates/elastic/${ELASTICSEARCH_VERSION:-opensearch}/elasticsearch/${ES_INSTANCE_NAME_1:-elasticsearch-1}/${ES_INSTANCE_NAME_1:-elasticsearch-1}.crt:/usr/share/${KIBANA_VERSION:-opensearch-dashboards}/config/esnode1.crt:ro @@ -509,13 +492,6 @@ services: hostname: nifi shm_size: ${NIFI_DOCKER_SHM_SIZE:-"1g"} environment: - - USER_ID=${NIFI_UID:-1000} - - GROUP_ID=${NIFI_GID:-1000} - - NIFI_WEB_PROXY_HOST=${NIFI_WEB_PROXY_HOST:-"localhost:8443"} - - NIFI_WEB_PROXY_CONTEXT_PATH=${NIFI_WEB_PROXY_CONTEXT_PATH:-"/nifi"} - - NIFI_INTERNAL_PORT=${NIFI_INTERNAL_PORT:-8443} - - NIFI_OUTPUT_PORT=${NIFI_OUTPUT_PORT:-8082} - - NIFI_INPUT_SOCKET_PORT=${NIFI_INPUT_SOCKET_PORT:-10000} - PYTHONPATH=${NIFI_PYTHONPATH:-/opt/nifi/nifi-current/python/framework} - JVM_OPTS="${NIFI_JVM_OPTS:--XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+ParallelRefProcEnabled -Djava.security.egd=file:/dev/./urandom}" deploy: @@ -544,25 +520,15 @@ services: hostname: nifi-registry container_name: cogstack-nifi-registry-flow shm_size: ${NIFI_DOCKER_REGISTRY_SHM_SIZE:-1g} - user: root environment: - - http_proxy=$HTTP_PROXY - - https_proxy=$HTTPS_PROXY - - no_proxy=$no_proxy - - USER_ID=${NIFI_UID:-1000} - - GROUP_ID=${NIFI_GID:-1000} - KEYSTORE_PATH=${NIFI_REGISTRY_KEYSTORE_PATH:-/security/certificates/nifi/nifi-keystore.jks} - KEYSTORE_TYPE=${NIFI_KEYSTORE_TYPE:-jks} - KEYSTORE_PASSWORD=${NIFI_KEYSTORE_PASSWORD:-"cogstackNifi"} - TRUSTSTORE_PASSWORD=${NIFI_TRUSTSTORE_PASSWORD:-"cogstackNifi"} - TRUSTSTORE_PATH=${NIFI_REGISTRY_TRUSTSTORE_PATH:-/security/certificates/nifi/nifi-truststore.jks} - - TRUSTSTORE_TYPE=${NIFI_TRUSTSTORE_TYPE:-jks} - INITIAL_ADMIN_IDENTITY=${NIFI_INITIAL_ADMIN_IDENTITY:-"cogstack"} - 
AUTH=${NIFI_AUTH:-"tls"} - - NIFI_REGISTRY_DB_DIR=${NIFI_REGISTRY_DB_DIR:-/opt/nifi-registry/nifi-registry-current/database} - #- NIFI_REGISTRY_FLOW_PROVIDER=${NIFI_REGISTRY_FLOW_PROVIDER:-file} - - NIFI_REGISTRY_FLOW_STORAGE_DIR=${NIFI_REGISTRY_FLOW_STORAGE_DIR:-/opt/nifi-registry/nifi-registry-current/flow_storage} deploy: resources: limits: @@ -577,11 +543,6 @@ services: ports: - "${NIFI_REGISTRY_FLOW_OUTPUT_PORT:-8083}:${NIFI_REGISTRY_FLOW_INPUT_PORT:-18443}" - entrypoint: bash -c "chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/database && \ - chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/flow_storage && \ - chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/work && \ - chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/logs && \ - bash /opt/nifi-registry/scripts/start.sh" logging: *nifi-logging-common nifi-nginx: @@ -638,10 +599,6 @@ services: image: gitea/gitea:1.23-rootless shm_size: ${GITEA_DOCKER_SHM_SIZE:-"1g"} restart: always - environment: - - http_proxy=$HTTP_PROXY - - https_proxy=$HTTPS_PROXY - - no_proxy=$no_proxy deploy: resources: limits: @@ -728,10 +685,10 @@ volumes: driver: local nifi-registry-vol-flow-storage: driver: local - nifi-registry-vol-work: - driver: local nifi-registry-vol-logs: driver: local + nifi-registry-vol-work: + driver: local # Gitea gitea-lib-vol: diff --git a/docs/deploy/deployment.md b/docs/deploy/deployment.md index 1a0bd744..65b5c534 100644 --- a/docs/deploy/deployment.md +++ b/docs/deploy/deployment.md @@ -17,6 +17,14 @@ Make sure you have read the [Prerequisites](./main.md) section before proceeding These variables configure NiFi, Elasticsearch/OpenSearch, Kibana, Jupyter, Metricbeat, the sample DB, etc. +> **Important:** If you run `docker compose` directly (instead of `make`), first load the envs with: +> +> ```bash +> source ./deploy/export_env_vars.sh +> ``` +> +> The Makefile targets already do this for you. + ## 🧩 Modular service design (important) This repository follows a **modular deployment model**: diff --git a/docs/nifi/main.md b/docs/nifi/main.md index 82d899b8..9c0b8b90 100644 --- a/docs/nifi/main.md +++ b/docs/nifi/main.md @@ -140,6 +140,16 @@ You should check if the env vars have been set after running the script: echo $NIFI_GID ``` +### NiFi Registry permissions helper + +If NiFi Registry fails to start due to permission issues on its persistent volumes, run the helper script once to fix ownership: + + ```bash + ./nifi/fix_nifi_registry_perms.sh + ``` + +This script runs the registry container as root only long enough to `chown` the registry `database`, `flow_storage`, `work`, and `logs` directories, then exits. Subsequent starts can run as the default non-root user. + If the above command prints some numbers then it means that the `export_env_vars.sh` script worked. Otherwise, if you don't see anything, or just blank lines, then you need to execute the following: ```bash @@ -172,7 +182,7 @@ Then execute the `recreate_nifi_docker_image.sh` script located in the `./nifi` bash recreate_nifi_docker_image.sh ``` -Remember that the above export script and/or command are only visible in the current shell, so every time you restart your shell terminal you must execute the `./deploy/export_env_vars.sh` so that the variables will be visible by docker at runtime, because it uses the GID/UID in the `services.yml` file , specifying in the service definition `user: "${USER_ID:-${NIFI_UID:-1000}}:${GROUP_ID:-${NIFI_GID:-1000}}"`. 
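The `docs/nifi/main.md` hunk above documents the permissions helper only in its argument-free form. Judging by `nifi/fix_nifi_registry_perms.sh` as added later in this patch, the script also accepts an optional compose file and honours `SKIP_EXPORT_ENV`; a hedged sketch of those variants (behaviour inferred from the script itself, not from the docs):

```bash
# Default: resolve deploy/services.yml from the repo root, source
# deploy/export_env_vars.sh, then run a one-off root container that chowns the
# registry database, flow_storage, work and logs directories.
./nifi/fix_nifi_registry_perms.sh

# Target the dev compose file instead (bare services*.yml names resolve under deploy/).
./nifi/fix_nifi_registry_perms.sh services.dev.yml

# Skip re-sourcing the env files when they are already exported in the current
# shell, as the Makefile targets do.
SKIP_EXPORT_ENV=1 ./nifi/fix_nifi_registry_perms.sh services.yml
```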
+Remember that the above export script and/or command are only visible in the current shell, so every time you restart your shell terminal you must `source ./deploy/export_env_vars.sh` so the variables are visible to Docker at runtime. If you're using the `deploy/Makefile` targets, it handles this for you. ### `{bootstrap.conf}` diff --git a/nifi/fix_nifi_registry_perms.sh b/nifi/fix_nifi_registry_perms.sh new file mode 100755 index 00000000..7e7e8c27 --- /dev/null +++ b/nifi/fix_nifi_registry_perms.sh @@ -0,0 +1,32 @@ +#!/usr/bin/env bash + +set -euo pipefail + +# Support being run from any directory. +SCRIPT_SOURCE="${BASH_SOURCE[0]-$0}" +SCRIPT_DIR="$(cd "$(dirname "$SCRIPT_SOURCE")" && pwd)" +REPO_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" + +COMPOSE_FILE="${1:-services.yml}" +if [[ "$COMPOSE_FILE" != /* ]]; then + if [[ "$COMPOSE_FILE" == services*.yml ]]; then + COMPOSE_FILE="deploy/$COMPOSE_FILE" + fi + COMPOSE_PATH="$REPO_ROOT/$COMPOSE_FILE" +else + COMPOSE_PATH="$COMPOSE_FILE" +fi + +if [ ! -f "$COMPOSE_PATH" ]; then + echo "Compose file not found: $COMPOSE_PATH" >&2 + exit 1 +fi + +if [ "${SKIP_EXPORT_ENV:-}" != "1" ]; then + set -a + source "$REPO_ROOT/deploy/export_env_vars.sh" + set +a +fi + +docker compose -f "$COMPOSE_PATH" run --rm --no-deps --user root --entrypoint bash -T nifi-registry-flow \ + -c 'chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/{database,flow_storage,work,logs}' diff --git a/nifi/user_python_extensions/convert_avro_binary_field_to_base64.py b/nifi/user_python_extensions/convert_avro_binary_field_to_base64.py index 17829a55..fadee638 100644 --- a/nifi/user_python_extensions/convert_avro_binary_field_to_base64.py +++ b/nifi/user_python_extensions/convert_avro_binary_field_to_base64.py @@ -2,7 +2,6 @@ import copy import io import json -import traceback from avro.datafile import DataFileReader, DataFileWriter from avro.io import DatumReader, DatumWriter @@ -14,7 +13,6 @@ StandardValidators, ) from nifiapi.relationship import Relationship -from overrides import overrides from py4j.java_gateway import JavaObject, JVMView from nifi.user_scripts.utils.nifi.base_nifi_processor import BaseNiFiProcessor @@ -83,10 +81,9 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties self.relationships: list[Relationship] = self._relationships - @overrides - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: """ - Transforms an Avro flow file by converting a specified binary field to a base64-encoded string. + Processes an Avro flow file by converting a specified binary field to a base64-encoded string. Args: context (ProcessContext): The process context containing processor properties. @@ -100,69 +97,64 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr FlowFileTransformResult: The result containing the transformed flow file, updated attributes, and relationship. 
""" - try: - self.process_context = context - self.set_properties(context.getProperties()) - - # read avro record - input_raw_bytes: bytes = flowFile.getContentsAsBytes() - input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) - reader: DataFileReader = DataFileReader(input_byte_buffer, DatumReader()) - - schema: Schema | None = reader.datum_reader.writers_schema - - # change the datatype of the binary field from bytes to string - # (avoids headaches later on when converting avro to json) - # because if we dont change the schema the native NiFi converter will convert bytes to an array of integers. - output_schema = None - if schema is not None and isinstance(schema, RecordSchema): - schema_dict = copy.deepcopy(schema.to_json()) - for field in schema_dict["fields"]: # type: ignore - self.logger.info(str(field)) - if field["name"] == self.binary_field_name: - field["type"] = ["null", "string"] - break - output_schema = parse(json.dumps(schema_dict)) - - # Write them to a binary avro stream - output_byte_buffer = io.BytesIO() - writer = DataFileWriter(output_byte_buffer, DatumWriter(), output_schema) - - for record in reader: - if type(record) is dict: - record_document_binary_data = record.get(str(self.binary_field_name), None) - - if record_document_binary_data is not None: - if self.operation_mode == "base64": - record_document_binary_data = base64.b64encode(record_document_binary_data).decode() - else: - self.logger.info("No binary data found in record, using empty content") + # read avro record + input_raw_bytes: bytes = flowFile.getContentsAsBytes() + input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) + reader: DataFileReader = DataFileReader(input_byte_buffer, DatumReader()) + + schema: Schema | None = reader.datum_reader.writers_schema + + # change the datatype of the binary field from bytes to string + # (avoids headaches later on when converting avro to json) + # because if we dont change the schema the native NiFi converter will convert bytes to an array of integers. 
+ output_schema = None + if schema is not None and isinstance(schema, RecordSchema): + schema_dict = copy.deepcopy(schema.to_json()) + for field in schema_dict["fields"]: # type: ignore + self.logger.info(str(field)) + if field["name"] == self.binary_field_name: + field["type"] = ["null", "string"] + break + output_schema = parse(json.dumps(schema_dict)) + + # Write them to a binary avro stream + output_byte_buffer = io.BytesIO() + writer = DataFileWriter(output_byte_buffer, DatumWriter(), output_schema) + + for record in reader: + if type(record) is dict: + record_document_binary_data = record.get(str(self.binary_field_name), None) + + if record_document_binary_data is not None: + if self.operation_mode == "base64": + record_document_binary_data = base64.b64encode(record_document_binary_data).decode() else: - raise TypeError("Expected Avro record to be a dictionary, but got: " + str(type(record))) - - _tmp_record = {} - _tmp_record[str(self.binary_field_name)] = record_document_binary_data - - for k, v in record.items(): - if k != str(self.binary_field_name): - _tmp_record[k] = v - - writer.append(_tmp_record) - - input_byte_buffer.close() - reader.close() - writer.flush() - output_byte_buffer.seek(0) - - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} - attributes["document_id_field_name"] = str(self.document_id_field_name) - attributes["binary_field"] = str(self.binary_field_name) - attributes["operation_mode"] = str(self.operation_mode) - attributes["mime.type"] = "application/avro-binary" - - return FlowFileTransformResult(relationship="success", - attributes=attributes, - contents=output_byte_buffer.getvalue()) - except Exception as exception: - self.logger.error("Exception during Avro processing: " + traceback.format_exc()) - raise exception \ No newline at end of file + self.logger.info("No binary data found in record, using empty content") + else: + raise TypeError("Expected Avro record to be a dictionary, but got: " + str(type(record))) + + _tmp_record = {} + _tmp_record[str(self.binary_field_name)] = record_document_binary_data + + for k, v in record.items(): + if k != str(self.binary_field_name): + _tmp_record[k] = v + + writer.append(_tmp_record) + + input_byte_buffer.close() + reader.close() + writer.flush() + output_byte_buffer.seek(0) + + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} + attributes["document_id_field_name"] = str(self.document_id_field_name) + attributes["binary_field"] = str(self.binary_field_name) + attributes["operation_mode"] = str(self.operation_mode) + attributes["mime.type"] = "application/avro-binary" + + return FlowFileTransformResult( + relationship="success", + attributes=attributes, + contents=output_byte_buffer.getvalue(), + ) diff --git a/nifi/user_python_extensions/convert_json_record_schema.py b/nifi/user_python_extensions/convert_json_record_schema.py index ac29e842..ed313d8f 100644 --- a/nifi/user_python_extensions/convert_json_record_schema.py +++ b/nifi/user_python_extensions/convert_json_record_schema.py @@ -1,12 +1,10 @@ import json -import traceback from collections import defaultdict from typing import Any from nifiapi.flowfiletransform import FlowFileTransformResult from nifiapi.properties import ProcessContext, PropertyDescriptor, StandardValidators from nifiapi.relationship import Relationship -from overrides import overrides from py4j.java_gateway import JavaObject, JVMView from nifi.user_scripts.utils.nifi.base_nifi_processor import BaseNiFiProcessor @@ -322,35 +320,29 @@ 
def map_record(self, record: dict, json_mapper_schema: dict) -> dict: return new_record - @overrides - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: output_contents: list[dict[Any, Any]] = [] - try: - self.process_context: ProcessContext = context - self.set_properties(context.getProperties()) + # read avro record + input_raw_bytes: bytes = flowFile.getContentsAsBytes() + records: dict | list[dict] = json.loads(input_raw_bytes.decode("utf-8")) - # read avro record - input_raw_bytes: bytes = flowFile.getContentsAsBytes() - records: dict | list[dict] = json.loads(input_raw_bytes.decode("utf-8")) + if isinstance(records, dict): + records = [records] - if isinstance(records, dict): - records = [records] + json_mapper_schema: dict = {} + with open(self.json_mapper_schema_path) as file: + json_mapper_schema = json.load(file) - json_mapper_schema: dict = {} - with open(self.json_mapper_schema_path) as file: - json_mapper_schema = json.load(file) + for record in records: + output_contents.append(self.map_record(record, json_mapper_schema)) - for record in records: - output_contents.append(self.map_record(record, json_mapper_schema)) + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} + attributes["json_mapper_schema_path"] = str(self.json_mapper_schema_path) + attributes["mime.type"] = "application/json" - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} - attributes["json_mapper_schema_path"] = str(self.json_mapper_schema_path) - attributes["mime.type"] = "application/json" - - return FlowFileTransformResult(relationship="success", - attributes=attributes, - contents=json.dumps(output_contents).encode('utf-8')) - except Exception as exception: - self.logger.error("Exception during flowfile processing: " + traceback.format_exc()) - raise exception \ No newline at end of file + return FlowFileTransformResult( + relationship="success", + attributes=attributes, + contents=json.dumps(output_contents).encode("utf-8"), + ) diff --git a/nifi/user_python_extensions/convert_json_to_attribute.py b/nifi/user_python_extensions/convert_json_to_attribute.py index 231663be..43a11bf7 100644 --- a/nifi/user_python_extensions/convert_json_to_attribute.py +++ b/nifi/user_python_extensions/convert_json_to_attribute.py @@ -1,10 +1,8 @@ import json import re -import traceback from nifiapi.flowfiletransform import FlowFileTransformResult from nifiapi.properties import ProcessContext, PropertyDescriptor, StandardValidators -from overrides import overrides from py4j.java_gateway import JavaObject, JVMView from nifi.user_scripts.utils.nifi.base_nifi_processor import BaseNiFiProcessor @@ -44,46 +42,39 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties - @overrides - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: DIGITS = re.compile(r"^\d+$") + + # read avro record + input_raw_bytes: bytes = flowFile.getContentsAsBytes() + text = (input_raw_bytes.decode("utf-8", errors="replace").strip() if input_raw_bytes else "[]") + try: - self.process_context = context - self.set_properties(context.getProperties()) - - # read avro record - input_raw_bytes: bytes = flowFile.getContentsAsBytes() - text = (input_raw_bytes.decode("utf-8", 
errors="replace").strip() if input_raw_bytes else "[]") - - try: - parsed = json.loads(text) if text else [] - except Exception: - parsed = [] - - records = parsed if isinstance(parsed, list) else parsed.get("records", []) - if not isinstance(records, list): - records = [] - - ids = [] - for r in records: - if not isinstance(r, dict): - continue - v = r.get(self.field_name) - if v is None: - continue - s = str(v).strip() - if DIGITS.match(s): - ids.append(s) - - ids_csv = ",".join(ids) - return FlowFileTransformResult( - relationship="success", - attributes={ - "ids_csv": ids_csv, - "ids_count": str(len(ids)), - "ids_len": str(len(ids_csv)), - }, - ) - except Exception as exception: - self.logger.error("Exception during Avro processing: " + traceback.format_exc()) - raise exception \ No newline at end of file + parsed = json.loads(text) if text else [] + except Exception: + parsed = [] + + records = parsed if isinstance(parsed, list) else parsed.get("records", []) + if not isinstance(records, list): + records = [] + + ids = [] + for r in records: + if not isinstance(r, dict): + continue + v = r.get(self.field_name) + if v is None: + continue + s = str(v).strip() + if DIGITS.match(s): + ids.append(s) + + ids_csv = ",".join(ids) + return FlowFileTransformResult( + relationship="success", + attributes={ + "ids_csv": ids_csv, + "ids_count": str(len(ids)), + "ids_len": str(len(ids_csv)), + }, + ) diff --git a/nifi/user_python_extensions/convert_record_parquet_to_json.py b/nifi/user_python_extensions/convert_record_parquet_to_json.py index cde065ae..bb2f49d8 100644 --- a/nifi/user_python_extensions/convert_record_parquet_to_json.py +++ b/nifi/user_python_extensions/convert_record_parquet_to_json.py @@ -1,10 +1,8 @@ import io import json -import traceback from nifiapi.flowfiletransform import FlowFileTransformResult from nifiapi.properties import ProcessContext, PropertyDescriptor -from overrides import overrides from py4j.java_gateway import JavaObject, JVMView from pyarrow import parquet @@ -30,49 +28,43 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties - @overrides - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: """ """ - try: - self.process_context = context - self.set_properties(context.getProperties()) + # read avro record + input_raw_bytes: bytes = flowFile.getContentsAsBytes() + input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) - # read avro record - input_raw_bytes: bytes = flowFile.getContentsAsBytes() - input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) + parquet_file = parquet.ParquetFile(input_byte_buffer) - parquet_file = parquet.ParquetFile(input_byte_buffer) + output_buffer: io.BytesIO = io.BytesIO() + record_count: int = 0 - output_buffer: io.BytesIO = io.BytesIO() - record_count: int = 0 + for batch in parquet_file.iter_batches(batch_size=10000): + records: list[dict] = batch.to_pylist() - for batch in parquet_file.iter_batches(batch_size=10000): - records: list[dict] = batch.to_pylist() + for record in records: + json_record = json.dumps( + record, + ensure_ascii=False, + separators=(",", ":"), + default=parquet_json_data_type_convert, + ) - for record in records: - json_record = json.dumps( - record, - ensure_ascii=False, - separators=(",", ":"), - default=parquet_json_data_type_convert, - ) + output_buffer.write(json_record.encode("utf-8")) + 
output_buffer.write(b"\n") + record_count += len(records) - output_buffer.write(json_record.encode("utf-8")) - output_buffer.write(b"\n") - record_count += len(records) + input_byte_buffer.close() - input_byte_buffer.close() + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} + attributes["mime.type"] = "application/x-ndjson" + attributes["record.count"] = str(record_count) - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} - attributes["mime.type"] = "application/x-ndjson" - attributes["record.count"] = str(record_count) - - return FlowFileTransformResult(relationship="success", - attributes=attributes, - contents=output_buffer.getvalue()) - except Exception as exception: - self.logger.error("Exception during Avro processing: " + traceback.format_exc()) - raise exception \ No newline at end of file + return FlowFileTransformResult( + relationship="success", + attributes=attributes, + contents=output_buffer.getvalue(), + ) diff --git a/nifi/user_python_extensions/parse_service_response.py b/nifi/user_python_extensions/parse_service_response.py index e4eddf8c..d8ea0a77 100644 --- a/nifi/user_python_extensions/parse_service_response.py +++ b/nifi/user_python_extensions/parse_service_response.py @@ -1,5 +1,4 @@ import json -import traceback from nifiapi.flowfiletransform import FlowFileTransformResult from nifiapi.properties import ( @@ -8,7 +7,6 @@ StandardValidators, ) from nifiapi.relationship import Relationship -from overrides import overrides from py4j.java_gateway import JavaObject, JVMView from nifi.user_scripts.utils.nifi.base_nifi_processor import BaseNiFiProcessor @@ -95,10 +93,9 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties self.relationships: list[Relationship] = self._relationships - @overrides - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: """ - Transforms the input FlowFile by parsing the service response and extracting relevant fields. + Processes the input FlowFile by parsing the service response and extracting relevant fields. Args: context (ProcessContext): The process context containing processor properties. 
@@ -113,87 +110,83 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr output_contents: list = [] - try: - self.process_context: ProcessContext = context - self.set_properties(context.getProperties()) + # read avro record + input_raw_bytes: bytes = flowFile.getContentsAsBytes() - # read avro record - input_raw_bytes: bytes = flowFile.getContentsAsBytes() + records: dict | list[dict] = json.loads(input_raw_bytes.decode("utf-8")) - records: dict | list[dict] = json.loads(input_raw_bytes.decode("utf-8")) + if isinstance(records, dict): + records = [records] - if isinstance(records, dict): - records = [records] + if self.service_message_type == "ocr": + for record in records: + result = record.get("result", {}) - if self.service_message_type == "ocr": - for record in records: - result = record.get("result", {}) + _record = {} + _record["metadata"] = result.get("metadata", {}) + _record[self.output_text_field_name] = result.get("text", "") + _record["success"] = result.get("success", False) + _record["timestamp"] = result.get("timestamp", None) - _record = {} - _record["metadata"] = result.get("metadata", {}) - _record[self.output_text_field_name] = result.get("text", "") - _record["success"] = result.get("success", False) - _record["timestamp"] = result.get("timestamp", None) + if "footer" in result: + for k, v in result["footer"].items(): + _record[k] = v - if "footer" in result: - for k, v in result["footer"].items(): - _record[k] = v + output_contents.append(_record) - output_contents.append(_record) + elif self.service_message_type == "medcat" and "result" in records[0]: + result = records[0].get("result", []) + medcat_info = records[0].get("medcat_info", {}) - elif self.service_message_type == "medcat" and "result" in records[0]: - result = records[0].get("result", []) - medcat_info = records[0].get("medcat_info", {}) + if isinstance(result, dict): + result = [result] - if isinstance(result, dict): - result = [result] + for annotated_record in result: + annotations = annotated_record.get("annotations", []) + annotations = annotations[0] if len(annotations) > 0 else annotations + footer = annotated_record.get("footer", {}) - for annotated_record in result: - annotations = annotated_record.get("annotations", []) - annotations = annotations[0] if len(annotations) > 0 else annotations - footer = annotated_record.get("footer", {}) + if self.medcat_output_mode == "deid": + _output_annotated_record = {} + _output_annotated_record["service_model"] = medcat_info + _output_annotated_record["timestamp"] = annotated_record.get("timestamp", None) + _output_annotated_record[self.output_text_field_name] = annotated_record.get("text", "") - if self.medcat_output_mode == "deid": - _output_annotated_record = {} - _output_annotated_record["service_model"] = medcat_info - _output_annotated_record["timestamp"] = annotated_record.get("timestamp", None) - _output_annotated_record[self.output_text_field_name] = annotated_record.get("text", "") + if self.medcat_deid_keep_annotations is True: + _output_annotated_record["annotations"] = annotations + else: + _output_annotated_record["annotations"] = {} - if self.medcat_deid_keep_annotations is True: - _output_annotated_record["annotations"] = annotations - else: - _output_annotated_record["annotations"] = {} + for k, v in footer.items(): + _output_annotated_record[k] = v + output_contents.append(_output_annotated_record) - for k, v in footer.items(): - _output_annotated_record[k] = v - output_contents.append(_output_annotated_record) 
+ else: + for annotation_id, annotation_data in annotations.items(): + _output_annotated_record = {} + _output_annotated_record["service_model"] = medcat_info + _output_annotated_record["timestamp"] = annotated_record.get("timestamp", None) - else: - for annotation_id, annotation_data in annotations.items(): - _output_annotated_record = {} - _output_annotated_record["service_model"] = medcat_info - _output_annotated_record["timestamp"] = annotated_record.get("timestamp", None) + for k, v in annotation_data.items(): + _output_annotated_record[k] = v - for k, v in annotation_data.items(): - _output_annotated_record[k] = v + for k, v in footer.items(): + _output_annotated_record[k] = v - for k, v in footer.items(): - _output_annotated_record[k] = v + if self.document_id_field_name in footer: + _output_annotated_record["annotation_id"] = ( + str(footer[self.document_id_field_name]) + "_" + str(annotation_id) + ) - if self.document_id_field_name in footer: - _output_annotated_record["annotation_id"] = \ - str(footer[self.document_id_field_name]) + "_" + str(annotation_id) + output_contents.append(_output_annotated_record) - output_contents.append(_output_annotated_record) + # add properties to flowfile attributes + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} + attributes["output_text_field_name"] = str(self.output_text_field_name) + attributes["mime.type"] = "application/json" - # add properties to flowfile attributes - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} - attributes["output_text_field_name"] = str(self.output_text_field_name) - attributes["mime.type"] = "application/json" - - return FlowFileTransformResult(relationship="success", - attributes=attributes, - contents=json.dumps(output_contents).encode('utf-8')) - except Exception as exception: - self.logger.error("Exception during flowfile processing: " + traceback.format_exc()) - raise exception \ No newline at end of file + return FlowFileTransformResult( + relationship="success", + attributes=attributes, + contents=json.dumps(output_contents).encode("utf-8"), + ) diff --git a/nifi/user_python_extensions/prepare_record_for_nlp.py b/nifi/user_python_extensions/prepare_record_for_nlp.py index 443de6b2..40f7f70a 100644 --- a/nifi/user_python_extensions/prepare_record_for_nlp.py +++ b/nifi/user_python_extensions/prepare_record_for_nlp.py @@ -1,6 +1,5 @@ import io import json -import traceback from typing import Any from avro.datafile import DataFileReader @@ -11,7 +10,6 @@ PropertyDescriptor, StandardValidators, ) -from overrides import overrides from py4j.java_gateway import JavaObject, JVMView from nifi.user_scripts.utils.nifi.base_nifi_processor import BaseNiFiProcessor @@ -55,8 +53,7 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties - @overrides - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: """_summary_ Args: @@ -73,49 +70,46 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr output_contents: list = [] - try: - self.process_context = context - self.set_properties(context.getProperties()) + self.process_flow_file_type = str(self.process_flow_file_type).lower() - self.process_flow_file_type = str(self.process_flow_file_type).lower() + # read avro record + input_raw_bytes: bytes = flowFile.getContentsAsBytes() + input_byte_buffer: io.BytesIO = 
io.BytesIO(input_raw_bytes) - # read avro record - input_raw_bytes: bytes = flowFile.getContentsAsBytes() - input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) + reader: DataFileReader | (list[dict[str, Any]] | list[Any]) - reader: DataFileReader | (list[dict[str, Any]] | list[Any]) + if self.process_flow_file_type == "avro": + reader = DataFileReader(input_byte_buffer, DatumReader()) + else: + json_obj = json.loads(input_byte_buffer.read().decode("utf-8")) + reader = [json_obj] if isinstance(json_obj, dict) else json_obj if isinstance(json_obj, list) else [] - if self.process_flow_file_type == "avro": - reader = DataFileReader(input_byte_buffer, DatumReader()) + for record in reader: + if type(record) is dict: + record_document_text = record.get(str(self.document_text_field_name), "") else: - json_obj = json.loads(input_byte_buffer.read().decode("utf-8")) - reader = [json_obj] if isinstance(json_obj, dict) else json_obj if isinstance(json_obj, list) else [] + raise TypeError("Expected record to be a dictionary, but got: " + str(type(record))) - for record in reader: - if type(record) is dict: - record_document_text = record.get(str(self.document_text_field_name), "") - else: - raise TypeError("Expected record to be a dictionary, but got: " + str(type(record))) - - output_contents.append({ + output_contents.append( + { "text": record_document_text, - "footer": {k: v for k, v in record.items() if k != str(self.document_text_field_name)} - }) + "footer": {k: v for k, v in record.items() if k != str(self.document_text_field_name)}, + } + ) - input_byte_buffer.close() + input_byte_buffer.close() - if isinstance(reader, DataFileReader): - reader.close() + if isinstance(reader, DataFileReader): + reader.close() - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} - attributes["document_id_field_name"] = str(self.document_id_field_name) - attributes["mime.type"] = "application/json" + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} + attributes["document_id_field_name"] = str(self.document_id_field_name) + attributes["mime.type"] = "application/json" - output_contents = output_contents[0] if len(output_contents) == 1 else output_contents + output_contents = output_contents[0] if len(output_contents) == 1 else output_contents - return FlowFileTransformResult(relationship="success", - attributes=attributes, - contents=json.dumps({"content": output_contents}).encode("utf-8")) - except Exception as exception: - self.logger.error("Exception during flowfile processing: " + traceback.format_exc()) - raise exception \ No newline at end of file + return FlowFileTransformResult( + relationship="success", + attributes=attributes, + contents=json.dumps({"content": output_contents}).encode("utf-8"), + ) diff --git a/nifi/user_python_extensions/prepare_record_for_ocr.py b/nifi/user_python_extensions/prepare_record_for_ocr.py index e6c41e5c..709c185d 100644 --- a/nifi/user_python_extensions/prepare_record_for_ocr.py +++ b/nifi/user_python_extensions/prepare_record_for_ocr.py @@ -1,7 +1,6 @@ import base64 import io import json -import traceback from typing import Any from avro.datafile import DataFileReader @@ -13,7 +12,6 @@ StandardValidators, ) from nifiapi.relationship import Relationship -from overrides import overrides from py4j.java_gateway import JavaObject, JVMView from nifi.user_scripts.utils.nifi.base_nifi_processor import BaseNiFiProcessor @@ -78,68 +76,65 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = 
self._properties self.relationships: list[Relationship] = self._relationships - @overrides - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: output_contents: list = [] - try: - self.process_context = context - self.set_properties(context.getProperties()) - - self.process_flow_file_type = str(self.process_flow_file_type).lower() - - # read avro record - input_raw_bytes: bytes = flowFile.getContentsAsBytes() - input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) - - reader: DataFileReader | (list[dict[str, Any]] | list[Any]) - - if self.process_flow_file_type == "avro": - reader = DataFileReader(input_byte_buffer, DatumReader()) - elif self.process_flow_file_type == "ndjson": - json_lines = input_byte_buffer.read().decode("utf-8").splitlines() - reader = [json.loads(line) for line in json_lines if line.strip()] - else: - json_obj = json.loads(input_byte_buffer.read().decode("utf-8")) - reader = [json_obj] if isinstance(json_obj, dict) else json_obj if isinstance(json_obj, list) else [] - - for record in reader: - if type(record) is dict: - record_document_binary_data = record.get(str(self.binary_field_name), None) - if record_document_binary_data is not None: - if self.operation_mode == "base64": - record_document_binary_data = base64.b64encode(record_document_binary_data).decode() - else: - self.logger.info("No binary data found in record, using empty content") + self.process_flow_file_type = str(self.process_flow_file_type).lower() + + # read avro record + input_raw_bytes: bytes = flowFile.getContentsAsBytes() + input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) + + reader: DataFileReader | (list[dict[str, Any]] | list[Any]) + + if self.process_flow_file_type == "avro": + reader = DataFileReader(input_byte_buffer, DatumReader()) + elif self.process_flow_file_type == "ndjson": + json_lines = input_byte_buffer.read().decode("utf-8").splitlines() + reader = [json.loads(line) for line in json_lines if line.strip()] + else: + json_obj = json.loads(input_byte_buffer.read().decode("utf-8")) + reader = [json_obj] if isinstance(json_obj, dict) else json_obj if isinstance(json_obj, list) else [] + + for record in reader: + if type(record) is dict: + record_document_binary_data = record.get(str(self.binary_field_name), None) + if record_document_binary_data is not None: + if self.operation_mode == "base64": + record_document_binary_data = base64.b64encode(record_document_binary_data).decode() else: - raise TypeError("Expected record to be a dictionary, but got: " + str(type(record))) + self.logger.info("No binary data found in record, using empty content") + else: + raise TypeError("Expected record to be a dictionary, but got: " + str(type(record))) - output_contents.append({ + output_contents.append( + { "binary_data": record_document_binary_data, - "footer": {k: v for k, v in record.items() if k != str(self.binary_field_name)} - }) + "footer": {k: v for k, v in record.items() if k != str(self.binary_field_name)}, + } + ) - input_byte_buffer.close() + input_byte_buffer.close() - if isinstance(reader, DataFileReader): - reader.close() + if isinstance(reader, DataFileReader): + reader.close() - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} - attributes["document_id_field_name"] = str(self.document_id_field_name) - attributes["binary_field"] = str(self.binary_field_name) - attributes["output_text_field_name"] = 
str(self.output_text_field_name) - attributes["mime.type"] = "application/json" + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} + attributes["document_id_field_name"] = str(self.document_id_field_name) + attributes["binary_field"] = str(self.binary_field_name) + attributes["output_text_field_name"] = str(self.output_text_field_name) + attributes["mime.type"] = "application/json" - if self.process_flow_file_type == "avro": - return FlowFileTransformResult(relationship="success", - attributes=attributes, - contents=json.dumps(output_contents, cls=AvroJSONEncoder).encode("utf-8") - ) - else: - return FlowFileTransformResult(relationship="success", - attributes=attributes, - contents=json.dumps(output_contents).encode("utf-8")) - except Exception as exception: - self.logger.error("Exception during flowfile processing: " + traceback.format_exc()) - raise exception + if self.process_flow_file_type == "avro": + return FlowFileTransformResult( + relationship="success", + attributes=attributes, + contents=json.dumps(output_contents, cls=AvroJSONEncoder).encode("utf-8"), + ) + + return FlowFileTransformResult( + relationship="success", + attributes=attributes, + contents=json.dumps(output_contents).encode("utf-8"), + ) diff --git a/nifi/user_python_extensions/record_add_geolocation.py b/nifi/user_python_extensions/record_add_geolocation.py index 2a027937..dce48bbe 100644 --- a/nifi/user_python_extensions/record_add_geolocation.py +++ b/nifi/user_python_extensions/record_add_geolocation.py @@ -11,7 +11,6 @@ PropertyDescriptor, StandardValidators, ) -from overrides import overrides from py4j.java_gateway import JavaObject, JVMView from nifi.user_scripts.utils.generic import download_file_from_url, safe_delete_paths @@ -88,7 +87,6 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties - @overrides def onScheduled(self, context: ProcessContext) -> None: """ Initializes processor resources when scheduled. Args: @@ -155,9 +153,8 @@ def _check_geolocation_lookup_datafile(self) -> bool: return file_found - @overrides - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: - """ Transforms the input FlowFile by adding geolocation data based on postcode lookup. + def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + """ Processes the input FlowFile by adding geolocation data based on postcode lookup. Args: context (ProcessContext): The process context. flowFile (JavaObject): The input FlowFile to be transformed. @@ -171,54 +168,45 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr Use SplitRecord processor to split large files into smaller chunks before processing. 
""" - try: - self.process_context: ProcessContext = context - self.set_properties(context.getProperties()) - - input_raw_bytes: bytes = flowFile.getContentsAsBytes() - - records: dict | list[dict] = json.loads(input_raw_bytes.decode("utf-8")) - - valid_records: list[dict] = [] - error_records: list[dict] = [] - - if isinstance(records, dict): - records = [records] - - if self.postcode_lookup_index: - for record in records: - if self.postcode_field_name in record: - _postcode = str(record[self.postcode_field_name]).replace(" ", "") - _data_col_row_idx = self.postcode_lookup_index.get(_postcode, -1) - - if _data_col_row_idx != -1: - _selected_row = self.loaded_csv_file_rows[_data_col_row_idx] - _lat, _long = str(_selected_row[7]).strip(), str(_selected_row[8]).strip() - try: - record[self.geolocation_field_name] = { - "lat": float(_lat), - "lon": float(_long) - } - except ValueError: - self.logger.debug(f"invalid lat/long values for postcode {_postcode}: {_lat}, {_long}") - error_records.append(record) - valid_records.append(record) - else: - raise FileNotFoundError("geolocation lookup datafile is not available and data was not loaded, " \ - "please check URLs") - - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} - attributes["mime.type"] = "application/json" - - if error_records: - attributes["record.count.errors"] = str(len(error_records)) - attributes["record.count"] = str(len(valid_records)) - - return FlowFileTransformResult( - relationship="success", - attributes=attributes, - contents=json.dumps(valid_records).encode("utf-8"), + input_raw_bytes: bytes = flowFile.getContentsAsBytes() + + records: dict | list[dict] = json.loads(input_raw_bytes.decode("utf-8")) + + valid_records: list[dict] = [] + error_records: list[dict] = [] + + if isinstance(records, dict): + records = [records] + + if self.postcode_lookup_index: + for record in records: + if self.postcode_field_name in record: + _postcode = str(record[self.postcode_field_name]).replace(" ", "") + _data_col_row_idx = self.postcode_lookup_index.get(_postcode, -1) + + if _data_col_row_idx != -1: + _selected_row = self.loaded_csv_file_rows[_data_col_row_idx] + _lat, _long = str(_selected_row[7]).strip(), str(_selected_row[8]).strip() + try: + record[self.geolocation_field_name] = {"lat": float(_lat), "lon": float(_long)} + except ValueError: + self.logger.debug(f"invalid lat/long values for postcode {_postcode}: {_lat}, {_long}") + error_records.append(record) + valid_records.append(record) + else: + raise FileNotFoundError( + "geolocation lookup datafile is not available and data was not loaded, please check URLs" ) - except Exception as exception: - self.logger.error("Exception during flowfile processing:\n" + traceback.format_exc()) - return self.build_failure_result(flowFile, exception) \ No newline at end of file + + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} + attributes["mime.type"] = "application/json" + + if error_records: + attributes["record.count.errors"] = str(len(error_records)) + attributes["record.count"] = str(len(valid_records)) + + return FlowFileTransformResult( + relationship="success", + attributes=attributes, + contents=json.dumps(valid_records).encode("utf-8"), + ) diff --git a/nifi/user_python_extensions/record_decompress_cerner_blob.py b/nifi/user_python_extensions/record_decompress_cerner_blob.py index 718a8a14..a26917a2 100644 --- a/nifi/user_python_extensions/record_decompress_cerner_blob.py +++ 
b/nifi/user_python_extensions/record_decompress_cerner_blob.py @@ -1,6 +1,5 @@ import base64 import json -import traceback from nifiapi.flowfiletransform import FlowFileTransformResult from nifiapi.properties import ( @@ -8,7 +7,6 @@ PropertyDescriptor, StandardValidators, ) -from overrides import overrides from py4j.java_gateway import JavaObject, JVMView from nifi.user_scripts.utils.codecs.cerner_blob import DecompressLzwCernerBlob @@ -95,8 +93,24 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties - @overrides - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + def _load_json_records(self, input_raw_bytes: bytes | bytearray) -> list | dict: + try: + return json.loads(input_raw_bytes.decode()) + except json.JSONDecodeError as exc: + self.logger.error(f"Error decoding JSON: {exc} \nAttempting to decode as {self.input_charset}") + try: + return json.loads(input_raw_bytes.decode(self.input_charset)) + except json.JSONDecodeError as exc2: + self.logger.error(f"Error decoding JSON: {exc2} \nAttempting to decode as windows-1252") + try: + return json.loads(input_raw_bytes.decode("windows-1252")) + except json.JSONDecodeError as exc3: + raise ValueError( + "Error decoding JSON after trying utf-8, " + f"{self.input_charset}, and windows-1252: {exc3}" + ) from exc3 + + def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: """ Transforms the input FlowFile by decompressing Cerner blob data from JSON records. @@ -114,198 +128,117 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr output_contents: list = [] attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} - try: - self.process_context = context - self.set_properties(context.getProperties()) + # read avro record + input_raw_bytes: bytes | bytearray = flowFile.getContentsAsBytes() - # read avro record - input_raw_bytes: bytes | bytearray = flowFile.getContentsAsBytes() + records: list | dict = self._load_json_records(input_raw_bytes) - records: list | dict = [] + if not isinstance(records, list): + records = [records] - try: - records = json.loads(input_raw_bytes.decode()) - except json.JSONDecodeError as e: - self.logger.error(f"Error decoding JSON: {str(e)} \nAttempting to decode as {self.input_charset}") - try: - records = json.loads(input_raw_bytes.decode(self.input_charset)) - except json.JSONDecodeError as e: - self.logger.error(f"Error decoding JSON: {str(e)} \nAttempting to decode as windows-1252") - try: - records = json.loads(input_raw_bytes.decode("windows-1252")) - except json.JSONDecodeError as e: - return self.build_failure_result( - flowFile, - ValueError(f"Error decoding JSON: {str(e)} \n with windows-1252"), - attributes=attributes, - contents=input_raw_bytes, - ) - - if not isinstance(records, list): - records = [records] - - if not records: - return self.build_failure_result( - flowFile, - ValueError("No records found in JSON input"), - attributes=attributes, - contents=input_raw_bytes, - ) + if not records: + raise ValueError("No records found in JSON input") - # sanity check: blobs are from the same document_id - doc_ids: set = {str(r.get(self.document_id_field_name, "")) for r in records} - if len(doc_ids) > 1: - return self.build_failure_result( - flowFile, - ValueError(f"Multiple document IDs in one FlowFile: {list(doc_ids)}"), - attributes=attributes, - contents=input_raw_bytes, - ) + # sanity check: blobs are from the same document_id + 
doc_ids: set = {str(r.get(self.document_id_field_name, "")) for r in records} + if len(doc_ids) > 1: + raise ValueError(f"Multiple document IDs in one FlowFile: {list(doc_ids)}") + + concatenated_blob_sequence_order: dict = {} + output_merged_record: dict = {} + + have_any_sequence: bool = any(self.blob_sequence_order_field_name in record for record in records) + have_any_no_sequence: bool = any(self.blob_sequence_order_field_name not in record for record in records) - concatenated_blob_sequence_order: dict = {} - output_merged_record: dict = {} - - have_any_sequence: bool = any(self.blob_sequence_order_field_name in record for record in records) - have_any_no_sequence: bool = any(self.blob_sequence_order_field_name not in record for record in records) - - if have_any_sequence and have_any_no_sequence: - return self.build_failure_result( - flowFile, - ValueError( - f"Mixed records: some have '{self.blob_sequence_order_field_name}', some don't. " - "Cannot safely reconstruct blob stream." - ), - attributes=attributes, - contents=input_raw_bytes, + if have_any_sequence and have_any_no_sequence: + raise ValueError( + f"Mixed records: some have '{self.blob_sequence_order_field_name}', some don't. " + "Cannot safely reconstruct blob stream." + ) + + for record in records: + if self.binary_field_name not in record or record[self.binary_field_name] in (None, ""): + raise ValueError(f"Missing '{self.binary_field_name}' in a record") + + if have_any_sequence: + seq = int(record[self.blob_sequence_order_field_name]) + if seq in concatenated_blob_sequence_order: + raise ValueError(f"Duplicate {self.blob_sequence_order_field_name}: {seq}") + + concatenated_blob_sequence_order[seq] = record[self.binary_field_name] + else: + # no sequence anywhere: preserve record order (0..n-1) + seq = len(concatenated_blob_sequence_order) + concatenated_blob_sequence_order[seq] = record[self.binary_field_name] + + # take fields from the first record, doesn't matter which one, + # as they are expected to be the same except for the binary data field + for k, v in records[0].items(): + if k not in output_merged_record and k != self.binary_field_name: + output_merged_record[k] = v + + full_compressed_blob = bytearray() + + # double check to make sure there is no gap in the blob sequence, i.e missing blob. 
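As a minimal sketch of the reassembly rule enforced here and in the gap check that follows (field names follow the defaults of the companion script further down; the payloads are plain base64 strings, not real Cerner LZW data), the ordering, gap check and concatenation reduce to:

    import base64

    records = [
        {"id": "doc-1", "blob_sequence_num": 1, "binarydoc": base64.b64encode(b"WORLD").decode()},
        {"id": "doc-1", "blob_sequence_num": 0, "binarydoc": base64.b64encode(b"HELLO ").decode()},
    ]

    # index each base64 part by its sequence number, exactly one part per number
    parts = {int(r["blob_sequence_num"]): r["binarydoc"] for r in records}
    keys = sorted(parts)
    # a gap (e.g. parts 0 and 2 without 1) means a blob fragment is missing
    assert keys == list(range(keys[0], keys[0] + len(keys))), "sequence gap"

    blob = bytearray()
    for k in keys:
        blob.extend(base64.b64decode(parts[k], validate=True))

    print(bytes(blob))  # b'HELLO WORLD'; the real pipeline hands the concatenated bytes to DecompressLzwCernerBlob

The check over order_of_blobs_keys below enforces the same invariant on the actual records.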
+ order_of_blobs_keys = sorted(concatenated_blob_sequence_order.keys()) + for i in range(1, len(order_of_blobs_keys)): + if order_of_blobs_keys[i] != order_of_blobs_keys[i-1] + 1: + raise ValueError( + f"Sequence gap: missing {order_of_blobs_keys[i-1] + 1} " + f"(have {order_of_blobs_keys[i-1]} then {order_of_blobs_keys[i]})" ) - for record in records: - if self.binary_field_name not in record or record[self.binary_field_name] in (None, ""): - return self.build_failure_result( - flowFile, - ValueError(f"Missing '{self.binary_field_name}' in a record"), - attributes=attributes, - contents=input_raw_bytes, - ) + for k in order_of_blobs_keys: + v = concatenated_blob_sequence_order[k] - if have_any_sequence: - seq = int(record[self.blob_sequence_order_field_name]) - if seq in concatenated_blob_sequence_order: - return self.build_failure_result( - flowFile, - ValueError(f"Duplicate {self.blob_sequence_order_field_name}: {seq}"), - attributes=attributes, - contents=input_raw_bytes, - ) - - concatenated_blob_sequence_order[seq] = record[self.binary_field_name] - else: - # no sequence anywhere: preserve record order (0..n-1) - seq = len(concatenated_blob_sequence_order) - concatenated_blob_sequence_order[seq] = record[self.binary_field_name] - - # take fields from the first record, doesn't matter which one, - # as they are expected to be the same except for the binary data field - for k, v in records[0].items(): - if k not in output_merged_record and k != self.binary_field_name: - output_merged_record[k] = v - - full_compressed_blob = bytearray() - - # double check to make sure there is no gap in the blob sequence, i.e missing blob. - order_of_blobs_keys = sorted(concatenated_blob_sequence_order.keys()) - for i in range(1, len(order_of_blobs_keys)): - if order_of_blobs_keys[i] != order_of_blobs_keys[i-1] + 1: - return self.build_failure_result( - flowFile, - ValueError( - f"Sequence gap: missing {order_of_blobs_keys[i-1] + 1} " - f"(have {order_of_blobs_keys[i-1]} then {order_of_blobs_keys[i]})" - ), - attributes=attributes, - contents=input_raw_bytes, - ) + temporary_blob: bytes = b"" - for k in order_of_blobs_keys: - v = concatenated_blob_sequence_order[k] - - temporary_blob: bytes = b"" - - if self.binary_field_source_encoding == "base64": - if not isinstance(v, str): - return self.build_failure_result( - flowFile, - ValueError( - f"Expected base64 string in {self.binary_field_name} for part {k}, got {type(v)}" - ), - attributes=attributes, - contents=input_raw_bytes, - ) - try: - temporary_blob = base64.b64decode(v, validate=True) - except Exception as e: - return self.build_failure_result( - flowFile, - ValueError(f"Error decoding base64 blob part {k}: {e}"), - attributes=attributes, - contents=input_raw_bytes, - ) + if self.binary_field_source_encoding == "base64": + if not isinstance(v, str): + raise ValueError( + f"Expected base64 string in {self.binary_field_name} for part {k}, got {type(v)}" + ) + try: + temporary_blob = base64.b64decode(v, validate=True) + except Exception as exc: + raise ValueError(f"Error decoding base64 blob part {k}: {exc}") from exc + else: + # raw bytes path + if isinstance(v, (bytes, bytearray)): + temporary_blob = v else: - # raw bytes path - if isinstance(v, (bytes, bytearray)): - temporary_blob = v - else: - return self.build_failure_result( - flowFile, - ValueError( - f"Expected bytes in {self.binary_field_name} for part {k}, got {type(v)}" - ), - attributes=attributes, - contents=input_raw_bytes, - ) - - full_compressed_blob.extend(temporary_blob) - - # 
build / add new attributes to dict before doing anything else to have some trace. - attributes["document_id_field_name"] = str(self.document_id_field_name) - attributes["document_id"] = str(output_merged_record.get(self.document_id_field_name, "")) - attributes["binary_field"] = str(self.binary_field_name) - attributes["output_text_field_name"] = str(self.output_text_field_name) - attributes["mime.type"] = "application/json" - attributes["blob_parts"] = str(len(order_of_blobs_keys)) - attributes["blob_seq_min"] = str(order_of_blobs_keys[0]) if order_of_blobs_keys else "" - attributes["blob_seq_max"] = str(order_of_blobs_keys[-1]) if order_of_blobs_keys else "" - attributes["compressed_len"] = str(len(full_compressed_blob)) - attributes["compressed_head_hex"] = bytes(full_compressed_blob[:16]).hex() + raise ValueError( + f"Expected bytes in {self.binary_field_name} for part {k}, got {type(v)}" + ) - try: - decompress_blob = DecompressLzwCernerBlob() - decompress_blob.decompress(full_compressed_blob) - output_merged_record[self.binary_field_name] = bytes(decompress_blob.output_stream) - except Exception as exception: - return self.build_failure_result( - flowFile, - exception=exception, - attributes=attributes, - include_flowfile_attributes=False, - contents=input_raw_bytes - ) + full_compressed_blob.extend(temporary_blob) - if self.output_mode == "base64": - output_merged_record[self.binary_field_name] = \ - base64.b64encode(output_merged_record[self.binary_field_name]).decode(self.output_charset) + # build / add new attributes to dict before doing anything else to have some trace. + attributes["document_id_field_name"] = str(self.document_id_field_name) + attributes["document_id"] = str(output_merged_record.get(self.document_id_field_name, "")) + attributes["binary_field"] = str(self.binary_field_name) + attributes["output_text_field_name"] = str(self.output_text_field_name) + attributes["mime.type"] = "application/json" + attributes["blob_parts"] = str(len(order_of_blobs_keys)) + attributes["blob_seq_min"] = str(order_of_blobs_keys[0]) if order_of_blobs_keys else "" + attributes["blob_seq_max"] = str(order_of_blobs_keys[-1]) if order_of_blobs_keys else "" + attributes["compressed_len"] = str(len(full_compressed_blob)) + attributes["compressed_head_hex"] = bytes(full_compressed_blob[:16]).hex() - output_contents.append(output_merged_record) + try: + decompress_blob = DecompressLzwCernerBlob() + decompress_blob.decompress(full_compressed_blob) + output_merged_record[self.binary_field_name] = bytes(decompress_blob.output_stream) - return FlowFileTransformResult(relationship=self.REL_SUCCESS, - attributes=attributes, - contents=json.dumps(output_contents).encode("utf-8")) except Exception as exception: - self.logger.error("Exception during flowfile processing: " + traceback.format_exc()) - return self.build_failure_result( - flowFile, - exception, - attributes=attributes, - contents=locals().get("input_raw_bytes", flowFile.getContentsAsBytes()), - include_flowfile_attributes=False - ) + raise RuntimeError("Error decompressing Cerner LZW blob") from exception + + if self.output_mode == "base64": + output_merged_record[self.binary_field_name] = \ + base64.b64encode(output_merged_record[self.binary_field_name]).decode(self.output_charset) + + output_contents.append(output_merged_record) + + return FlowFileTransformResult(relationship=self.REL_SUCCESS, + attributes=attributes, + contents=json.dumps(output_contents).encode("utf-8")) diff --git a/nifi/user_python_extensions/sample_processor.py 
b/nifi/user_python_extensions/sample_processor.py
index a92a0543..73320e63 100644
--- a/nifi/user_python_extensions/sample_processor.py
+++ b/nifi/user_python_extensions/sample_processor.py
@@ -1,6 +1,5 @@
 import io
 import json
-import traceback
 from typing import Any
 
 from avro.datafile import DataFileReader, DataFileWriter
@@ -92,7 +91,7 @@ def onScheduled(self, context: ProcessContext) -> None:
         """
         pass
 
-    def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
+    def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
         """
         NOTE: This is a sample method meant to be overridden and
         reimplemented by subclasses.
@@ -115,48 +114,42 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr
 
         output_contents: list[Any] = []
 
-        try:
-            self.process_context: ProcessContext = context
-            self.set_properties(context.getProperties())
-            # add properties to flowfile attributes
-            attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
-            self.logger.info("Successfully transformed Avro content for OCR")
-
-            input_raw_bytes: bytes = flowFile.getContentsAsBytes()
-
-            # read avro record
-            self.logger.debug("Reading flowfile content as bytes")
-            input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes)
-            reader: DataFileReader = DataFileReader(input_byte_buffer, DatumReader())
-
-            # below is an example of how to handle avro records, each record
-            schema: Schema | None = reader.datum_reader.writers_schema
-
-            for record in reader:
-                #do stuff
-                pass
-
-            # streams need to be closed
-            input_byte_buffer.close()
-            reader.close()
-
-            # Write them to a binary avro stre
-            output_byte_buffer = io.BytesIO()
-            writer = DataFileWriter(output_byte_buffer, DatumWriter(), schema)
-
-            writer.flush()
-            writer.close()
-            output_byte_buffer.seek(0)
-
-            # add properties to flowfile attributes
-            attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
-            attributes["sample_property_one"] = str(self.sample_property_one)
-            attributes["sample_property_two"] = str(self.sample_property_two)
-            attributes["sample_property_three"] = str(self.sample_property_three)
-
-            return FlowFileTransformResult(relationship="success",
-                                           attributes=attributes,
-                                           contents=json.dumps(output_contents))
-        except Exception as exception:
-            self.logger.error("Exception during Avro processing: " + traceback.format_exc())
-            raise exception
\ No newline at end of file
+        # add properties to flowfile attributes
+        attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
+        self.logger.info("Processing Avro content in sample processor")
+
+        input_raw_bytes: bytes = flowFile.getContentsAsBytes()
+
+        # read avro record
+        self.logger.debug("Reading flowfile content as bytes")
+        input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes)
+        reader: DataFileReader = DataFileReader(input_byte_buffer, DatumReader())
+
+        # below is an example of how to handle avro records, record by record
+        schema: Schema | None = reader.datum_reader.writers_schema
+
+        for record in reader:
+            # do stuff with each record here
+            pass
+
+        # streams need to be closed
+        input_byte_buffer.close()
+        reader.close()
+
+        # Write them to a binary avro stream
+        output_byte_buffer = io.BytesIO()
+        writer = DataFileWriter(output_byte_buffer, DatumWriter(), schema)
+
+        writer.flush()
+        writer.close()
+        output_byte_buffer.seek(0)
+
+        # add properties to flowfile attributes
+        attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
+        attributes["sample_property_one"]
= str(self.sample_property_one) + attributes["sample_property_two"] = str(self.sample_property_two) + attributes["sample_property_three"] = str(self.sample_property_three) + + return FlowFileTransformResult(relationship="success", + attributes=attributes, + contents=json.dumps(output_contents)) diff --git a/nifi/user_scripts/processors/record_decompress_cerner_blob.py b/nifi/user_scripts/processors/record_decompress_cerner_blob.py new file mode 100644 index 00000000..1b93487d --- /dev/null +++ b/nifi/user_scripts/processors/record_decompress_cerner_blob.py @@ -0,0 +1,160 @@ +import base64 +import json +import os +import sys + +try: + from nifi.user_scripts.utils.codecs.cerner_blob import DecompressLzwCernerBlob +except ModuleNotFoundError: + # Fallback for direct script execution when PYTHONPATH does not include repository root. + script_dir = os.path.dirname(os.path.abspath(__file__)) + repo_root = os.path.abspath(os.path.join(script_dir, "..", "..", "..")) + if repo_root not in sys.path: + sys.path.insert(0, repo_root) + from nifi.user_scripts.utils.codecs.cerner_blob import DecompressLzwCernerBlob + +BINARY_FIELD_NAME = "binarydoc" +OUTPUT_TEXT_FIELD_NAME = "text" +DOCUMENT_ID_FIELD_NAME = "id" +INPUT_CHARSET = "utf-8" +OUTPUT_CHARSET = "utf-8" +OUTPUT_MODE = "base64" +BINARY_FIELD_SOURCE_ENCODING = "base64" +BLOB_SEQUENCE_ORDER_FIELD_NAME = "blob_sequence_num" +OPERATION_MODE = "base64" + +for arg in sys.argv: + _arg = arg.split("=", 1) + if _arg[0] == "binary_field_name": + BINARY_FIELD_NAME = _arg[1] + elif _arg[0] == "output_text_field_name": + OUTPUT_TEXT_FIELD_NAME = _arg[1] + elif _arg[0] == "document_id_field_name": + DOCUMENT_ID_FIELD_NAME = _arg[1] + elif _arg[0] == "input_charset": + INPUT_CHARSET = _arg[1] + elif _arg[0] == "output_charset": + OUTPUT_CHARSET = _arg[1] + elif _arg[0] == "output_mode": + OUTPUT_MODE = _arg[1] + elif _arg[0] == "binary_field_source_encoding": + BINARY_FIELD_SOURCE_ENCODING = _arg[1] + elif _arg[0] == "blob_sequence_order_field_name": + BLOB_SEQUENCE_ORDER_FIELD_NAME = _arg[1] + elif _arg[0] == "operation_mode": + OPERATION_MODE = _arg[1] + + +def load_json_records(input_raw_bytes): + try: + return json.loads(input_raw_bytes.decode()) + except (UnicodeDecodeError, json.JSONDecodeError): + try: + return json.loads(input_raw_bytes.decode(INPUT_CHARSET)) + except (UnicodeDecodeError, json.JSONDecodeError): + try: + return json.loads(input_raw_bytes.decode("windows-1252")) + except (UnicodeDecodeError, json.JSONDecodeError) as exc: + raise ValueError( + "Error decoding JSON after trying utf-8, " + + INPUT_CHARSET + + ", and windows-1252" + ) from exc + + +def decode_blob_part(value, blob_part): + if BINARY_FIELD_SOURCE_ENCODING == "base64": + if not isinstance(value, str): + raise ValueError( + f"Expected base64 string in {BINARY_FIELD_NAME} for part {blob_part}, got {type(value)}" + ) + + try: + return base64.b64decode(value, validate=True) + except Exception as exc: + raise ValueError(f"Error decoding base64 blob part {blob_part}: {exc}") from exc + + if isinstance(value, str): + return value.encode(INPUT_CHARSET) + + if isinstance(value, list) and all(isinstance(v, int) and 0 <= v <= 255 for v in value): + return bytes(value) + + if isinstance(value, (bytes, bytearray)): + return bytes(value) + + raise ValueError( + f"Expected bytes-like data in {BINARY_FIELD_NAME} for part {blob_part}, got {type(value)}" + ) + + +records = load_json_records(sys.stdin.buffer.read()) + +if isinstance(records, dict): + records = [records] + +if not records: + raise 
ValueError("No records found in JSON input") + +# keep the same sanity check as the extension: one flowfile should carry one document +doc_ids = {str(record.get(DOCUMENT_ID_FIELD_NAME, "")) for record in records} +if len(doc_ids) > 1: + raise ValueError(f"Multiple document IDs in one FlowFile: {list(doc_ids)}") + +concatenated_blob_sequence_order = {} +output_merged_record = {} + +have_any_sequence = any(BLOB_SEQUENCE_ORDER_FIELD_NAME in record for record in records) +have_any_no_sequence = any(BLOB_SEQUENCE_ORDER_FIELD_NAME not in record for record in records) + +if have_any_sequence and have_any_no_sequence: + raise ValueError( + f"Mixed records: some have '{BLOB_SEQUENCE_ORDER_FIELD_NAME}', some don't. " + "Cannot safely reconstruct blob stream." + ) + +for record in records: + if BINARY_FIELD_NAME not in record or record[BINARY_FIELD_NAME] in (None, ""): + raise ValueError(f"Missing '{BINARY_FIELD_NAME}' in a record") + + if have_any_sequence: + sequence_number = int(record[BLOB_SEQUENCE_ORDER_FIELD_NAME]) + if sequence_number in concatenated_blob_sequence_order: + raise ValueError(f"Duplicate {BLOB_SEQUENCE_ORDER_FIELD_NAME}: {sequence_number}") + concatenated_blob_sequence_order[sequence_number] = record[BINARY_FIELD_NAME] + else: + sequence_number = len(concatenated_blob_sequence_order) + concatenated_blob_sequence_order[sequence_number] = record[BINARY_FIELD_NAME] + +# copy all non-binary fields from the first input record +for k, v in records[0].items(): + if k != BINARY_FIELD_NAME and k not in output_merged_record: + output_merged_record[k] = v + +full_compressed_blob = bytearray() +blob_sequence_keys = sorted(concatenated_blob_sequence_order.keys()) + +for i in range(1, len(blob_sequence_keys)): + if blob_sequence_keys[i] != blob_sequence_keys[i - 1] + 1: + raise ValueError( + f"Sequence gap: missing {blob_sequence_keys[i - 1] + 1} " + f"(have {blob_sequence_keys[i - 1]} then {blob_sequence_keys[i]})" + ) + +for blob_part in blob_sequence_keys: + full_compressed_blob.extend( + decode_blob_part(concatenated_blob_sequence_order[blob_part], blob_part) + ) + +decompress_blob = DecompressLzwCernerBlob() +decompress_blob.decompress(full_compressed_blob) +decompressed_blob = bytes(decompress_blob.output_stream) + +if OUTPUT_MODE == "base64": + output_merged_record[BINARY_FIELD_NAME] = base64.b64encode(decompressed_blob).decode(OUTPUT_CHARSET) +elif OUTPUT_MODE == "raw": + output_merged_record[BINARY_FIELD_NAME] = decompressed_blob.decode(OUTPUT_CHARSET, errors="replace") +else: + raise ValueError(f"Unsupported output_mode: {OUTPUT_MODE}") + +sys.stdout.buffer.write(json.dumps([output_merged_record], ensure_ascii=False).encode("utf-8")) diff --git a/nifi/user_scripts/utils/nifi/base_nifi_processor.py b/nifi/user_scripts/utils/nifi/base_nifi_processor.py index f29478cf..d5915bff 100644 --- a/nifi/user_scripts/utils/nifi/base_nifi_processor.py +++ b/nifi/user_scripts/utils/nifi/base_nifi_processor.py @@ -219,11 +219,11 @@ def onScheduled(self, context: ProcessContext) -> None: """ pass - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: """ Process a FlowFile and return a FlowFileTransformResult. - Subclasses must override this method to implement processor logic. + Subclasses should implement this method with processor logic. Args: context: The NiFi ProcessContext for this invocation. 
@@ -233,3 +233,30 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr
             NotImplementedError: Always, until overridden by a subclass.
         """
         raise NotImplementedError
+
+    def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
+        """
+        NiFi entrypoint. Calls process() and builds a failure result on exceptions.
+
+        Args:
+            context: The NiFi ProcessContext for this invocation.
+            flowFile: The FlowFile being processed.
+        """
+        self.process_context = context
+
+        try:
+            self.set_properties(context.getProperties())
+            result = self.process(context, flowFile)
+            if not isinstance(result, FlowFileTransformResult):
+                raise TypeError(
+                    f"{self.__class__.__name__}.process() must return FlowFileTransformResult, "
+                    f"got {type(result).__name__}"
+                )
+            return result
+        except Exception as exception:
+            self.logger.error("Exception during flowfile processing", exc_info=True)
+            return self.build_failure_result(
+                flowFile,
+                exception,
+                include_flowfile_attributes=True,
+            )
diff --git a/scripts/smoke_nifi_services.sh b/scripts/tests/smoke_nifi_services.sh
similarity index 95%
rename from scripts/smoke_nifi_services.sh
rename to scripts/tests/smoke_nifi_services.sh
index 0d2c7985..ef185224 100755
--- a/scripts/smoke_nifi_services.sh
+++ b/scripts/tests/smoke_nifi_services.sh
@@ -3,7 +3,7 @@
 
 set -euo pipefail
 
-ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
+ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
 
 ENV_LOADER="${ROOT_DIR}/deploy/export_env_vars.sh"
 if [[ -f "$ENV_LOADER" ]]; then