Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ public final class Config {
@Comment("It is not recommended to decrease this number (milliseconds)")
private int serverOnlineCheckDelay = 500;

@Comment("How many online checks need to be successful before we consider the server online")
private int minOnlineChecks = 3;

@Comment("Where to send the queue position message and what to send.")
private boolean positionMessageChat = true;
private boolean positionMessageHotBar = false;
Expand Down Expand Up @@ -207,6 +210,7 @@ public void copyFrom(Config source) {
registerTab = source.registerTab;
serverIsFullMessage = source.serverIsFullMessage;
serverOnlineCheckDelay = source.serverOnlineCheckDelay;
minOnlineChecks = source.minOnlineChecks;
positionMessageChat = source.positionMessageChat;
positionMessageHotBar = source.positionMessageHotBar;
queuePosition = source.queuePosition;
Expand Down Expand Up @@ -278,6 +282,10 @@ public int serverOnlineCheckDelay() {
return serverOnlineCheckDelay;
}

public int minOnlineChecks() {
return minOnlineChecks;
}

public boolean positionMessageChat() {
return positionMessageChat;
}
Expand Down Expand Up @@ -527,6 +535,10 @@ public void setServerIsFullMessage(String message) {
this.serverIsFullMessage = message;
}

public void setMinOnlineChecks(int minOnlineChecks) {
this.minOnlineChecks = minOnlineChecks;
}

public void setServerDownKickMessage(String message) {
this.serverDownKickMessage = message;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ default void scheduleTasks(QueueListenerShared queueListener) {
final QueueGroup defaultGroup = resolvedDefaultGroup;
// Sends the position message and updates tab on an interval in chat
schedule(() -> {
boolean targetsOnline = defaultGroup.targetServers().stream().anyMatch(queueListener.getOnlineServers()::contains);
boolean targetsOnline = defaultGroup.targetServers().stream().anyMatch(queueListener.getServerStatusManager().getOnlineServers()::contains);
if (targetsOnline) {
for (QueueType type : config.getAllQueueTypes()) {
if (config.positionMessageChat()) {
Expand Down Expand Up @@ -126,25 +126,24 @@ default void scheduleTasks(QueueListenerShared queueListener) {
List<String> servers = new ArrayList<>(config.kickWhenDownServers());
CountDownLatch latch = new CountDownLatch(servers.size());
for (String server : servers) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
CompletableFuture.runAsync(() -> {
try {
Optional<ServerInfoWrapper> serverInfoWrapper = getServer(server);

if (serverInfoWrapper.isPresent()) {
if (serverInfoWrapper.get().isOnline()) {
queueListener.getOnlineServers().add(server);
queueListener.getServerStatusManager().online(server);
} else {
warning("Server %s is down!!!".formatted(server));
queueListener.getOnlineServers().remove(server);
queueListener.getServerStatusManager().offline(server);
}
} else {
warning("Server \"%s\" not set up!!! Check out: https://github.com/AlexProgrammerDE/PistonQueue/wiki/FAQ#server-not-set-up-error".formatted(server));
}
} finally {
latch.countDown();
}
});
future.exceptionally(ex -> {
}).exceptionally(ex -> {
error("Failed to check status of server " + server + ": " + ex.getMessage());
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
public abstract class QueueListenerShared {
private final PistonQueuePlugin plugin;
@Getter
private final Set<String> onlineServers = ConcurrentHashMap.newKeySet();
private final ServerStatusManager serverStatusManager;
private final QueueEnvironment queueEnvironment;
private final QueuePlacementCoordinator queuePlacementCoordinator;
private final QueueMoveProcessor queueMoveProcessor;
Expand All @@ -57,8 +57,9 @@ public abstract class QueueListenerShared {

protected QueueListenerShared(PistonQueuePlugin plugin) {
this.plugin = plugin;
this.queueEnvironment = new QueueEnvironment(plugin, this::currentConfig, onlineServers);
Config config = currentConfig();
this.serverStatusManager = new ServerStatusManager(this::currentConfig);
this.queueEnvironment = new QueueEnvironment(plugin, this::currentConfig, serverStatusManager::getOnlineServers);
this.usernameValidator = new UsernameValidator(config);
this.shadowBanKickHandler = new ShadowBanKickHandler(config);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package net.pistonmaster.pistonqueue.shared.queue;

import net.pistonmaster.pistonqueue.shared.config.Config;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class ServerStatusManager {
private final Supplier<Config> configSupplier;

private final Map<String, Integer> onlinePingCounts = new ConcurrentHashMap<>();

public ServerStatusManager(Supplier<Config> configSupplier) {
this.configSupplier = configSupplier;
}

public void online(String server) {
onlinePingCounts.merge(server, 1, (oldValue, newValue) -> {
//this prevents overflows, count never goes over the configured amount
final int sum = Integer.sum(oldValue, newValue);
if (sum > configSupplier.get().minOnlineChecks()) {
return oldValue;
}
return sum;
});
}

public void offline(String server) {
onlinePingCounts.remove(server);
}

public Set<String> getOnlineServers() {
return onlinePingCounts.entrySet().stream()
.filter(entry -> entry.getValue() >= configSupplier.get().minOnlineChecks())
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
}

// For testing purposes so we can assert there is no overflow risk
public int getOnlinePingCount(String server) {
return onlinePingCounts.getOrDefault(server, 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import net.pistonmaster.pistonqueue.shared.config.Config;
import net.pistonmaster.pistonqueue.shared.plugin.PistonQueuePlugin;
import net.pistonmaster.pistonqueue.shared.queue.QueueGroup;
import net.pistonmaster.pistonqueue.shared.queue.QueueType;
import net.pistonmaster.pistonqueue.shared.plugin.PistonQueuePlugin;

import java.util.List;
import java.util.Objects;
Expand All @@ -35,12 +35,12 @@
public final class QueueEnvironment {
private final PistonQueuePlugin plugin;
private final Supplier<Config> configSupplier;
private final Set<String> onlineServers;
private final Supplier<Set<String>> onlineServersSupplier;

public QueueEnvironment(PistonQueuePlugin plugin, Supplier<Config> configSupplier, Set<String> onlineServers) {
public QueueEnvironment(PistonQueuePlugin plugin, Supplier<Config> configSupplier, Supplier<Set<String>> onlineServersSupplier) {
this.plugin = Objects.requireNonNull(plugin, "plugin");
this.configSupplier = Objects.requireNonNull(configSupplier, "configSupplier");
this.onlineServers = Objects.requireNonNull(onlineServers, "onlineServers");
this.onlineServersSupplier = Objects.requireNonNull(onlineServersSupplier, "onlineServersSupplier");
}

@SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "Plugin API is immutable for consumers")
Expand All @@ -52,9 +52,8 @@ public Config config() {
return configSupplier.get();
}

@SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "Caller needs live view for synchronization")
public Set<String> onlineServers() {
return onlineServers;
return onlineServersSupplier.get();
}

public QueueGroup defaultGroup() {
Expand Down Expand Up @@ -94,14 +93,14 @@ public String defaultTarget(QueueGroup group) {
}

public boolean isGroupTargetOnline(QueueGroup group) {
return group.targetServers().stream().anyMatch(onlineServers::contains);
return group.targetServers().stream().anyMatch(onlineServers()::contains);
}

/// Checks if at least one queue server in the group is online.
///
/// @param group the queue group to check
/// @return true if at least one queue server is online
public boolean isGroupQueueServerOnline(QueueGroup group) {
return group.queueServers().stream().anyMatch(onlineServers::contains);
return group.queueServers().stream().anyMatch(onlineServers()::contains);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ void redirectsToQueueWhenKickedFromDownTargetServer() {
QueueTestUtils.TestQueuePlugin plugin = new QueueTestUtils.TestQueuePlugin(config);
QueueGroup group = QueueTestUtils.defaultGroup(config);
Set<String> onlineServers = QueueTestUtils.onlineServers("queue", "target");
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, onlineServers);
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, () -> onlineServers);
QueueServerSelector selector = new QueueServerSelector(environment);
KickEventHandler handler = new KickEventHandler(config, environment, selector);

Expand All @@ -65,7 +65,7 @@ void doesNotRedirectWhenQueueRedirectionDisabled() {

QueueTestUtils.TestQueuePlugin plugin = new QueueTestUtils.TestQueuePlugin(config);
Set<String> onlineServers = QueueTestUtils.onlineServers("queue", "target");
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, onlineServers);
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, () -> onlineServers);
QueueServerSelector selector = new QueueServerSelector(environment);
KickEventHandler handler = new KickEventHandler(config, environment, selector);

Expand All @@ -87,7 +87,7 @@ void doesNotRedirectWhenKickedFromNonTargetServer() {

QueueTestUtils.TestQueuePlugin plugin = new QueueTestUtils.TestQueuePlugin(config);
Set<String> onlineServers = QueueTestUtils.onlineServers("queue", "target");
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, onlineServers);
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, () -> onlineServers);
QueueServerSelector selector = new QueueServerSelector(environment);
KickEventHandler handler = new KickEventHandler(config, environment, selector);

Expand All @@ -109,7 +109,7 @@ void doesNotRedirectWhenKickReasonDoesNotMatch() {

QueueTestUtils.TestQueuePlugin plugin = new QueueTestUtils.TestQueuePlugin(config);
Set<String> onlineServers = QueueTestUtils.onlineServers("queue", "target");
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, onlineServers);
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, () -> onlineServers);
QueueServerSelector selector = new QueueServerSelector(environment);
KickEventHandler handler = new KickEventHandler(config, environment, selector);

Expand All @@ -130,7 +130,7 @@ void setsCustomKickMessageWhenEnabledAndWillDisconnect() {

QueueTestUtils.TestQueuePlugin plugin = new QueueTestUtils.TestQueuePlugin(config);
Set<String> onlineServers = QueueTestUtils.onlineServers("queue", "target");
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, onlineServers);
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, () -> onlineServers);
QueueServerSelector selector = new QueueServerSelector(environment);
KickEventHandler handler = new KickEventHandler(config, environment, selector);

Expand All @@ -151,7 +151,7 @@ void doesNotSetKickMessageWhenDisabled() {

QueueTestUtils.TestQueuePlugin plugin = new QueueTestUtils.TestQueuePlugin(config);
Set<String> onlineServers = QueueTestUtils.onlineServers("queue", "target");
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, onlineServers);
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, () -> onlineServers);
QueueServerSelector selector = new QueueServerSelector(environment);
KickEventHandler handler = new KickEventHandler(config, environment, selector);

Expand All @@ -172,7 +172,7 @@ void doesNotSetKickMessageWhenNotDisconnecting() {

QueueTestUtils.TestQueuePlugin plugin = new QueueTestUtils.TestQueuePlugin(config);
Set<String> onlineServers = QueueTestUtils.onlineServers("queue", "target");
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, onlineServers);
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, () -> onlineServers);
QueueServerSelector selector = new QueueServerSelector(environment);
KickEventHandler handler = new KickEventHandler(config, environment, selector);

Expand All @@ -194,7 +194,7 @@ void handlesKickWithoutReason() {

QueueTestUtils.TestQueuePlugin plugin = new QueueTestUtils.TestQueuePlugin(config);
Set<String> onlineServers = QueueTestUtils.onlineServers("queue", "target");
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, onlineServers);
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, () -> onlineServers);
QueueServerSelector selector = new QueueServerSelector(environment);
KickEventHandler handler = new KickEventHandler(config, environment, selector);

Expand All @@ -217,7 +217,7 @@ void caseInsensitiveKickReasonMatching() {
QueueTestUtils.TestQueuePlugin plugin = new QueueTestUtils.TestQueuePlugin(config);
QueueGroup group = QueueTestUtils.defaultGroup(config);
Set<String> onlineServers = QueueTestUtils.onlineServers("queue", "target");
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, onlineServers);
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, () -> onlineServers);
QueueServerSelector selector = new QueueServerSelector(environment);
KickEventHandler handler = new KickEventHandler(config, environment, selector);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void removesStaleEntriesWhenPlayerDisconnected() {
QueueTestUtils.TestQueuePlugin plugin = new QueueTestUtils.TestQueuePlugin(config);
QueueGroup group = QueueTestUtils.defaultGroup(config);
Set<String> onlineServers = QueueTestUtils.onlineServers(group.queueServers().toArray(String[]::new));
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, onlineServers);
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, () -> onlineServers);
QueueCleaner cleaner = new QueueCleaner(environment);

QueueType type = QueueTestUtils.defaultQueueType(config);
Expand All @@ -56,7 +56,7 @@ void keepsActiveQueueEntries() {
QueueTestUtils.TestQueuePlugin plugin = new QueueTestUtils.TestQueuePlugin(config);
QueueGroup group = QueueTestUtils.defaultGroup(config);
Set<String> onlineServers = QueueTestUtils.onlineServers(group.queueServers().toArray(String[]::new));
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, onlineServers);
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, () -> onlineServers);
QueueCleaner cleaner = new QueueCleaner(environment);

QueueType type = QueueTestUtils.defaultQueueType(config);
Expand All @@ -76,7 +76,7 @@ void handlesEmptyQueue() {
QueueTestUtils.TestQueuePlugin plugin = new QueueTestUtils.TestQueuePlugin(config);
QueueGroup group = QueueTestUtils.defaultGroup(config);
Set<String> onlineServers = QueueTestUtils.onlineServers(group.queueServers().toArray(String[]::new));
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, onlineServers);
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, () -> onlineServers);
QueueCleaner cleaner = new QueueCleaner(environment);

cleaner.cleanGroup(group);
Expand All @@ -90,7 +90,7 @@ void removesMultipleStaleEntries() {
QueueTestUtils.TestQueuePlugin plugin = new QueueTestUtils.TestQueuePlugin(config);
QueueGroup group = QueueTestUtils.defaultGroup(config);
Set<String> onlineServers = QueueTestUtils.onlineServers(group.queueServers().toArray(String[]::new));
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, onlineServers);
QueueEnvironment environment = new QueueEnvironment(plugin, plugin::getConfiguration, () -> onlineServers);
QueueCleaner cleaner = new QueueCleaner(environment);

QueueType type = QueueTestUtils.defaultQueueType(config);
Expand Down
Loading