Skip to content
Merged
94 changes: 94 additions & 0 deletions examples/server-create-custom-structures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from opcua import ua, Server
from opcua.common.type_dictionary_buider import DataTypeDictionaryBuilder, get_ua_class
from IPython import embed


class DemoServer:

def __init__(self):
self.server = Server()

self.server.set_endpoint('opc.tcp://0.0.0.0:51210/UA/SampleServer')
self.server.set_server_name('Custom structure demo server')
# idx name will be used later for creating the xml used in data type dictionary
self._idx_name = 'http://examples.freeopcua.github.io'
self.idx = self.server.register_namespace(self._idx_name)

self.dict_builder = DataTypeDictionaryBuilder(self.server, self.idx, self._idx_name, 'MyDictionary')

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
quit()

def start_server(self):
self.server.start()

def create_structure(self, name):
# save the created data type
return self.dict_builder.create_data_type(name)

def complete_creation(self):
self.dict_builder.set_dict_byte_string()


if __name__ == '__main__':

with DemoServer() as ua_server:
# add one basic structure
basic_struct_name = 'basic_structure'
basic_struct = ua_server.create_structure(basic_struct_name)
basic_struct.add_field('ID', ua.VariantType.Int32)
basic_struct.add_field('Gender', ua.VariantType.Boolean)
basic_struct.add_field('Comments', ua.VariantType.String)

# add an advance structure which uses our basic structure
nested_struct_name = 'nested_structure'
nested_struct = ua_server.create_structure(nested_struct_name)
nested_struct.add_field('Name', ua.VariantType.String)
nested_struct.add_field('Surname', ua.VariantType.String)
# add simple structure as field
nested_struct.add_field('Stuff', basic_struct)

# this operation will write the OPC dict string to our new data type dictionary
# namely the 'MyDictionary'
ua_server.complete_creation()

# get the working classes
ua_server.server.load_type_definitions()

# Create one test structure
basic_var = ua_server.server.nodes.objects.add_variable(ua.NodeId(namespaceidx=ua_server.idx), 'BasicStruct',
ua.Variant(None, ua.VariantType.Null),
datatype=basic_struct.data_type)

basic_var.set_writable()
basic_msg = get_ua_class(basic_struct_name)()
basic_msg.ID = 3
basic_msg.Gender = True
basic_msg.Comments = 'Test string'
basic_var.set_value(basic_msg)

# Create one advance test structure
nested_var = ua_server.server.nodes.objects.add_variable(ua.NodeId(namespaceidx=ua_server.idx), 'NestedStruct',
ua.Variant(None, ua.VariantType.Null),
datatype=nested_struct.data_type)

nested_var.set_writable()
nested_msg = get_ua_class(nested_struct_name)()
nested_msg.Stuff = basic_msg
nested_msg.Name = 'Max'
nested_msg.Surname = 'Karl'
nested_var.set_value(nested_msg)

ua_server.start_server()

# see the xml value in our customized dictionary 'MyDictionary', only for debugging use
print(getattr(ua_server.dict_builder, '_type_dictionary').get_dict_value())

# values can be write back and retrieved with the codes below.
basic_result = basic_var.get_value()
nested_result = nested_var.get_value()

embed()
265 changes: 265 additions & 0 deletions opcua/common/type_dictionary_buider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
from opcua import ua
from enum import Enum

import xml.etree.ElementTree as Et
import re

# Indicates which type should be OPC build in types
_ua_build_in_types = [ua_type for ua_type in ua.VariantType.__members__ if ua_type != 'ExtensionObject']


def _repl_func(m):
"""
taken from
https://stackoverflow.com/questions/1549641/how-to-capitalize-the-first-letter-of-each-word-in-a-string-python
"""
return m.group(1) + m.group(2).upper()


def _to_camel_case(name):
"""
Create python class name from an arbitrary string to CamelCase string
e.g. actionlib/TestAction -> ActionlibTestAction
turtle_actionlib/ShapeActionFeedback -> TurtleActionlibShapeActionFeedback
"""
name = re.sub(r'[^a-zA-Z0-9]+', ' ', name)
name = re.sub('(^|\s)(\S)', _repl_func, name)
name = name.replace(' ', '')
return name


class OPCTypeDictionaryBuilder:

def __init__(self, idx_name):
"""
:param idx_name: name of the name space
types in dict is created as opc:xxx, otherwise as tns:xxx
"""
head_attributes = {'xmlns:xsi': 'http://www.w3.org/2001/XMLSchema-instance', 'xmlns:tns': idx_name,
'DefaultByteOrder': 'LittleEndian', 'xmlns:opc': 'http://opcfoundation.org/BinarySchema/',
'xmlns:ua': 'http://opcfoundation.org/UA/', 'TargetNamespace': idx_name}

self.etree = Et.ElementTree(Et.Element('opc:TypeDictionary', head_attributes))

