Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,7 @@ object PrivilegesBuilder {
val spec = getTableCommandSpec(command)
val functionPrivAndOpType = spec.queries(plan)
.map(plan => buildFunctions(plan, spark))
functionPrivAndOpType.map(_._1)
.reduce(_ ++ _)
.foreach(functionPriv => inputObjs += functionPriv)
inputObjs ++= functionPrivAndOpType.flatMap(_._1)

case plan => plan transformAllExpressions {
case hiveFunction: Expression if isKnownFunction(hiveFunction) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class AccessResource private (val objectType: ObjectType, val catalog: Option[St
extends RangerAccessResourceImpl {
implicit def asString(obj: Object): String = if (obj != null) obj.asInstanceOf[String] else null
def getDatabase: String = getValue("database")
def getUdf: String = getValue("udf")
def getTable: String = getValue("table")
def getColumn: String = getValue("column")
def getColumns: Seq[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class RangerSparkExtension extends (SparkSessionExtensions => Unit) {

override def apply(v1: SparkSessionExtensions): Unit = {
v1.injectCheckRule(AuthzConfigurationChecker)
v1.injectCheckRule(RuleFunctionAuthorization)
v1.injectResolutionRule(_ => RuleReplaceShowObjectCommands)
v1.injectResolutionRule(_ => RuleApplyPermanentViewMarker)
v1.injectResolutionRule(_ => RuleApplyTypeOfMarker)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.plugin.spark.authz.ranger

import scala.collection.mutable

import org.apache.ranger.plugin.policyengine.RangerAccessRequest
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import org.apache.kyuubi.plugin.spark.authz._
import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType.AccessType
import org.apache.kyuubi.plugin.spark.authz.ranger.SparkRangerAdminPlugin._
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._

case class RuleFunctionAuthorization(spark: SparkSession) extends (LogicalPlan => Unit) {
final val AUTHZ_UDF_KEY: String = "spark.kyuubi.authz.udf.enabled"
private val authzUDFEnabled: Boolean =
spark.conf.getOption(AUTHZ_UDF_KEY).exists(_.equalsIgnoreCase("true"))
override def apply(plan: LogicalPlan): Unit = {
if (!authzUDFEnabled) {
return
}

val auditHandler = new SparkRangerAuditHandler
val ugi = getAuthzUgi(spark.sparkContext)
val (inputs, _, opType) = PrivilegesBuilder.buildFunctions(plan, spark)

// Use a HashSet to deduplicate the same AccessResource and AccessType, the requests will be all
// the non-duplicate requests and in the same order as the input requests.
val requests = new mutable.ArrayBuffer[AccessRequest]()
val requestsSet = new mutable.HashSet[(AccessResource, AccessType)]()

def addAccessRequest(objects: Iterable[PrivilegeObject], isInput: Boolean): Unit = {
objects.foreach { obj =>
val resource = AccessResource(obj, opType)
val accessType = ranger.AccessType(obj, opType, isInput)
if (accessType != AccessType.NONE && !requestsSet.contains((resource, accessType))) {
requests += AccessRequest(resource, ugi, opType, accessType)
requestsSet.add(resource, accessType)
}
}
}

addAccessRequest(inputs, isInput = true)

val requestSeq: Seq[RangerAccessRequest] =
requests.map(_.asInstanceOf[RangerAccessRequest]).toSeq

if (authorizeInSingleCall) {
verify(requestSeq, auditHandler)
} else {
requestSeq.foreach { req =>
verify(Seq(req), auditHandler)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class PolicyJsonFileGenerator extends AnyFunSuite {
policyAccessForPermViewAccessOnly,
policyAccessForTable2AccessOnly,
policyAccessForPaimonNsTable1SelectOnly,
policyAccessForDefaultDbUDF,
// row filter
policyFilterForSrcTableKeyLessThan20,
policyFilterForPermViewKeyLessThan20,
Expand Down Expand Up @@ -371,4 +372,20 @@ class PolicyJsonFileGenerator extends AnyFunSuite {
users = List(table1OnlyUserForNs),
accesses = allowTypes(select),
delegateAdmin = true)))

private val policyAccessForDefaultDbUDF = KRangerPolicy(
name = "defaultdb_udf",
description = "Policy for default db udf",
resources = Map(
databaseRes(defaultDb),
"udf" -> KRangerPolicyResource(values = List("kyuubi_func*"))),
policyItems = List(
KRangerPolicyItem(
users = List(bob),
accesses = allowTypes(select, update, create, drop, alter, index, lock, all, read, write),
delegateAdmin = true),
KRangerPolicyItem(
users = List(kent),
accesses = allowTypes(select),
delegateAdmin = true)))
}
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,72 @@
"isEnabled" : true,
"version" : 1,
"service" : "hive_jenkins",
"name" : "defaultdb_udf",
"policyType" : 0,
"policyPriority" : 0,
"description" : "Policy for default db udf",
"isAuditEnabled" : true,
"resources" : {
"database" : {
"values" : [ "default" ],
"isExcludes" : false,
"isRecursive" : false
},
"udf" : {
"values" : [ "kyuubi_func*" ],
"isExcludes" : false,
"isRecursive" : false
}
},
"policyItems" : [ {
"accesses" : [ {
"type" : "select",
"isAllowed" : true
}, {
"type" : "update",
"isAllowed" : true
}, {
"type" : "create",
"isAllowed" : true
}, {
"type" : "drop",
"isAllowed" : true
}, {
"type" : "alter",
"isAllowed" : true
}, {
"type" : "index",
"isAllowed" : true
}, {
"type" : "lock",
"isAllowed" : true
}, {
"type" : "all",
"isAllowed" : true
}, {
"type" : "read",
"isAllowed" : true
}, {
"type" : "write",
"isAllowed" : true
} ],
"users" : [ "bob" ],
"delegateAdmin" : true
}, {
"accesses" : [ {
"type" : "select",
"isAllowed" : true
} ],
"users" : [ "kent" ],
"delegateAdmin" : true
} ],
"isDenyAllElse" : false
}, {
"id" : 11,
"guid" : "6512bd43-d9ca-36e0-ac99-0b0a82652dca",
"isEnabled" : true,
"version" : 1,
"service" : "hive_jenkins",
"name" : "src_key_less_than_20",
"policyType" : 2,
"policyPriority" : 0,
Expand Down Expand Up @@ -539,8 +605,8 @@
} ],
"isDenyAllElse" : false
}, {
"id" : 11,
"guid" : "6512bd43-d9ca-36e0-ac99-0b0a82652dca",
"id" : 12,
"guid" : "c20ad4d7-6fe9-3759-aa27-a0c99bff6710",
"isEnabled" : true,
"version" : 1,
"service" : "hive_jenkins",
Expand Down Expand Up @@ -573,8 +639,8 @@
} ],
"isDenyAllElse" : false
}, {
"id" : 12,
"guid" : "c20ad4d7-6fe9-3759-aa27-a0c99bff6710",
"id" : 13,
"guid" : "c51ce410-c124-310e-8db5-e4b97fc2af39",
"isEnabled" : true,
"version" : 1,
"service" : "hive_jenkins",
Expand Down Expand Up @@ -612,8 +678,8 @@
} ],
"isDenyAllElse" : false
}, {
"id" : 13,
"guid" : "c51ce410-c124-310e-8db5-e4b97fc2af39",
"id" : 14,
"guid" : "aab32389-22bc-325a-af60-6eb525ffdc56",
"isEnabled" : true,
"version" : 1,
"service" : "hive_jenkins",
Expand Down Expand Up @@ -651,8 +717,8 @@
} ],
"isDenyAllElse" : false
}, {
"id" : 14,
"guid" : "aab32389-22bc-325a-af60-6eb525ffdc56",
"id" : 15,
"guid" : "9bf31c7f-f062-336a-96d3-c8bd1f8f2ff3",
"isEnabled" : true,
"version" : 1,
"service" : "hive_jenkins",
Expand Down Expand Up @@ -690,8 +756,8 @@
} ],
"isDenyAllElse" : false
}, {
"id" : 15,
"guid" : "9bf31c7f-f062-336a-96d3-c8bd1f8f2ff3",
"id" : 16,
"guid" : "c74d97b0-1eae-357e-84aa-9d5bade97baf",
"isEnabled" : true,
"version" : 1,
"service" : "hive_jenkins",
Expand Down Expand Up @@ -729,8 +795,8 @@
} ],
"isDenyAllElse" : false
}, {
"id" : 16,
"guid" : "c74d97b0-1eae-357e-84aa-9d5bade97baf",
"id" : 17,
"guid" : "70efdf2e-c9b0-3607-9795-c442636b55fb",
"isEnabled" : true,
"version" : 1,
"service" : "hive_jenkins",
Expand Down Expand Up @@ -768,8 +834,8 @@
} ],
"isDenyAllElse" : false
}, {
"id" : 17,
"guid" : "70efdf2e-c9b0-3607-9795-c442636b55fb",
"id" : 18,
"guid" : "6f4922f4-5568-361a-8cdf-4ad2299f6d23",
"isEnabled" : true,
"version" : 1,
"service" : "hive_jenkins",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,89 @@ class HiveFunctionPrivilegesBuilderSuite extends FunctionPrivilegesBuilderSuite
}
}

