Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
<property name="typeClass" value="com.cloud.ha.FenceBuilder" />
</bean>

<bean class="org.apache.cloudstack.spring.lifecycle.registry.RegistryLifecycle">
<property name="registry" ref="eventBussesRegistry" />
<property name="typeClass" value="org.apache.cloudstack.framework.events.EventBus" />
</bean>

<bean class="org.apache.cloudstack.spring.lifecycle.registry.RegistryLifecycle">
<property name="registry" ref="hypervisorGurusRegistry" />
<property name="typeClass" value="com.cloud.hypervisor.HypervisorGuru" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,16 @@
<property name="excludeKey" value="api.commands.exclude" />
</bean>

<bean id="eventBussesRegistry"
class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
<property name="excludeKey" value="event.busses.exclude" />
</bean>

<bean id="hypervisorGurusRegistry"
class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
<property name="excludeKey" value="hypervisor.gurus.exclude" />
</bean>

<bean id="vpcProvidersRegistry"
class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
<property name="excludeKey" value="vpc.providers.exclude" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,13 @@

import javax.inject.Inject;

import com.cloud.utils.component.ComponentContext;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventBusException;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.apache.cloudstack.framework.events.EventDistributor;

import com.cloud.event.EventCategory;
import com.cloud.network.Network.Event;
import com.cloud.network.Network.State;
import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.fsm.StateListener;
import com.cloud.utils.fsm.StateMachine2;

Expand All @@ -42,14 +39,16 @@ public class NetworkStateListener implements StateListener<State, Event, Network
@Inject
private ConfigurationDao _configDao;

private static EventBus s_eventBus = null;

private static final Logger s_logger = Logger.getLogger(NetworkStateListener.class);
private EventDistributor eventDistributor;

public NetworkStateListener(ConfigurationDao configDao) {
_configDao = configDao;
}

public void setEventDistributor(EventDistributor eventDistributor) {
this.eventDistributor = eventDistributor;
}

