Skip to content

Commit e596b10

Browse files
committed
registry of message busses
1 parent 5c7e4b7 commit e596b10

File tree

18 files changed

+308
-149
lines changed

18 files changed

+308
-149
lines changed

core/src/main/resources/META-INF/cloudstack/compute/spring-core-lifecycle-compute-context-inheritable.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@
3939
<property name="typeClass" value="com.cloud.ha.FenceBuilder" />
4040
</bean>
4141

42+
<bean class="org.apache.cloudstack.spring.lifecycle.registry.RegistryLifecycle">
43+
<property name="registry" ref="eventBussesRegistry" />
44+
<property name="typeClass" value="org.apache.cloudstack.framework.events.EventBus" />
45+
</bean>
46+
4247
<bean class="org.apache.cloudstack.spring.lifecycle.registry.RegistryLifecycle">
4348
<property name="registry" ref="hypervisorGurusRegistry" />
4449
<property name="typeClass" value="com.cloud.hypervisor.HypervisorGuru" />

core/src/main/resources/META-INF/cloudstack/core/spring-core-registry-core-context.xml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -287,11 +287,16 @@
287287
<property name="excludeKey" value="api.commands.exclude" />
288288
</bean>
289289

290+
<bean id="eventBussesRegistry"
291+
class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
292+
<property name="excludeKey" value="event.busses.exclude" />
293+
</bean>
294+
290295
<bean id="hypervisorGurusRegistry"
291-
class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
296+
class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
292297
<property name="excludeKey" value="hypervisor.gurus.exclude" />
293298
</bean>
294-
299+
295300
<bean id="vpcProvidersRegistry"
296301
class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
297302
<property name="excludeKey" value="vpc.providers.exclude" />

engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java

Lines changed: 31 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,13 @@
2424

2525
import javax.inject.Inject;
2626

27+
import com.cloud.utils.component.ComponentContext;
2728
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
28-
import org.apache.cloudstack.framework.events.EventBus;
29-
import org.apache.cloudstack.framework.events.EventBusException;
30-
import org.apache.log4j.Logger;
31-
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
29+
import org.apache.cloudstack.framework.events.EventDistributor;
3230

3331
import com.cloud.event.EventCategory;
3432
import com.cloud.network.Network.Event;
3533
import com.cloud.network.Network.State;
36-
import com.cloud.utils.component.ComponentContext;
3734
import com.cloud.utils.fsm.StateListener;
3835
import com.cloud.utils.fsm.StateMachine2;
3936

@@ -42,14 +39,16 @@ public class NetworkStateListener implements StateListener<State, Event, Network
4239
@Inject
4340
private ConfigurationDao _configDao;
4441

45-
private static EventBus s_eventBus = null;
46-
47-
private static final Logger s_logger = Logger.getLogger(NetworkStateListener.class);
42+
private EventDistributor eventDistributor;
4843

4944
public NetworkStateListener(ConfigurationDao configDao) {
5045
_configDao = configDao;
5146
}
5247