test("Built in and UDF Function Call Query") {
val plan = sql(
s"""
|SELECT
| kyuubi_fun_0('TESTSTRING') AS col1,
| kyuubi_fun_0(value) AS col2,
| abs(key) AS col3, abs(-100) AS col4,
| lower(value) AS col5,lower('TESTSTRING') AS col6
|FROM $reusedTable
|""".stripMargin).queryExecution.analyzed
val (inputs, _, _) = PrivilegesBuilder.buildFunctions(plan, spark)
assert(inputs.size === 2)
inputs.foreach { po =>
assert(po.actionType === PrivilegeObjectActionType.OTHER)
assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
assert(po.dbname startsWith reusedDb.toLowerCase)
assert(po.objectName startsWith functionNamePrefix.toLowerCase)
val accessType = ranger.AccessType(po, QUERY, isInput = true)
assert(accessType === AccessType.SELECT)
}
}

test("Function Call in Create Table/View") {
val plan1 = sql(
s"""
|CREATE TABLE table1 AS
|SELECT
| kyuubi_fun_0('KYUUBI_TESTSTRING'),
| kyuubi_fun_0(value)
|FROM $reusedTable
|""".stripMargin).queryExecution.analyzed
val (inputs1, _, _) = PrivilegesBuilder.buildFunctions(plan1, spark)
assert(inputs1.size === 2)
inputs1.foreach { po =>
assert(po.actionType === PrivilegeObjectActionType.OTHER)
assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
assert(po.dbname startsWith reusedDb.toLowerCase)
assert(po.objectName startsWith functionNamePrefix.toLowerCase)
val accessType = ranger.AccessType(po, QUERY, isInput = true)
assert(accessType === AccessType.SELECT)
}
val plan2 = sql("DROP TABLE IF EXISTS table1").queryExecution.analyzed
val (inputs2, _, _) = PrivilegesBuilder.buildFunctions(plan2, spark)
assert(inputs2.size === 0)

val plan3 = sql(
s"""
|CREATE VIEW view1 AS SELECT
| kyuubi_fun_0('KYUUBI_TESTSTRING') AS fun1,
| kyuubi_fun_0(value) AS fun2
|FROM $reusedTable
|""".stripMargin).queryExecution.analyzed
val (inputs3, _, _) = PrivilegesBuilder.buildFunctions(plan3, spark)
assert(inputs3.size === 2)
inputs3.foreach { po =>
assert(po.actionType === PrivilegeObjectActionType.OTHER)
assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
assert(po.dbname startsWith reusedDb.toLowerCase)
assert(po.objectName startsWith functionNamePrefix.toLowerCase)
val accessType = ranger.AccessType(po, QUERY, isInput = true)
assert(accessType === AccessType.SELECT)
}
val plan4 = sql("DROP VIEW IF EXISTS view1").queryExecution.analyzed
val (inputs4, _, _) = PrivilegesBuilder.buildFunctions(plan4, spark)
assert(inputs4.size === 0)
}

test("Function Call in INSERT OVERWRITE") {
val plan = sql(
s"""
|INSERT OVERWRITE TABLE $reusedTable
|SELECT key, kyuubi_fun_0(value)
|FROM $reusedPartTable
|""".stripMargin).queryExecution.analyzed
val (inputsUpdate, _, _) = PrivilegesBuilder.buildFunctions(plan, spark)
assert(inputsUpdate.size === 1)
inputsUpdate.foreach { po =>
assert(po.actionType === PrivilegeObjectActionType.OTHER)
assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
assert(po.dbname startsWith reusedDb.toLowerCase)
assert(po.objectName startsWith functionNamePrefix.toLowerCase)
val accessType = ranger.AccessType(po, QUERY, isInput = true)
assert(accessType === AccessType.SELECT)
}
}
}
Loading
Loading