-
Notifications
You must be signed in to change notification settings - Fork 670
Implement the security endpoints check for identification #783
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| from opcua import ua, Server | ||
| from opcua.crypto import uacrypto | ||
|
|
||
| import time | ||
|
|
||
| import sys | ||
| sys.path.insert(0, "..") | ||
|
|
||
|
|
||
| def simple_certificate_manager(isession, certificate, signature): | ||
| """ | ||
| Simple certificate manager that only allows clients that are authorized. | ||
| To simplify this example, we use the same certificate on the both side. | ||
| """ | ||
| server_certificate = uacrypto.der_from_x509(uacrypto.load_certificate('certificate-example.der')) | ||
| trusted_certificate = uacrypto.load_certificate('certificate-example.der') | ||
|
|
||
| if uacrypto.der_from_x509(trusted_certificate) == certificate: | ||
| data = server_certificate + isession.nonce | ||
| try: | ||
| uacrypto.verify_sha256(trusted_certificate, data, signature) | ||
| return True | ||
| except: | ||
| return False | ||
|
|
||
| return False | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
|
|
||
| # setup our server | ||
| server = Server() | ||
| server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/") | ||
|
|
||
| # setup our own namespace, not really necessary but should as spec | ||
| uri = "http://examples.freeopcua.github.io" | ||
| idx = server.register_namespace(uri) | ||
|
|
||
| # load server certificate and private key. This enables endpoints | ||
| # with signing and encryption. | ||
| server.load_certificate("certificate-example.der") | ||
| server.load_private_key("private-key-example.pem") | ||
|
|
||
| server.certificate_manager.set_certificate_manager(simple_certificate_manager) | ||
|
|
||
| # starting | ||
| server.start() | ||
|
|
||
| try: | ||
| while True: | ||
| time.sleep(5) | ||
| finally: | ||
| server.stop() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| import logging | ||
|
|
||
|
|
||
| class CertificateManager(object): | ||
|
|
||
| def __init__(self): | ||
| self.logger = logging.getLogger(__name__) | ||
| self.certificate_manager = self.default_certificate_manager | ||
|
|
||
| def default_certificate_manager(self, isession, certificate, signature): | ||
| """ | ||
| Default certificate_manager, does nothing. | ||
| """ | ||
| self.logger.warning("There is no certificate manager. You have to set one to manage the client identity.") | ||
| return True | ||
|
|
||
| def set_certificate_manager(self, certificate_manager): | ||
| """ | ||
| set up a function which will check for the authorize client certificate. Input function takes certificate | ||
| and signature as parameters and returns True of client is allowed access, False otherwise. | ||
| """ | ||
| self.certificate_manager = certificate_manager | ||
|
|
||
| def check_certificate_token(self, isession, client_params): | ||
| """ | ||
| call the certificate manager | ||
| """ | ||
| return self.certificate_manager(isession, client_params.UserIdentityToken.CertificateData, | ||
| client_params.UserTokenSignature.Signature) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -76,6 +76,10 @@ def __init__(self, shelffile=None, parent=None): | |
| def user_manager(self): | ||
| return self._parent.user_manager | ||
|
|
||
| @property | ||
| def certificate_manager(self): | ||
| return self._parent.certificate_manager | ||
|
|
||
| @property | ||
| def thread_loop(self): | ||
| if self.loop is None: | ||
|
|
@@ -282,6 +286,10 @@ def __init__(self, internal_server, aspace, submgr, name, user=UserManager.User. | |
| def user_manager(self): | ||
| return self.iserver.user_manager | ||
|
|
||
| @property | ||
| def certificate_manager(self): | ||
| return self.iserver.certificate_manager | ||
|
|
||
| def __str__(self): | ||
| return "InternalSession(name:{0}, user:{1}, id:{2}, auth_token:{3})".format( | ||
| self.name, self.user, self.session_id, self.authentication_token) | ||
|
|
@@ -310,18 +318,45 @@ def close_session(self, delete_subs=True): | |
|
|
||
| def activate_session(self, params): | ||
| self.logger.info("activate session") | ||
|
|
||
| result = ua.ActivateSessionResult() | ||
| if self.state != SessionState.Created: | ||
| raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid) | ||
|
|
||
| # FIXME: Get the server available UserIdentityToken policy's ids. I don't how to get it in other way. | ||
| # iserver_policy_ids = ["anonymous", "certificate_basic256sha256", "username"] | ||
| # Note that it is different from server._policyIDs = ["Anonymous", "Basic256Sha256", "Username"] | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes that seems to be the correct way to get policy IDs . Just write a comment that we assume that the same polcies are used n all endpoints.. maybe add a FIXME. I am not sure what spec says but in theory it should be possible to have different policies per endpoints |
||
| iserver_policy_ids = [] | ||
| edp = self.iserver.get_endpoints()[0] # FIXME: is it the same list whatever endpoint ? | ||
| for token in edp.UserIdentityTokens: | ||
| iserver_policy_ids.append(token.PolicyId) | ||
|
|
||
| # Check if params UserIdentityToken is authorized by the server. | ||
| params_id_token = params.UserIdentityToken | ||
| if params_id_token.PolicyId not in iserver_policy_ids: | ||
| raise utils.ServiceError(ua.StatusCodes.BadIdentityTokenInvalid) | ||
|
|
||
| # Route to the correct manager | ||
| if isinstance(params_id_token, ua.AnonymousIdentityToken): | ||
| pass # TODO: Call AnonymousIdentityToken Manager | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should at least check that "anonymous" is in the list of Policies and return an UaError if not
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't we use the user_manager for that? maybe just have a bool member called something like "allow_anonymous"? not sure I have not looked at that code for very long... |
||
|
|
||
| elif isinstance(params_id_token, ua.UserNameIdentityToken): | ||
| if not self.user_manager.check_user_token(self, params_id_token): | ||
| raise utils.ServiceError(ua.StatusCodes.BadUserAccessDenied) | ||
|
|
||
| elif isinstance(params_id_token, ua.X509IdentityToken): | ||
| if not self.certificate_manager.check_certificate_token(self, params): | ||
| raise utils.ServiceError(ua.StatusCodes.BadUserAccessDenied) | ||
|
|
||
| else: # Not implemented IdentityToken | ||
| raise utils.ServiceError(ua.StatusCodes.BadIdentityTokenInvalid) | ||
|
|
||
| self.nonce = utils.create_nonce(32) | ||
| result.ServerNonce = self.nonce | ||
| for _ in params.ClientSoftwareCertificates: | ||
| result.Results.append(ua.StatusCode()) | ||
| self.state = SessionState.Activated | ||
| id_token = params.UserIdentityToken | ||
| if isinstance(id_token, ua.UserNameIdentityToken): | ||
| if self.user_manager.check_user_token(self, id_token) == False: | ||
| raise utils.ServiceError(ua.StatusCodes.BadUserAccessDenied) | ||
|
|
||
| self.logger.info("Activated internal session %s for user %s", self.name, self.user) | ||
| return result | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have not looked in detail at this. But is it enough with a function? Should'nt we use a class with a few methods? just asking
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if it is just a function, then using the name manager is a bit wrong. A manager is an object. a function would be called something like set_certificate_validation_function or whatever