name_space = Et.SubElement(self.etree.getroot(), 'opc:Import')
name_space.attrib['Namespace'] = 'http://opcfoundation.org/UA/'

self._structs_dict = {}
self._build_in_list = _ua_build_in_types

def _process_type(self, data_type):
if data_type in self._build_in_list:
data_type = 'opc:' + data_type
else:
data_type = 'tns:' + _to_camel_case(data_type)
return data_type

def _add_field(self, variable_name, data_type, struct_name):
data_type = self._process_type(data_type)
field = Et.SubElement(self._structs_dict[struct_name], 'opc:Field')
field.attrib['Name'] = variable_name
field.attrib['TypeName'] = data_type

def _add_array_field(self, variable_name, data_type, struct_name):
data_type = self._process_type(data_type)
array_len = 'NoOf' + variable_name
field = Et.SubElement(self._structs_dict[struct_name], 'opc:Field')
field.attrib['Name'] = array_len
field.attrib['TypeName'] = 'opc:Int32'
field = Et.SubElement(self._structs_dict[struct_name], 'opc:Field')
field.attrib['Name'] = variable_name
field.attrib['TypeName'] = data_type
field.attrib['LengthField'] = array_len

def add_field(self, variable_name, data_type, struct_name, is_array=False):
if isinstance(data_type, Enum):
data_type = data_type.name
if is_array:
self._add_array_field(variable_name, data_type, struct_name)
else:
self._add_field(variable_name, data_type, struct_name)

def append_struct(self, name):
appended_struct = Et.SubElement(self.etree.getroot(), 'opc:StructuredType')
appended_struct.attrib['BaseType'] = 'ua:ExtensionObject'
appended_struct.attrib['Name'] = _to_camel_case(name)
self._structs_dict[name] = appended_struct
return appended_struct

def get_dict_value(self):
self.indent(self.etree.getroot())
# For debugging
# Et.dump(self.etree.getroot())
return Et.tostring(self.etree.getroot(), encoding='utf-8')

def indent(self, elem, level=0):
i = '\n' + level * ' '
if len(elem):
if not elem.text or not elem.text.strip():
elem.text = i + ' '
if not elem.tail or not elem.tail.strip():
elem.tail = i
for elem in elem:
self.indent(elem, level + 1)
if not elem.tail or not elem.tail.strip():
elem.tail = i
else:
if level and (not elem.tail or not elem.tail.strip()):
elem.tail = i


def _reference_generator(source_id, target_id, reference_type, is_forward=True):
ref = ua.AddReferencesItem()
ref.IsForward = is_forward
ref.ReferenceTypeId = reference_type
ref.SourceNodeId = source_id
ref.TargetNodeClass = ua.NodeClass.DataType
ref.TargetNodeId = target_id
return ref


class DataTypeDictionaryBuilder:

def __init__(self, server, idx, idx_name, dict_name):
self._server = server
self._session_server = server.get_root_node().server
self._idx = idx
# Risk of bugs using a fixed number without checking
self._id_counter = 8000
self.dict_id = self._add_dictionary(dict_name)
self._type_dictionary = OPCTypeDictionaryBuilder(idx_name)

def _add_dictionary(self, name):
dictionary_node_id = self._nodeid_generator()
node = ua.AddNodesItem()
node.RequestedNewNodeId = dictionary_node_id
node.BrowseName = ua.QualifiedName(name, self._idx)
node.NodeClass = ua.NodeClass.Variable
node.ParentNodeId = ua.NodeId(ua.ObjectIds.OPCBinarySchema_TypeSystem, 0)
node.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent, 0)
node.TypeDefinition = ua.NodeId(ua.ObjectIds.DataTypeDictionaryType, 0)
attrs = ua.VariableAttributes()
attrs.DisplayName = ua.LocalizedText(name)
attrs.DataType = ua.NodeId(ua.ObjectIds.ByteString)
# Value should be set after all data types created by calling set_dict_byte_string
attrs.Value = ua.Variant(None, ua.VariantType.Null)
attrs.ValueRank = -1
node.NodeAttributes = attrs
self._session_server.add_nodes([node])

return dictionary_node_id

def _nodeid_generator(self):
self._id_counter += 1
return ua.NodeId(self._id_counter, namespaceidx=self._idx, nodeidtype=ua.NodeIdType.Numeric)

