Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
apiVersion: compute.functionmesh.io/v1alpha1
kind: Function
metadata:
name: function-details-quoted-sample
namespace: default
spec:
image: streamnative/pulsar-functions-java-sample:3.2.2.1
className: org.apache.pulsar.functions.api.examples.ExclamationFunction
replicas: 1
logTopic: persistent://public/default/logging-function-details-quoted-logs
funcConfig:
timePartitionPattern: "yyyy-MM-dd/HH'h'-mm'm'"
note: "quoted payload should survive shell parsing"
input:
topics:
- persistent://public/default/input-function-details-quoted-topic
typeClassName: java.lang.String
output:
topic: persistent://public/default/output-function-details-quoted-topic
typeClassName: java.lang.String
resources:
requests:
cpu: 50m
memory: 1G
limits:
memory: 1.1G
pulsar:
pulsarConfig: "test-pulsar"
tlsConfig:
enabled: false
allowInsecure: false
hostnameVerification: true
certSecretName: sn-platform-tls-broker
certSecretKey: ""
java:
jar: /pulsar/examples/api-examples.jar
clusterName: test
autoAck: true
---
apiVersion: v1
kind: ConfigMap
metadata:
name: test-pulsar
data:
webServiceURL: http://sn-platform-pulsar-broker.default.svc.cluster.local:8080
brokerServiceURL: pulsar://sn-platform-pulsar-broker.default.svc.cluster.local:6650
116 changes: 116 additions & 0 deletions .ci/tests/integration/cases/quoted-function-details/verify.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

set -e

E2E_DIR=$(dirname "$0")
BASE_DIR=$(cd "${E2E_DIR}"/../../../../..; pwd)
PULSAR_NAMESPACE=${PULSAR_NAMESPACE:-"default"}
PULSAR_RELEASE_NAME=${PULSAR_RELEASE_NAME:-"sn-platform"}
E2E_KUBECONFIG=${E2E_KUBECONFIG:-"/tmp/e2e-k8s.config"}
MANIFESTS_FILE="${BASE_DIR}"/.ci/tests/integration/cases/quoted-function-details/manifests.yaml

source "${BASE_DIR}"/.ci/helm.sh

FUNCTION_NAME=function-details-quoted-sample
STS_NAME=${FUNCTION_NAME}-function

if [ ! "$KUBECONFIG" ]; then
export KUBECONFIG=${E2E_KUBECONFIG}
fi

if [ -z "${FUNCTION_NAME}" ]; then
echo "function name is empty"
exit 1
fi

cleanup() {
kubectl delete -f "${MANIFESTS_FILE}" > /dev/null 2>&1 || true
}

require_fragment() {
fragment=$1
message=$2

printf '%s' "${START_COMMAND}" | grep -F -- "${fragment}" > /dev/null 2>&1 || {
echo "${message}"
echo "${START_COMMAND}"
exit 1
}
}

trap cleanup EXIT

kubectl apply -f "${MANIFESTS_FILE}" > /dev/null 2>&1

START_COMMAND=""
for _ in $(seq 1 24); do
START_COMMAND=$(kubectl get statefulset "${STS_NAME}" -o jsonpath='{.spec.template.spec.containers[0].command[2]}' 2> /dev/null || true)
if [ -n "${START_COMMAND}" ]; then
break
fi
sleep 5
done

if [ -z "${START_COMMAND}" ]; then
echo "statefulset command is not available"
exit 1
fi

unsafe_pattern_fragment=$(cat <<'EOF'
timePartitionPattern\":\"yyyy-MM-dd/HH'h'-mm'm'
EOF
)
escaped_h_fragment=$(cat <<'EOF'
'"'"'h'"'"'
EOF
)
escaped_m_fragment=$(cat <<'EOF'
'"'"'m'"'"'
EOF
)

require_fragment "timePartitionPattern" "function details command does not include the quoted config key"
if printf '%s' "${START_COMMAND}" | grep -F -- "${unsafe_pattern_fragment}" > /dev/null 2>&1; then
echo "function details command still contains raw single quotes"
echo "${START_COMMAND}"
exit 1
fi
require_fragment "${escaped_h_fragment}" "function details command does not escape the quoted h segment"
require_fragment "${escaped_m_fragment}" "function details command does not escape the quoted m segment"

verify_fm_result=$(ci::verify_function_mesh "${FUNCTION_NAME}" 2>&1)
if [ $? -ne 0 ]; then
echo "${verify_fm_result}"
exit 1
fi

verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_exclamation_function \
persistent://public/default/input-function-details-quoted-topic \
persistent://public/default/output-function-details-quoted-topic \
test-message \
test-message! \
10 2>&1)
if [ $? -eq 0 ]; then
echo "e2e-test: ok" | yq eval -
else
echo "${verify_java_result}"
exit 1
fi
2 changes: 2 additions & 0 deletions .ci/tests/integration/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ verify:
expected: expected.data.yaml
- query: timeout 5m bash .ci/tests/integration/cases/java-function/verify.sh
expected: expected.data.yaml
- query: timeout 5m bash .ci/tests/integration/cases/quoted-function-details/verify.sh
expected: expected.data.yaml
- query: bash .ci/tests/integration/cases/java-function-vpa/verify.sh
expected: expected.data.yaml
- query: bash .ci/tests/integration/cases/reconciliation/verify.sh
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ index_build_*/
config/crd/bases/vpa-v1-crd.yaml
Listeners
.tool-versions
/tmp
2 changes: 1 addition & 1 deletion api/compute/v1alpha1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (o *OAuth2Config) GetMountFile() string {
}

func (o *OAuth2Config) AuthenticationParameters() string {
return fmt.Sprintf(`'{"credentials_url":"file://%s","privateKey":"%s","private_key":"%s","issuerUrl":"%s","issuer_url":"%s","audience":"%s","scope":"%s"}'`, o.GetMountFile(), o.GetMountFile(), o.GetMountFile(), o.IssuerURL, o.IssuerURL, o.Audience, o.Scope)
return fmt.Sprintf(`{"credentials_url":"file://%s","privateKey":"%s","private_key":"%s","issuerUrl":"%s","issuer_url":"%s","audience":"%s","scope":"%s"}`, o.GetMountFile(), o.GetMountFile(), o.GetMountFile(), o.IssuerURL, o.IssuerURL, o.Audience, o.Scope)
}

type GenericAuth struct {
Expand Down
20 changes: 12 additions & 8 deletions controllers/spec/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,12 +831,16 @@ func getOAuth2MountFile(authConfig *v1alpha1.OAuth2Config, mountPath string) str
return fmt.Sprintf("%s/%s", mountPath, authConfig.KeySecretKey)
}

func shellQuoteLiteral(value string) string {
return "'" + strings.ReplaceAll(value, "'", `'"'"'`) + "'"
}

