From 61497f888ee68ab900288a41819f3132e8a8145e Mon Sep 17 00:00:00 2001 From: okyame Date: Sat, 12 Jan 2019 06:34:13 +0100 Subject: [PATCH 1/3] Implement the security endpoints check for identification --- opcua/server/internal_server.py | 34 +++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/opcua/server/internal_server.py b/opcua/server/internal_server.py index f5ee51f2b..aa6b50136 100644 --- a/opcua/server/internal_server.py +++ b/opcua/server/internal_server.py @@ -310,18 +310,44 @@ def close_session(self, delete_subs=True): def activate_session(self, params): self.logger.info("activate session") + result = ua.ActivateSessionResult() if self.state != SessionState.Created: raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid) + + # FIXME: Get the server available UserIdentityToken policy's ids. I don't how to get it in other way. + # iserver_policy_ids = ["anonymous", "certificate_basic256sha256", "username"] + # Note that it is different from server._policyIDs = ["Anonymous", "Basic256Sha256", "Username"] + iserver_policy_ids = [] + edp = self.iserver.get_endpoints()[0] # FIXME: is it the same list whatever endpoint ? + for token in edp.UserIdentityTokens: + iserver_policy_ids.append(token.PolicyId) + + # Check if params UserIdentityToken is authorized by the server. + params_id_token = params.UserIdentityToken + if params_id_token.PolicyId not in iserver_policy_ids: + raise utils.ServiceError(ua.StatusCodes.BadIdentityTokenInvalid) + + # Route to the correct manager + if isinstance(params_id_token, ua.AnonymousIdentityToken): + pass # TODO: Call AnonymousIdentityToken Manager + + elif isinstance(params_id_token, ua.UserNameIdentityToken): + if self.user_manager.check_user_token(self, params_id_token) == False: + raise utils.ServiceError(ua.StatusCodes.BadUserAccessDenied) + + elif isinstance(params_id_token, ua.X509IdentityToken): + pass # TODO: Call X509IdentityToken Manager + + else: # Not implement IdentityToken + raise utils.ServiceError(ua.StatusCodes.BadIdentityTokenInvalid) + self.nonce = utils.create_nonce(32) result.ServerNonce = self.nonce for _ in params.ClientSoftwareCertificates: result.Results.append(ua.StatusCode()) self.state = SessionState.Activated - id_token = params.UserIdentityToken - if isinstance(id_token, ua.UserNameIdentityToken): - if self.user_manager.check_user_token(self, id_token) == False: - raise utils.ServiceError(ua.StatusCodes.BadUserAccessDenied) + self.logger.info("Activated internal session %s for user %s", self.name, self.user) return result From 0f4ddf39526d3a6b0e2d5d6db2e6d1456d324dd0 Mon Sep 17 00:00:00 2001 From: okyame Date: Sat, 12 Jan 2019 15:17:48 +0100 Subject: [PATCH 2/3] Add usertoken certificate validation support --- opcua/server/certificate_manager.py | 29 +++++++++++++++++++++++++++++ opcua/server/internal_server.py | 15 ++++++++++++--- opcua/server/server.py | 4 +++- 3 files changed, 44 insertions(+), 4 deletions(-) create mode 100644 opcua/server/certificate_manager.py diff --git a/opcua/server/certificate_manager.py b/opcua/server/certificate_manager.py new file mode 100644 index 000000000..eace9668b --- /dev/null +++ b/opcua/server/certificate_manager.py @@ -0,0 +1,29 @@ +import logging + + +class CertificateManager(object): + + def __init__(self): + self.logger = logging.getLogger(__name__) + self.certificate_manager = self.default_certificate_manager + + def default_certificate_manager(self, isession, certificate, signature): + """ + Default certificate_manager, does nothing. + """ + self.logger.warning("There is no certificate manager. You have to set one to manage the client identity.") + return True + + def set_certificate_manager(self, certificate_manager): + """ + set up a function which will check for the authorize client certificate. Input function takes certificate + and signature as parameters and returns True of client is allowed access, False otherwise. + """ + self.certificate_manager = certificate_manager + + def check_certificate_token(self, isession, client_params): + """ + call the certificate manager + """ + return self.certificate_manager(isession, client_params.UserIdentityToken.CertificateData, + client_params.UserTokenSignature.Signature) diff --git a/opcua/server/internal_server.py b/opcua/server/internal_server.py index aa6b50136..c8fc3776c 100644 --- a/opcua/server/internal_server.py +++ b/opcua/server/internal_server.py @@ -76,6 +76,10 @@ def __init__(self, shelffile=None, parent=None): def user_manager(self): return self._parent.user_manager + @property + def certificate_manager(self): + return self._parent.certificate_manager + @property def thread_loop(self): if self.loop is None: @@ -282,6 +286,10 @@ def __init__(self, internal_server, aspace, submgr, name, user=UserManager.User. def user_manager(self): return self.iserver.user_manager + @property + def certificate_manager(self): + return self.iserver.certificate_manager + def __str__(self): return "InternalSession(name:{0}, user:{1}, id:{2}, auth_token:{3})".format( self.name, self.user, self.session_id, self.authentication_token) @@ -333,13 +341,14 @@ def activate_session(self, params): pass # TODO: Call AnonymousIdentityToken Manager elif isinstance(params_id_token, ua.UserNameIdentityToken): - if self.user_manager.check_user_token(self, params_id_token) == False: + if not self.user_manager.check_user_token(self, params_id_token): raise utils.ServiceError(ua.StatusCodes.BadUserAccessDenied) elif isinstance(params_id_token, ua.X509IdentityToken): - pass # TODO: Call X509IdentityToken Manager + if not self.certificate_manager.check_certificate_token(self, params): + raise utils.ServiceError(ua.StatusCodes.BadUserAccessDenied) - else: # Not implement IdentityToken + else: # Not implemented IdentityToken raise utils.ServiceError(ua.StatusCodes.BadIdentityTokenInvalid) self.nonce = utils.create_nonce(32) diff --git a/opcua/server/server.py b/opcua/server/server.py index 01911ee52..da0b1615c 100644 --- a/opcua/server/server.py +++ b/opcua/server/server.py @@ -16,6 +16,7 @@ from opcua.server.internal_server import InternalServer from opcua.server.event_generator import EventGenerator from opcua.server.user_manager import UserManager +from opcua.server.certificate_manager import CertificateManager from opcua.server.discovery_service import LocalDiscoveryService from opcua.common.node import Node from opcua.common.subscription import Subscription @@ -113,7 +114,8 @@ def __init__(self, shelffile=None, iserver=None): # enable all endpoints by default self.certificate = None self.private_key = None - self.user_manager = UserManager(parent = self) + self.user_manager = UserManager(parent=self) + self.certificate_manager = CertificateManager() self._security_policy = [ ua.SecurityPolicyType.NoSecurity, ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt, From bf0493fc5e917507a067cad30590f5cc70491521 Mon Sep 17 00:00:00 2001 From: okyame Date: Sat, 12 Jan 2019 15:19:50 +0100 Subject: [PATCH 3/3] Add example of how to implement a custom certificate manager --- examples/server_certificate_manager.py | 53 ++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 examples/server_certificate_manager.py diff --git a/examples/server_certificate_manager.py b/examples/server_certificate_manager.py new file mode 100644 index 000000000..b04367724 --- /dev/null +++ b/examples/server_certificate_manager.py @@ -0,0 +1,53 @@ +from opcua import ua, Server +from opcua.crypto import uacrypto + +import time + +import sys +sys.path.insert(0, "..") + + +def simple_certificate_manager(isession, certificate, signature): + """ + Simple certificate manager that only allows clients that are authorized. + To simplify this example, we use the same certificate on the both side. + """ + server_certificate = uacrypto.der_from_x509(uacrypto.load_certificate('certificate-example.der')) + trusted_certificate = uacrypto.load_certificate('certificate-example.der') + + if uacrypto.der_from_x509(trusted_certificate) == certificate: + data = server_certificate + isession.nonce + try: + uacrypto.verify_sha256(trusted_certificate, data, signature) + return True + except: + return False + + return False + + +if __name__ == "__main__": + + # setup our server + server = Server() + server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/") + + # setup our own namespace, not really necessary but should as spec + uri = "http://examples.freeopcua.github.io" + idx = server.register_namespace(uri) + + # load server certificate and private key. This enables endpoints + # with signing and encryption. + server.load_certificate("certificate-example.der") + server.load_private_key("private-key-example.pem") + + server.certificate_manager.set_certificate_manager(simple_certificate_manager) + + # starting + server.start() + + try: + while True: + time.sleep(5) + finally: + server.stop()