diff --git a/.ci/tests/integration/cases/quoted-function-details/manifests.yaml b/.ci/tests/integration/cases/quoted-function-details/manifests.yaml new file mode 100644 index 000000000..6ae32f25c --- /dev/null +++ b/.ci/tests/integration/cases/quoted-function-details/manifests.yaml @@ -0,0 +1,46 @@ +apiVersion: compute.functionmesh.io/v1alpha1 +kind: Function +metadata: + name: function-details-quoted-sample + namespace: default +spec: + image: streamnative/pulsar-functions-java-sample:3.2.2.1 + className: org.apache.pulsar.functions.api.examples.ExclamationFunction + replicas: 1 + logTopic: persistent://public/default/logging-function-details-quoted-logs + funcConfig: + timePartitionPattern: "yyyy-MM-dd/HH'h'-mm'm'" + note: "quoted payload should survive shell parsing" + input: + topics: + - persistent://public/default/input-function-details-quoted-topic + typeClassName: java.lang.String + output: + topic: persistent://public/default/output-function-details-quoted-topic + typeClassName: java.lang.String + resources: + requests: + cpu: 50m + memory: 1G + limits: + memory: 1.1G + pulsar: + pulsarConfig: "test-pulsar" + tlsConfig: + enabled: false + allowInsecure: false + hostnameVerification: true + certSecretName: sn-platform-tls-broker + certSecretKey: "" + java: + jar: /pulsar/examples/api-examples.jar + clusterName: test + autoAck: true +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-pulsar +data: + webServiceURL: http://sn-platform-pulsar-broker.default.svc.cluster.local:8080 + brokerServiceURL: pulsar://sn-platform-pulsar-broker.default.svc.cluster.local:6650 diff --git a/.ci/tests/integration/cases/quoted-function-details/verify.sh b/.ci/tests/integration/cases/quoted-function-details/verify.sh new file mode 100644 index 000000000..78b19e50c --- /dev/null +++ b/.ci/tests/integration/cases/quoted-function-details/verify.sh @@ -0,0 +1,116 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +set -e + +E2E_DIR=$(dirname "$0") +BASE_DIR=$(cd "${E2E_DIR}"/../../../../..; pwd) +PULSAR_NAMESPACE=${PULSAR_NAMESPACE:-"default"} +PULSAR_RELEASE_NAME=${PULSAR_RELEASE_NAME:-"sn-platform"} +E2E_KUBECONFIG=${E2E_KUBECONFIG:-"/tmp/e2e-k8s.config"} +MANIFESTS_FILE="${BASE_DIR}"/.ci/tests/integration/cases/quoted-function-details/manifests.yaml + +source "${BASE_DIR}"/.ci/helm.sh + +FUNCTION_NAME=function-details-quoted-sample +STS_NAME=${FUNCTION_NAME}-function + +if [ ! "$KUBECONFIG" ]; then + export KUBECONFIG=${E2E_KUBECONFIG} +fi + +if [ -z "${FUNCTION_NAME}" ]; then + echo "function name is empty" + exit 1 +fi + +cleanup() { + kubectl delete -f "${MANIFESTS_FILE}" > /dev/null 2>&1 || true +} + +require_fragment() { + fragment=$1 + message=$2 + + printf '%s' "${START_COMMAND}" | grep -F -- "${fragment}" > /dev/null 2>&1 || { + echo "${message}" + echo "${START_COMMAND}" + exit 1 + } +} + +trap cleanup EXIT + +kubectl apply -f "${MANIFESTS_FILE}" > /dev/null 2>&1 + +START_COMMAND="" +for _ in $(seq 1 24); do + START_COMMAND=$(kubectl get statefulset "${STS_NAME}" -o jsonpath='{.spec.template.spec.containers[0].command[2]}' 2> /dev/null || true) + if [ -n "${START_COMMAND}" ]; then + break + fi + sleep 5 +done + +if [ -z "${START_COMMAND}" ]; then + echo "statefulset command is not available" + exit 1 +fi + +unsafe_pattern_fragment=$(cat <<'EOF' +timePartitionPattern\":\"yyyy-MM-dd/HH'h'-mm'm' +EOF +) +escaped_h_fragment=$(cat <<'EOF' +'"'"'h'"'"' +EOF +) +escaped_m_fragment=$(cat <<'EOF' +'"'"'m'"'"' +EOF +) + +require_fragment "timePartitionPattern" "function details command does not include the quoted config key" +if printf '%s' "${START_COMMAND}" | grep -F -- "${unsafe_pattern_fragment}" > /dev/null 2>&1; then + echo "function details command still contains raw single quotes" + echo "${START_COMMAND}" + exit 1 +fi +require_fragment "${escaped_h_fragment}" "function details command does not escape the quoted h segment" +require_fragment "${escaped_m_fragment}" "function details command does not escape the quoted m segment" + +verify_fm_result=$(ci::verify_function_mesh "${FUNCTION_NAME}" 2>&1) +if [ $? -ne 0 ]; then + echo "${verify_fm_result}" + exit 1 +fi + +verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_exclamation_function \ + persistent://public/default/input-function-details-quoted-topic \ + persistent://public/default/output-function-details-quoted-topic \ + test-message \ + test-message! \ + 10 2>&1) +if [ $? -eq 0 ]; then + echo "e2e-test: ok" | yq eval - +else + echo "${verify_java_result}" + exit 1 +fi diff --git a/.ci/tests/integration/e2e.yaml b/.ci/tests/integration/e2e.yaml index 58bd413e5..fa87a1e30 100644 --- a/.ci/tests/integration/e2e.yaml +++ b/.ci/tests/integration/e2e.yaml @@ -133,6 +133,8 @@ verify: expected: expected.data.yaml - query: timeout 5m bash .ci/tests/integration/cases/java-function/verify.sh expected: expected.data.yaml + - query: timeout 5m bash .ci/tests/integration/cases/quoted-function-details/verify.sh + expected: expected.data.yaml - query: bash .ci/tests/integration/cases/java-function-vpa/verify.sh expected: expected.data.yaml - query: bash .ci/tests/integration/cases/reconciliation/verify.sh diff --git a/.gitignore b/.gitignore index 138f2ef41..ba204de7a 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,4 @@ index_build_*/ config/crd/bases/vpa-v1-crd.yaml Listeners .tool-versions +/tmp diff --git a/api/compute/v1alpha1/common.go b/api/compute/v1alpha1/common.go index 5d022c59b..998b21702 100644 --- a/api/compute/v1alpha1/common.go +++ b/api/compute/v1alpha1/common.go @@ -127,7 +127,7 @@ func (o *OAuth2Config) GetMountFile() string { } func (o *OAuth2Config) AuthenticationParameters() string { - return fmt.Sprintf(`'{"credentials_url":"file://%s","privateKey":"%s","private_key":"%s","issuerUrl":"%s","issuer_url":"%s","audience":"%s","scope":"%s"}'`, o.GetMountFile(), o.GetMountFile(), o.GetMountFile(), o.IssuerURL, o.IssuerURL, o.Audience, o.Scope) + return fmt.Sprintf(`{"credentials_url":"file://%s","privateKey":"%s","private_key":"%s","issuerUrl":"%s","issuer_url":"%s","audience":"%s","scope":"%s"}`, o.GetMountFile(), o.GetMountFile(), o.GetMountFile(), o.IssuerURL, o.IssuerURL, o.Audience, o.Scope) } type GenericAuth struct { diff --git a/controllers/spec/common.go b/controllers/spec/common.go index c597eb6b1..1060637ee 100644 --- a/controllers/spec/common.go +++ b/controllers/spec/common.go @@ -831,12 +831,16 @@ func getOAuth2MountFile(authConfig *v1alpha1.OAuth2Config, mountPath string) str return fmt.Sprintf("%s/%s", mountPath, authConfig.KeySecretKey) } +func shellQuoteLiteral(value string) string { + return "'" + strings.ReplaceAll(value, "'", `'"'"'`) + "'" +} + func makeOAuth2AuthenticationParameters(authConfig *v1alpha1.OAuth2Config, mountPath string) string { if authConfig == nil { return "" } credentialsFile := getOAuth2MountFile(authConfig, mountPath) - return fmt.Sprintf(`'{"credentials_url":"file://%s","privateKey":"%s","private_key":"%s","issuerUrl":"%s","issuer_url":"%s","audience":"%s","scope":"%s"}'`, + return fmt.Sprintf(`{"credentials_url":"file://%s","privateKey":"%s","private_key":"%s","issuerUrl":"%s","issuer_url":"%s","audience":"%s","scope":"%s"}`, credentialsFile, credentialsFile, credentialsFile, authConfig.IssuerURL, authConfig.IssuerURL, authConfig.Audience, authConfig.Scope) } @@ -859,14 +863,14 @@ func getPulsarAdminCommandWithEnv(authProvided, tlsProvided bool, tlsConfig TLSC "--auth-plugin", OAuth2AuthenticationPlugin, "--auth-params", - makeOAuth2AuthenticationParameters(authConfig.OAuth2Config, oauth2MountPath), + shellQuoteLiteral(makeOAuth2AuthenticationParameters(authConfig.OAuth2Config, oauth2MountPath)), }...) } else if authConfig.GenericAuth != nil { args = append(args, []string{ "--auth-plugin", authConfig.GenericAuth.ClientAuthenticationPlugin, "--auth-params", - "'" + authConfig.GenericAuth.ClientAuthenticationParameters + "'", + shellQuoteLiteral(authConfig.GenericAuth.ClientAuthenticationParameters), }...) } } else if authProvided { @@ -960,13 +964,13 @@ func getPulsarctlCommandWithEnv(authProvided, tlsProvided bool, tlsConfig TLSCon "oauth2", "activate", "--auth-params", - "'" + authConfig.GenericAuth.ClientAuthenticationParameters + "'", + shellQuoteLiteral(authConfig.GenericAuth.ClientAuthenticationParameters), "|| true ) &&", PulsarctlExecutableFile, "--auth-plugin", authConfig.GenericAuth.ClientAuthenticationPlugin, "--auth-params", - "'" + authConfig.GenericAuth.ClientAuthenticationParameters + "'", + shellQuoteLiteral(authConfig.GenericAuth.ClientAuthenticationParameters), "--admin-service-url", "$" + envNames.webServiceURL, } @@ -1610,7 +1614,7 @@ func getSharedArgs(details, clusterName, uid string, authProvided bool, tlsProvi "--function_version", "0", "--function_details", - "'" + details + "'", //in json format + shellQuoteLiteral(details), // in json format "--pulsar_serviceurl", "$brokerServiceURL", "--max_buffered_tuples", @@ -1631,13 +1635,13 @@ func getSharedArgs(details, clusterName, uid string, authProvided bool, tlsProvi "--client_auth_plugin", OAuth2AuthenticationPlugin, "--client_auth_params", - authConfig.OAuth2Config.AuthenticationParameters()}...) + shellQuoteLiteral(authConfig.OAuth2Config.AuthenticationParameters())}...) } else if authConfig.GenericAuth != nil { args = append(args, []string{ "--client_auth_plugin", authConfig.GenericAuth.ClientAuthenticationPlugin, "--client_auth_params", - "'" + authConfig.GenericAuth.ClientAuthenticationParameters + "'"}...) + shellQuoteLiteral(authConfig.GenericAuth.ClientAuthenticationParameters)}...) } } else if authProvided { args = append(args, []string{ diff --git a/controllers/spec/common_test.go b/controllers/spec/common_test.go index 9cfdd513f..a619d71aa 100644 --- a/controllers/spec/common_test.go +++ b/controllers/spec/common_test.go @@ -205,11 +205,11 @@ func TestGetDownloadCommand(t *testing.T) { "oauth2", "activate", "--auth-params", - "'auth-params'", + shellQuoteLiteral("auth-params"), "|| true ) &&", PulsarctlExecutableFile, "--auth-plugin", "auth-plugin", - "--auth-params", "'auth-params'", + "--auth-params", shellQuoteLiteral("auth-params"), "--admin-service-url", "$webServiceURL", "--tls-allow-insecure=true", "--tls-enable-hostname-verification=false", @@ -239,7 +239,7 @@ func TestGetDownloadCommand(t *testing.T) { PulsarAdminExecutableFile, "--admin-url", "$webServiceURL", "--auth-plugin", OAuth2AuthenticationPlugin, - "--auth-params", testOauth2.AuthenticationParameters(), + "--auth-params", shellQuoteLiteral(testOauth2.AuthenticationParameters()), "packages", "download", "function://public/default/test@v1", "--path", "function-package.jar", }, false, @@ -252,7 +252,7 @@ func TestGetDownloadCommand(t *testing.T) { PulsarAdminExecutableFile, "--admin-url", "$webServiceURL", "--auth-plugin", "auth-plugin", - "--auth-params", "'auth-params'", + "--auth-params", shellQuoteLiteral("auth-params"), "packages", "download", "sink://public/default/test@v1", "--path", "sink-package.jar", }, false, @@ -335,6 +335,67 @@ func TestGetDownloadCommand(t *testing.T) { } } +func TestShellQuoteLiteral(t *testing.T) { + testCases := []struct { + name string + input string + expected string + }{ + { + name: "empty", + input: "", + expected: "''", + }, + { + name: "plain", + input: "auth-params", + expected: "'auth-params'", + }, + { + name: "embedded single quote", + input: "a'b", + expected: `'a'"'"'b'`, + }, + { + name: "time partition pattern", + input: "yyyy-MM-dd/HH'h'-mm'm'", + expected: `'yyyy-MM-dd/HH'"'"'h'"'"'-mm'"'"'m'"'"''`, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expected, shellQuoteLiteral(tc.input)) + }) + } +} + +func TestGetPulsarAdminCommandWithQuotedAuthParams(t *testing.T) { + authConfig := &v1alpha1.AuthConfig{ + GenericAuth: &v1alpha1.GenericAuth{ + ClientAuthenticationPlugin: "auth-plugin", + ClientAuthenticationParameters: `{"token":"a'b"}`, + }, + } + + command := getPulsarAdminCommand(false, false, nil, authConfig) + assert.Equal(t, "auth-plugin", command[4]) + assert.Equal(t, shellQuoteLiteral(`{"token":"a'b"}`), command[6]) +} + +func TestGetPulsarctlCommandWithQuotedAuthParams(t *testing.T) { + authConfig := &v1alpha1.AuthConfig{ + GenericAuth: &v1alpha1.GenericAuth{ + ClientAuthenticationPlugin: "auth-plugin", + ClientAuthenticationParameters: `{"token":"a'b"}`, + }, + } + + command := getPulsarctlCommand(false, false, nil, authConfig) + assert.Equal(t, shellQuoteLiteral(`{"token":"a'b"}`), command[5]) + assert.Equal(t, shellQuoteLiteral(`{"token":"a'b"}`), command[11]) +} + func TestGetFunctionRunnerImage(t *testing.T) { javaRuntime := v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{ Jar: "test.jar", diff --git a/controllers/spec/function_test.go b/controllers/spec/function_test.go index 14525fc13..ea1ca0b46 100644 --- a/controllers/spec/function_test.go +++ b/controllers/spec/function_test.go @@ -19,7 +19,6 @@ package spec import ( "encoding/json" - "regexp" "strings" "testing" @@ -198,11 +197,9 @@ func TestJavaFunctionCommandWithConnectorOverrides(t *testing.T) { startCommand := commands[2] assert.Assert(t, strings.Contains(startCommand, "--connectors_directory "+DefaultConnectorsDirectory), "start command should include connectors directory but got %s", startCommand) - re := regexp.MustCompile(`--function_details '([^']+)'`) - matches := re.FindStringSubmatch(startCommand) - assert.Assert(t, len(matches) == 2, "unable to locate function details in command: %s", startCommand) - - functionDetailsJSON := matches[1] + functionDetailsJSON := generateFunctionDetailsInJSON(function) + assert.Assert(t, strings.Contains(startCommand, "--function_details "+shellQuoteLiteral(functionDetailsJSON)), + "start command should include shell quoted function details but got %s", startCommand) details := &proto.FunctionDetails{} err := protojson.Unmarshal([]byte(functionDetailsJSON), details) assert.NilError(t, err) @@ -232,6 +229,116 @@ func TestJavaFunctionCommandWithConnectorOverrides(t *testing.T) { assert.Equal(t, producerConfig["enable.idempotence"], true) } +func TestSinkCommandShellQuotesFunctionDetails(t *testing.T) { + replicas := int32(1) + trueVal := true + sinkConfig := v1alpha1.NewConfig(map[string]interface{}{ + "partitionerType": "time", + "timePartitionDuration": "1m", + "timePartitionPattern": "yyyy-MM-dd/HH'h'-mm'm'", + }) + sink := &v1alpha1.Sink{ + TypeMeta: metav1.TypeMeta{ + Kind: "Sink", + APIVersion: "compute.functionmesh.io/v1alpha1", + }, + ObjectMeta: *makeSampleObjectMeta("time-pattern-sink"), + Spec: v1alpha1.SinkSpec{ + Name: "time-pattern-sink", + ClassName: "org.apache.pulsar.io.jcloud.sink.CloudStorageGenericRecordSink", + Tenant: "public", + Namespace: "default", + ClusterName: TestClusterName, + Input: v1alpha1.InputConf{ + Topics: []string{ + "persistent://public/default/input", + }, + TypeClassName: "org.apache.pulsar.client.api.schema.GenericRecord", + }, + SinkConfig: &sinkConfig, + Replicas: &replicas, + AutoAck: &trueVal, + Messaging: v1alpha1.Messaging{ + Pulsar: &v1alpha1.PulsarMessaging{ + PulsarConfig: TestClusterName, + }, + }, + Runtime: v1alpha1.Runtime{ + Java: &v1alpha1.JavaRuntime{ + Jar: "connectors/pulsar-io-cloud-storage.nar", + JarLocation: "", + }, + }, + Image: "streamnative/pulsar-io-cloud-storage:latest", + }, + } + + commands := MakeSinkCommand(sink) + assert.Assert(t, len(commands) == 3, "commands should be 3 but got %d", len(commands)) + + sinkDetailsJSON := generateSinkDetailsInJSON(sink) + assert.Assert(t, strings.Contains(commands[2], "--function_details "+shellQuoteLiteral(sinkDetailsJSON)), + "sink command should include shell quoted function details but got %s", commands[2]) + assert.Assert(t, strings.Contains(commands[2], `'"'"'`), + "sink command should escape embedded single quotes but got %s", commands[2]) +} + +func TestSinkCommandShellQuotesClientAuthParams(t *testing.T) { + replicas := int32(1) + trueVal := true + authParams := `{"token":"a'b"}` + sink := &v1alpha1.Sink{ + TypeMeta: metav1.TypeMeta{ + Kind: "Sink", + APIVersion: "compute.functionmesh.io/v1alpha1", + }, + ObjectMeta: *makeSampleObjectMeta("auth-config-sink"), + Spec: v1alpha1.SinkSpec{ + Name: "auth-config-sink", + ClassName: "org.apache.pulsar.io.elasticsearch.ElasticSearchSink", + Tenant: "public", + Namespace: "default", + ClusterName: TestClusterName, + Input: v1alpha1.InputConf{ + Topics: []string{ + "persistent://public/default/input", + }, + TypeClassName: "[B", + }, + SinkConfig: &v1alpha1.Config{ + Data: map[string]interface{}{ + "elasticSearchUrl": "http://localhost:9200", + }, + }, + Replicas: &replicas, + AutoAck: &trueVal, + Messaging: v1alpha1.Messaging{ + Pulsar: &v1alpha1.PulsarMessaging{ + PulsarConfig: TestClusterName, + AuthConfig: &v1alpha1.AuthConfig{ + GenericAuth: &v1alpha1.GenericAuth{ + ClientAuthenticationPlugin: "auth-plugin", + ClientAuthenticationParameters: authParams, + }, + }, + }, + }, + Runtime: v1alpha1.Runtime{ + Java: &v1alpha1.JavaRuntime{ + Jar: "connectors/pulsar-io-elastic-search.nar", + JarLocation: "", + }, + }, + Image: "streamnative/pulsar-io-elastic-search:latest", + }, + } + + commands := MakeSinkCommand(sink) + assert.Assert(t, len(commands) == 3, "commands should be 3 but got %d", len(commands)) + assert.Assert(t, strings.Contains(commands[2], "--client_auth_params "+shellQuoteLiteral(authParams)), + "sink command should shell quote client auth params but got %s", commands[2]) +} + func TestFunctionPulsarPackageServiceDownloadCommandAndPodWiring(t *testing.T) { previous := utils.EnableInitContainers defer func() {