forked from asyncapi/java-spring-template
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMqttConfig.java
More file actions
101 lines (84 loc) · 3.82 KB
/
MqttConfig.java
File metadata and controls
101 lines (84 loc) · 3.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
{% macro mqttConfig(asyncapi, params) %}
import {{params['userJavaPackage']}}.service.MessageHandlerService;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.StringUtils;
@Configuration
public class Config {
@Value("${mqtt.broker.host}")
private String host;
@Value("${mqtt.broker.port}")
private int port;
@Value("${mqtt.broker.username}")
private String username;
@Value("${mqtt.broker.password}")
private String password;
{% for channelName, channel in asyncapi.channels() %}
@Value("${mqtt.topic.{{-channelName-}}Topic}")
private String {{channelName}}Topic;
{% endfor %}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { host + ":" + port });
if (!StringUtils.isEmpty(username)) {
options.setUserName(username);
}
if (!StringUtils.isEmpty(password)) {
options.setPassword(password.toCharArray());
}
factory.setConnectionOptions(options);
return factory;
}
// consumer
@Autowired
MessageHandlerService messageHandlerService;
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %}
@Bean
public IntegrationFlow {{channelName | camelCase}}Flow() {
return IntegrationFlows.from({{channelName | camelCase}}Inbound())
.handle(messageHandlerService::handle{{channelName | upperFirst}})
.get();
}
@Bean
public MessageProducerSupport {{channelName | camelCase}}Inbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("{{channelName | camelCase}}Subscriber",
mqttClientFactory(), {{channelName}}Topic);
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
return adapter;
}
{% endif %}{% endfor %}
// publisher
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %}
@Bean
public MessageChannel {{channelName | camelCase}}OutboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "{{channelName | camelCase}}OutboundChannel")
public MessageHandler {{channelName | camelCase}}Outbound() {
MqttPahoMessageHandler pahoMessageHandler = new MqttPahoMessageHandler("{{channelName | camelCase}}Publisher", mqttClientFactory());
pahoMessageHandler.setAsync(true);
pahoMessageHandler.setDefaultTopic({{channelName}}Topic);
return pahoMessageHandler;
}
{% endif %}{% endfor %}
}
{% endmacro %}