48+
public void setEventDistributor(EventDistributor eventDistributor) {
49+
this.eventDistributor = eventDistributor;
50+
}
51+
5352
@Override
5453
public boolean preStateTransitionEvent(State oldState, Event event, State newState, Network vo, boolean status, Object opaque) {
5554
pubishOnEventBus(event.name(), "preStateTransitionEvent", vo, oldState, newState);
@@ -66,36 +65,30 @@ public boolean postStateTransitionEvent(StateMachine2.Transition<State, Event> t
6665
}
6766

6867
private void pubishOnEventBus(String event, String status, Network vo, State oldState, State newState) {
69-
70-
String configKey = "publish.resource.state.events";
71-
String value = _configDao.getValue(configKey);
72-
boolean configValue = Boolean.parseBoolean(value);
73-
if(!configValue)
74-
return;
75-
try {
76-
s_eventBus = ComponentContext.getComponent(EventBus.class);
77-
} catch (NoSuchBeanDefinitionException nbe) {
78-
return; // no provider is configured to provide events bus, so just return
79-
}
80-
81-
String resourceName = getEntityFromClassName(Network.class.getName());
82-
org.apache.cloudstack.framework.events.Event eventMsg =
83-
new org.apache.cloudstack.framework.events.Event("management-server", EventCategory.RESOURCE_STATE_CHANGE_EVENT.getName(), event, resourceName, vo.getUuid());
84-
Map<String, String> eventDescription = new HashMap<String, String>();
85-
eventDescription.put("resource", resourceName);
86-
eventDescription.put("id", vo.getUuid());
87-
eventDescription.put("old-state", oldState.name());
88-
eventDescription.put("new-state", newState.name());
89-
90-
String eventDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z").format(new Date());
91-
eventDescription.put("eventDateTime", eventDate);
92-
93-
eventMsg.setDescription(eventDescription);
94-
try {
95-
s_eventBus.publish(eventMsg);
96-
} catch (EventBusException e) {
97-
s_logger.warn("Failed to publish state change event on the event bus.");
98-
}
68+
String configKey = "publish.resource.state.events";
69+
String value = _configDao.getValue(configKey);
70+
boolean configValue = Boolean.parseBoolean(value);
71+
if(!configValue)
72+
return;
73+
if (eventDistributor == null) {
74+
setEventDistributor(ComponentContext.getComponent(EventDistributor.class));
75+
}
76+
77+
String resourceName = getEntityFromClassName(Network.class.getName());
78+
org.apache.cloudstack.framework.events.Event eventMsg =
79+
new org.apache.cloudstack.framework.events.Event("management-server", EventCategory.RESOURCE_STATE_CHANGE_EVENT.getName(), event, resourceName, vo.getUuid());
80+
Map<String, String> eventDescription = new HashMap<>();
81+
eventDescription.put("resource", resourceName);
82+
eventDescription.put("id", vo.getUuid());
83+
eventDescription.put("old-state", oldState.name());
84+
eventDescription.put("new-state", newState.name());
85+
86+
String eventDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z").format(new Date());
87+
eventDescription.put("eventDateTime", eventDate);
88+
89+
eventMsg.setDescription(eventDescription);
90+
91+
eventDistributor.publish(eventMsg);
9992
}
10093

10194
private String getEntityFromClassName(String entityClassName) {

framework/events/src/main/java/org/apache/cloudstack/framework/events/Event.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,11 @@ public class Event {
3131
String description;
3232

3333
public Event(String eventSource, String eventCategory, String eventType, String resourceType, String resourceUUID) {
34-
this.eventCategory = eventCategory;
35-
this.eventType = eventType;
36-
this.eventSource = eventSource;
37-
this.resourceType = resourceType;
38-
this.resourceUUID = resourceUUID;
34+
setEventCategory(eventCategory);
35+
setEventType(eventType);
36+
setEventSource(eventSource);
37+
setResourceType(resourceType);
38+
setResourceUUID(resourceUUID);
3939
}
4040

4141
public String getEventCategory() {
@@ -68,7 +68,7 @@ public String getDescription() {
6868

6969
public void setDescription(Object message) {
7070
Gson gson = new Gson();
71-
this.description = gson.toJson(message).toString();
71+
this.description = gson.toJson(message);
7272
}
7373

7474
public void setDescription(String description) {
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.cloudstack.framework.events;
21+
22+
import com.cloud.utils.component.Manager;
23+
24+
import java.util.List;
25+
26+
public interface EventDistributor extends Manager {
27+
/**
28+
* publish an event on to the event busses
29+
*
30+
* @param event event that needs to be published on the event bus
31+
*/
32+
List<EventBusException> publish(Event event);
33+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.cloudstack.framework.events;
21+
22+
import com.cloud.utils.component.ManagerBase;
23+
import org.apache.log4j.Logger;
24+
25+
import javax.annotation.PostConstruct;
26+
import java.util.ArrayList;
27+
import java.util.List;
28+
29+
public class EventDistributorImpl extends ManagerBase implements EventDistributor {
30+
private static final Logger LOGGER = Logger.getLogger(EventDistributorImpl.class);
31+
32+
public void setEventBusses(List<EventBus> eventBusses) {
33+
this.eventBusses = eventBusses;
34+
}
35+
36+
List<EventBus> eventBusses;
37+
38+
@PostConstruct
39+
public void init() {
40+
if (LOGGER.isTraceEnabled()) {
41+
LOGGER.trace(String.format("testing %d event busses", eventBusses.size()));
42+
}
43+
publish(new Event("server", "NONE","starting", "server", "NONE"));
44+
}
45+
46+
@Override
47+
public List<EventBusException> publish(Event event) {
48+
LOGGER.info(String.format("publishing %s to %d event busses", (event == null ? "<none>" : event.getDescription()), eventBusses.size()));
49+
List<EventBusException> exceptions = new ArrayList<>();
50+
if (event == null) {
51+
return exceptions;
52+
}
53+
for (EventBus bus : eventBusses) {
54+
try {
55+
bus.publish(event);
56+
} catch (EventBusException e) {
57+
LOGGER.warn(String.format("no publish for bus %s of event %s", bus.getClass().getName(), event.getDescription()));
58+
exceptions.add(e);
59+
}
60+
}
61+
return exceptions;
62+
}
63+
64+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<!--
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
<beans xmlns="http://www.springframework.org/schema/beans"
20+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
21+
xmlns:context="http://www.springframework.org/schema/context"
22+
xmlns:aop="http://www.springframework.org/schema/aop"
23+
xsi:schemaLocation="http://www.springframework.org/schema/beans
24+
http://www.springframework.org/schema/beans/spring-beans.xsd
25+
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
26+
http://www.springframework.org/schema/context
27+
http://www.springframework.org/schema/context/spring-context.xsd"
28+
>
29+
<bean id="eventDistributor"
30+
class="org.apache.cloudstack.framework.events.EventDistributorImpl" >
31+
<property name="eventBusses"
32+
value="#{eventBussesRegistry.registered}" />
33+
</bean>
34+
</beans>

plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws Event
6262
if (subscriber == null || topic == null) {
6363
throw new EventBusException("Invalid EventSubscriber/EventTopic object passed.");
6464
}
65+
if (s_logger.isDebugEnabled()) {
66+
s_logger.debug(String.format("subscribing \'%s\' to events of type \'%s\' from \'%s\'",subscriber.toString(), topic.getEventType(), topic.getEventSource()));
67+
}
68+
6569
UUID subscriberId = UUID.randomUUID();
6670

6771
subscribers.put(subscriberId, new Pair<EventTopic, EventSubscriber>(topic, subscriber));
@@ -70,6 +74,9 @@ public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws Event
7074

7175
@Override
7276
public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
77+
if (s_logger.isDebugEnabled()) {
78+
s_logger.debug(String.format("unsubscribing \'%s\'",subscriberId));
79+
}
7380
if (subscriberId == null) {
7481
throw new EventBusException("Cannot unregister a null subscriberId.");
7582
}
@@ -87,7 +94,11 @@ public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws Ev
8794

8895
@Override
8996
public void publish(Event event) throws EventBusException {
97+
if (s_logger.isTraceEnabled()) {
98+
s_logger.trace(String.format("publish \'%s\'", event.getDescription()));
99+
}
90100
if (subscribers == null || subscribers.isEmpty()) {
101+
s_logger.trace("no subscribers, no publish");
91102
return; // no subscriber to publish to, so just return
92103
}
93104

plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,19 +89,29 @@ public void setName(String name) {
8989

9090
@Override
9191
public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException {
92+
if (s_logger.isDebugEnabled()) {
93+
s_logger.debug(String.format("subscribing \'%s\' to events of type \'%s\' from \'%s\'",subscriber.toString(), topic.getEventType(), topic.getEventSource()));
94+
}
95+
9296
/* NOOP */
9397
return UUID.randomUUID();
9498
}
9599

96100
@Override
97101
public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
102+
if (s_logger.isDebugEnabled()) {
103+
s_logger.debug(String.format("unsubscribing \'%s\'",subscriberId));
104+
}
98105
/* NOOP */
99106
}
100107

101108
@Override
102109
public void publish(Event event) throws EventBusException {
103-
ProducerRecord<String, String> record = new ProducerRecord<String,String>(_topic, event.getResourceUUID(), event.getDescription());
104-
_producer.send(record);
110+
if (s_logger.isTraceEnabled()) {
111+
s_logger.trace(String.format("publish \'%s\'", event.getDescription()));
112+
}
113+
ProducerRecord<String, String> newRecord = new ProducerRecord<>(_topic, event.getResourceUUID(), event.getDescription());
114+
_producer.send(newRecord);
105115
}
106116

107117
@Override

plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,11 +187,14 @@ public static void setRetryInterval(Integer retryInterval) {
187187
*/
188188
@Override
189189
public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException {
190-
191190
if (subscriber == null || topic == null) {
192191
throw new EventBusException("Invalid EventSubscriber/EventTopic object passed.");
193192
}
194193

194+
if (s_logger.isDebugEnabled()) {
195+
s_logger.debug(String.format("subscribing \'%s\' to events of type \'%s\' from \'%s\'",subscriber.toString(), topic.getEventType(), topic.getEventSource()));
196+
}
197+
195198
// create a UUID, that will be used for managing subscriptions and also used as queue name
196199
// for on the queue used for the subscriber on the AMQP broker
197200
UUID queueId = UUID.randomUUID();
@@ -252,6 +255,9 @@ public void handleDelivery(String queueName, Envelope envelope, AMQP.BasicProper
252255

253256
@Override
254257
public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
258+
if (s_logger.isDebugEnabled()) {
259+
s_logger.debug(String.format("unsubscribing \'%s\'",subscriberId));
260+
}
255261
try {
256262
String classname = subscriber.getClass().getName();
257263
String queueName = UUID.nameUUIDFromBytes(classname.getBytes()).toString();
@@ -267,6 +273,9 @@ public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws Ev
267273
// publish event on to the exchange created on AMQP server
268274
@Override
269275
public void publish(Event event) throws EventBusException {
276+
if (s_logger.isTraceEnabled()) {
277+
s_logger.trace(String.format("publish \'%s\'", event.getDescription()));
278+
}
270279

271280
String routingKey = createRoutingKey(event);
272281
String eventDescription = event.getDescription();

0 commit comments

Comments
 (0)