@Override
public boolean preStateTransitionEvent(State oldState, Event event, State newState, Network vo, boolean status, Object opaque) {
pubishOnEventBus(event.name(), "preStateTransitionEvent", vo, oldState, newState);
Expand All @@ -66,36 +65,30 @@ public boolean postStateTransitionEvent(StateMachine2.Transition<State, Event> t
}

private void pubishOnEventBus(String event, String status, Network vo, State oldState, State newState) {

String configKey = "publish.resource.state.events";
String value = _configDao.getValue(configKey);
boolean configValue = Boolean.parseBoolean(value);
if(!configValue)
return;
try {
s_eventBus = ComponentContext.getComponent(EventBus.class);
} catch (NoSuchBeanDefinitionException nbe) {
return; // no provider is configured to provide events bus, so just return
}

String resourceName = getEntityFromClassName(Network.class.getName());
org.apache.cloudstack.framework.events.Event eventMsg =
new org.apache.cloudstack.framework.events.Event("management-server", EventCategory.RESOURCE_STATE_CHANGE_EVENT.getName(), event, resourceName, vo.getUuid());
Map<String, String> eventDescription = new HashMap<String, String>();
eventDescription.put("resource", resourceName);
eventDescription.put("id", vo.getUuid());
eventDescription.put("old-state", oldState.name());
eventDescription.put("new-state", newState.name());

String eventDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z").format(new Date());
eventDescription.put("eventDateTime", eventDate);

eventMsg.setDescription(eventDescription);
try {
s_eventBus.publish(eventMsg);
} catch (EventBusException e) {
s_logger.warn("Failed to publish state change event on the event bus.");
}
String configKey = "publish.resource.state.events";
String value = _configDao.getValue(configKey);
boolean configValue = Boolean.parseBoolean(value);
if(!configValue)
return;
if (eventDistributor == null) {
setEventDistributor(ComponentContext.getComponent(EventDistributor.class));
}

String resourceName = getEntityFromClassName(Network.class.getName());
org.apache.cloudstack.framework.events.Event eventMsg =
new org.apache.cloudstack.framework.events.Event("management-server", EventCategory.RESOURCE_STATE_CHANGE_EVENT.getName(), event, resourceName, vo.getUuid());
Map<String, String> eventDescription = new HashMap<>();
eventDescription.put("resource", resourceName);
eventDescription.put("id", vo.getUuid());
eventDescription.put("old-state", oldState.name());
eventDescription.put("new-state", newState.name());

String eventDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z").format(new Date());
eventDescription.put("eventDateTime", eventDate);

eventMsg.setDescription(eventDescription);

eventDistributor.publish(eventMsg);
}

private String getEntityFromClassName(String entityClassName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ public class Event {
String description;

public Event(String eventSource, String eventCategory, String eventType, String resourceType, String resourceUUID) {
this.eventCategory = eventCategory;
this.eventType = eventType;
this.eventSource = eventSource;
this.resourceType = resourceType;
this.resourceUUID = resourceUUID;
setEventCategory(eventCategory);
setEventType(eventType);
setEventSource(eventSource);
setResourceType(resourceType);
setResourceUUID(resourceUUID);
}

public String getEventCategory() {
Expand Down Expand Up @@ -68,7 +68,7 @@ public String getDescription() {

public void setDescription(Object message) {
Gson gson = new Gson();
this.description = gson.toJson(message).toString();
this.description = gson.toJson(message);
}

public void setDescription(String description) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.cloudstack.framework.events;

import com.cloud.utils.component.Manager;

import java.util.List;

public interface EventDistributor extends Manager {
/**
* publish an event on to the event busses
*
* @param event event that needs to be published on the event bus
*/
List<EventBusException> publish(Event event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.cloudstack.framework.events;

import com.cloud.utils.component.ManagerBase;
import org.apache.log4j.Logger;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;

public class EventDistributorImpl extends ManagerBase implements EventDistributor {
private static final Logger LOGGER = Logger.getLogger(EventDistributorImpl.class);

public void setEventBusses(List<EventBus> eventBusses) {
this.eventBusses = eventBusses;
}

List<EventBus> eventBusses;

@PostConstruct
public void init() {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(String.format("testing %d event busses", eventBusses.size()));
}
publish(new Event("server", "NONE","starting", "server", "NONE"));
}

@Override
public List<EventBusException> publish(Event event) {
LOGGER.info(String.format("publishing %s to %d event busses", (event == null ? "<none>" : event.getDescription()), eventBusses.size()));
List<EventBusException> exceptions = new ArrayList<>();
if (event == null) {
return exceptions;
}
for (EventBus bus : eventBusses) {
try {
bus.publish(event);
} catch (EventBusException e) {
LOGGER.warn(String.format("no publish for bus %s of event %s", bus.getClass().getName(), event.getDescription()));
exceptions.add(e);
}
}
return exceptions;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd"
>
<bean id="eventDistributor"
class="org.apache.cloudstack.framework.events.EventDistributorImpl" >
<property name="eventBusses"
value="#{eventBussesRegistry.registered}" />
</bean>
</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws Event
if (subscriber == null || topic == null) {
throw new EventBusException("Invalid EventSubscriber/EventTopic object passed.");
}
if (s_logger.isDebugEnabled()) {
s_logger.debug(String.format("subscribing \'%s\' to events of type \'%s\' from \'%s\'",subscriber.toString(), topic.getEventType(), topic.getEventSource()));
}

UUID subscriberId = UUID.randomUUID();

subscribers.put(subscriberId, new Pair<EventTopic, EventSubscriber>(topic, subscriber));
Expand All @@ -70,6 +74,9 @@ public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws Event

@Override
public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
if (s_logger.isDebugEnabled()) {
s_logger.debug(String.format("unsubscribing \'%s\'",subscriberId));
}
if (subscriberId == null) {
throw new EventBusException("Cannot unregister a null subscriberId.");
}
Expand All @@ -87,7 +94,11 @@ public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws Ev

@Override
public void publish(Event event) throws EventBusException {
if (s_logger.isTraceEnabled()) {
s_logger.trace(String.format("publish \'%s\'", event.getDescription()));
}
if (subscribers == null || subscribers.isEmpty()) {
s_logger.trace("no subscribers, no publish");
return; // no subscriber to publish to, so just return
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,29 @@ public void setName(String name) {

@Override
public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException {
if (s_logger.isDebugEnabled()) {
s_logger.debug(String.format("subscribing \'%s\' to events of type \'%s\' from \'%s\'",subscriber.toString(), topic.getEventType(), topic.getEventSource()));
}

/* NOOP */
return UUID.randomUUID();
}

@Override
public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
if (s_logger.isDebugEnabled()) {
s_logger.debug(String.format("unsubscribing \'%s\'",subscriberId));
}
/* NOOP */
}

@Override
public void publish(Event event) throws EventBusException {
ProducerRecord<String, String> record = new ProducerRecord<String,String>(_topic, event.getResourceUUID(), event.getDescription());
_producer.send(record);
if (s_logger.isTraceEnabled()) {
s_logger.trace(String.format("publish \'%s\'", event.getDescription()));
}
ProducerRecord<String, String> newRecord = new ProducerRecord<>(_topic, event.getResourceUUID(), event.getDescription());
_producer.send(newRecord);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,14 @@ public static void setRetryInterval(Integer retryInterval) {
*/
@Override
public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException {

if (subscriber == null || topic == null) {
throw new EventBusException("Invalid EventSubscriber/EventTopic object passed.");
}

if (s_logger.isDebugEnabled()) {
s_logger.debug(String.format("subscribing \'%s\' to events of type \'%s\' from \'%s\'",subscriber.toString(), topic.getEventType(), topic.getEventSource()));
}

// create a UUID, that will be used for managing subscriptions and also used as queue name
// for on the queue used for the subscriber on the AMQP broker
UUID queueId = UUID.randomUUID();
Expand Down Expand Up @@ -252,6 +255,9 @@ public void handleDelivery(String queueName, Envelope envelope, AMQP.BasicProper

@Override
public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
if (s_logger.isDebugEnabled()) {
s_logger.debug(String.format("unsubscribing \'%s\'",subscriberId));
}
try {
String classname = subscriber.getClass().getName();
String queueName = UUID.nameUUIDFromBytes(classname.getBytes()).toString();
Expand All @@ -267,6 +273,9 @@ public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws Ev
// publish event on to the exchange created on AMQP server
@Override
public void publish(Event event) throws EventBusException {
if (s_logger.isTraceEnabled()) {
s_logger.trace(String.format("publish \'%s\'", event.getDescription()));
}

String routingKey = createRoutingKey(event);
String eventDescription = event.getDescription();
Expand Down
Loading