func makeOAuth2AuthenticationParameters(authConfig *v1alpha1.OAuth2Config, mountPath string) string {
if authConfig == nil {
return ""
}
credentialsFile := getOAuth2MountFile(authConfig, mountPath)
return fmt.Sprintf(`'{"credentials_url":"file://%s","privateKey":"%s","private_key":"%s","issuerUrl":"%s","issuer_url":"%s","audience":"%s","scope":"%s"}'`,
return fmt.Sprintf(`{"credentials_url":"file://%s","privateKey":"%s","private_key":"%s","issuerUrl":"%s","issuer_url":"%s","audience":"%s","scope":"%s"}`,
credentialsFile, credentialsFile, credentialsFile, authConfig.IssuerURL, authConfig.IssuerURL, authConfig.Audience, authConfig.Scope)
}

Expand All @@ -859,14 +863,14 @@ func getPulsarAdminCommandWithEnv(authProvided, tlsProvided bool, tlsConfig TLSC
"--auth-plugin",
OAuth2AuthenticationPlugin,
"--auth-params",
makeOAuth2AuthenticationParameters(authConfig.OAuth2Config, oauth2MountPath),
shellQuoteLiteral(makeOAuth2AuthenticationParameters(authConfig.OAuth2Config, oauth2MountPath)),
}...)
} else if authConfig.GenericAuth != nil {
args = append(args, []string{
"--auth-plugin",
authConfig.GenericAuth.ClientAuthenticationPlugin,
"--auth-params",
"'" + authConfig.GenericAuth.ClientAuthenticationParameters + "'",
shellQuoteLiteral(authConfig.GenericAuth.ClientAuthenticationParameters),
}...)
}
} else if authProvided {
Expand Down Expand Up @@ -960,13 +964,13 @@ func getPulsarctlCommandWithEnv(authProvided, tlsProvided bool, tlsConfig TLSCon
"oauth2",
"activate",
"--auth-params",
"'" + authConfig.GenericAuth.ClientAuthenticationParameters + "'",
shellQuoteLiteral(authConfig.GenericAuth.ClientAuthenticationParameters),
"|| true ) &&",
PulsarctlExecutableFile,
"--auth-plugin",
authConfig.GenericAuth.ClientAuthenticationPlugin,
"--auth-params",
"'" + authConfig.GenericAuth.ClientAuthenticationParameters + "'",
shellQuoteLiteral(authConfig.GenericAuth.ClientAuthenticationParameters),
"--admin-service-url",
"$" + envNames.webServiceURL,
}
Expand Down Expand Up @@ -1610,7 +1614,7 @@ func getSharedArgs(details, clusterName, uid string, authProvided bool, tlsProvi
"--function_version",
"0",
"--function_details",
"'" + details + "'", //in json format
shellQuoteLiteral(details), // in json format
"--pulsar_serviceurl",
"$brokerServiceURL",
"--max_buffered_tuples",
Expand All @@ -1631,13 +1635,13 @@ func getSharedArgs(details, clusterName, uid string, authProvided bool, tlsProvi
"--client_auth_plugin",
OAuth2AuthenticationPlugin,
"--client_auth_params",
authConfig.OAuth2Config.AuthenticationParameters()}...)
shellQuoteLiteral(authConfig.OAuth2Config.AuthenticationParameters())}...)
} else if authConfig.GenericAuth != nil {
args = append(args, []string{
"--client_auth_plugin",
authConfig.GenericAuth.ClientAuthenticationPlugin,
"--client_auth_params",
"'" + authConfig.GenericAuth.ClientAuthenticationParameters + "'"}...)
shellQuoteLiteral(authConfig.GenericAuth.ClientAuthenticationParameters)}...)
}
} else if authProvided {
args = append(args, []string{
Expand Down
69 changes: 65 additions & 4 deletions controllers/spec/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,11 @@ func TestGetDownloadCommand(t *testing.T) {
"oauth2",
"activate",
"--auth-params",
"'auth-params'",
shellQuoteLiteral("auth-params"),
"|| true ) &&",
PulsarctlExecutableFile,
"--auth-plugin", "auth-plugin",
"--auth-params", "'auth-params'",
"--auth-params", shellQuoteLiteral("auth-params"),
"--admin-service-url", "$webServiceURL",
"--tls-allow-insecure=true",
"--tls-enable-hostname-verification=false",
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestGetDownloadCommand(t *testing.T) {
PulsarAdminExecutableFile,
"--admin-url", "$webServiceURL",
"--auth-plugin", OAuth2AuthenticationPlugin,
"--auth-params", testOauth2.AuthenticationParameters(),
"--auth-params", shellQuoteLiteral(testOauth2.AuthenticationParameters()),
"packages", "download", "function://public/default/test@v1", "--path", "function-package.jar",
},
false,
Expand All @@ -252,7 +252,7 @@ func TestGetDownloadCommand(t *testing.T) {
PulsarAdminExecutableFile,
"--admin-url", "$webServiceURL",
"--auth-plugin", "auth-plugin",
"--auth-params", "'auth-params'",
"--auth-params", shellQuoteLiteral("auth-params"),
"packages", "download", "sink://public/default/test@v1", "--path", "sink-package.jar",
},
false,
Expand Down Expand Up @@ -335,6 +335,67 @@ func TestGetDownloadCommand(t *testing.T) {
}
}

func TestShellQuoteLiteral(t *testing.T) {
testCases := []struct {
name string
input string
expected string
}{
{
name: "empty",
input: "",
expected: "''",
},
{
name: "plain",
input: "auth-params",
expected: "'auth-params'",
},
{
name: "embedded single quote",
input: "a'b",
expected: `'a'"'"'b'`,
},
{
name: "time partition pattern",
input: "yyyy-MM-dd/HH'h'-mm'm'",
expected: `'yyyy-MM-dd/HH'"'"'h'"'"'-mm'"'"'m'"'"''`,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.expected, shellQuoteLiteral(tc.input))
})
}
}

func TestGetPulsarAdminCommandWithQuotedAuthParams(t *testing.T) {
authConfig := &v1alpha1.AuthConfig{
GenericAuth: &v1alpha1.GenericAuth{
ClientAuthenticationPlugin: "auth-plugin",
ClientAuthenticationParameters: `{"token":"a'b"}`,
},
}

command := getPulsarAdminCommand(false, false, nil, authConfig)
assert.Equal(t, "auth-plugin", command[4])
assert.Equal(t, shellQuoteLiteral(`{"token":"a'b"}`), command[6])
}

func TestGetPulsarctlCommandWithQuotedAuthParams(t *testing.T) {
authConfig := &v1alpha1.AuthConfig{
GenericAuth: &v1alpha1.GenericAuth{
ClientAuthenticationPlugin: "auth-plugin",
ClientAuthenticationParameters: `{"token":"a'b"}`,
},
}

command := getPulsarctlCommand(false, false, nil, authConfig)
assert.Equal(t, shellQuoteLiteral(`{"token":"a'b"}`), command[5])
assert.Equal(t, shellQuoteLiteral(`{"token":"a'b"}`), command[11])
}

func TestGetFunctionRunnerImage(t *testing.T) {
javaRuntime := v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{
Jar: "test.jar",
Expand Down
Loading
Loading