def _link_nodes(self, linked_obj_node_id, data_type_node_id, description_node_id):
"""link the three node by their node ids according to UA standard"""
refs = [
# add reverse reference to BaseDataType -> Structure
_reference_generator(data_type_node_id, ua.NodeId(ua.ObjectIds.Structure, 0),
ua.NodeId(ua.ObjectIds.HasSubtype, 0), False),
# add reverse reference to created data type
_reference_generator(linked_obj_node_id, data_type_node_id,
ua.NodeId(ua.ObjectIds.HasEncoding, 0), False),
# add HasDescription link to dictionary description
_reference_generator(linked_obj_node_id, description_node_id,
ua.NodeId(ua.ObjectIds.HasDescription, 0)),
# add reverse HasDescription link
_reference_generator(description_node_id, linked_obj_node_id,
ua.NodeId(ua.ObjectIds.HasDescription, 0), False),
# add link to the type definition node
_reference_generator(linked_obj_node_id, ua.NodeId(ua.ObjectIds.DataTypeEncodingType, 0),
ua.NodeId(ua.ObjectIds.HasTypeDefinition, 0)),
# add has type definition link
_reference_generator(description_node_id, ua.NodeId(ua.ObjectIds.DataTypeDescriptionType, 0),
ua.NodeId(ua.ObjectIds.HasTypeDefinition, 0)),
# add forward link of dict to description item
_reference_generator(self.dict_id, description_node_id,
ua.NodeId(ua.ObjectIds.HasComponent, 0)),
# add reverse link to dictionary
_reference_generator(description_node_id, self.dict_id,
ua.NodeId(ua.ObjectIds.HasComponent, 0), False)]
self._session_server.add_references(refs)

def _create_data_type(self, type_name):
name = _to_camel_case(type_name)
# apply for new node id
data_type_node_id = self._nodeid_generator()
description_node_id = self._nodeid_generator()
bind_obj_node_id = self._nodeid_generator()

# create data type node
dt_node = ua.AddNodesItem()
dt_node.RequestedNewNodeId = data_type_node_id
dt_node.BrowseName = ua.QualifiedName(name, self._idx)
dt_node.NodeClass = ua.NodeClass.DataType
dt_node.ParentNodeId = ua.NodeId(ua.ObjectIds.Structure, 0)
dt_node.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype, 0)
dt_attributes = ua.DataTypeAttributes()
dt_attributes.DisplayName = ua.LocalizedText(type_name)
dt_node.NodeAttributes = dt_attributes

# create description node
desc_node = ua.AddNodesItem()
desc_node.RequestedNewNodeId = description_node_id
desc_node.BrowseName = ua.QualifiedName(name, self._idx)
desc_node.NodeClass = ua.NodeClass.Variable
desc_node.ParentNodeId = self.dict_id
desc_node.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent, 0)
desc_node.TypeDefinition = ua.NodeId(ua.ObjectIds.DataTypeDescriptionType, 0)
desc_attributes = ua.VariableAttributes()
desc_attributes.DisplayName = ua.LocalizedText(type_name)
desc_attributes.DataType = ua.NodeId(ua.ObjectIds.String)
desc_attributes.Value = ua.Variant(name, ua.VariantType.String)
desc_attributes.ValueRank = -1
desc_node.NodeAttributes = desc_attributes

# create object node which the loaded python class should link to
obj_node = ua.AddNodesItem()
obj_node.RequestedNewNodeId = bind_obj_node_id
obj_node.BrowseName = ua.QualifiedName('Default Binary', 0)
obj_node.NodeClass = ua.NodeClass.Object
obj_node.ParentNodeId = data_type_node_id
obj_node.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasEncoding, 0)
obj_node.TypeDefinition = ua.NodeId(ua.ObjectIds.DataTypeEncodingType, 0)
obj_attributes = ua.ObjectAttributes()
obj_attributes.DisplayName = ua.LocalizedText('Default Binary')
obj_attributes.EventNotifier = 0
obj_node.NodeAttributes = obj_attributes

self._session_server.add_nodes([dt_node, desc_node, obj_node])
self._link_nodes(bind_obj_node_id, data_type_node_id, description_node_id)

self._type_dictionary.append_struct(type_name)
return StructNode(self, data_type_node_id, type_name)

def create_data_type(self, type_name):
return self._create_data_type(type_name)

def add_field(self, variable_name, data_type, struct_name, is_array=False):
self._type_dictionary.add_field(variable_name, data_type, struct_name, is_array)

def set_dict_byte_string(self):
dict_node = self._server.get_node(self.dict_id)
value = self._type_dictionary.get_dict_value()
dict_node.set_value(value, ua.VariantType.ByteString)


class StructNode:

def __init__(self, type_dict, data_type, name):
self._type_dict = type_dict
self.data_type = data_type
self.name = name
pass

def add_field(self, field_name, data_type, is_array=False):
# nested structure could directly use simple structure as field
if isinstance(data_type, StructNode):
data_type = data_type.name
self._type_dict.add_field(field_name, data_type, self.name, is_array)


def get_ua_class(ua_class_name):
return getattr(ua, _to_camel_case(ua_class_name))
1 change: 1 addition & 0 deletions tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from tests_history import TestHistory, TestHistorySQL, TestHistoryLimits, TestHistorySQLLimits
from tests_crypto_connect import TestCryptoConnect
from tests_uaerrors import TestUaErrors
from tests_custom_structures import TypeDictionaryBuilderTest


if __name__ == '__main__':
Expand Down
Loading