From 467e73594ff8fcdbcf10fe07c8798e854de4ba93 Mon Sep 17 00:00:00 2001 From: Pablete1234 Date: Wed, 22 Feb 2017 05:38:03 +0100 Subject: [PATCH 1/2] Add local queue system --- .../api/message/AbstractMessageService.java | 166 +++++++++++++++++ .../oc/api/message/LocalMessageService.java | 21 +++ .../tc/oc/api/message/MessageHandler.java | 4 +- ...{MessageQueue.java => MessageService.java} | 3 +- .../tc/oc/api/message/MessagesManifest.java | 6 +- .../tc/oc/api/message/NullMessageQueue.java | 24 --- .../main/java/tc/oc/api/model/ModelStore.java | 5 +- .../src/main/java/tc/oc/api/queue/Queue.java | 173 +----------------- .../java/tc/oc/api/queue/QueueManifest.java | 4 +- .../java/tc/oc/api/queue/Transaction.java | 3 +- .../api/minecraft/MinecraftServiceImpl.java | 6 +- .../bukkit/punishment/PunishmentEnforcer.java | 6 +- .../bukkit/report/ReportAnnouncer.java | 6 +- .../bukkit/teleport/TeleportListener.java | 6 +- .../bukkit/users/JoinMessageAnnouncer.java | 6 +- .../bukkit/whisper/WhisperDispatcher.java | 7 +- .../bungee/listeners/TeleportListener.java | 6 +- .../tc/oc/commons/core/inject/TestModule.java | 8 +- 18 files changed, 234 insertions(+), 226 deletions(-) create mode 100644 API/api/src/main/java/tc/oc/api/message/AbstractMessageService.java create mode 100644 API/api/src/main/java/tc/oc/api/message/LocalMessageService.java rename API/api/src/main/java/tc/oc/api/message/{MessageQueue.java => MessageService.java} (96%) delete mode 100644 API/api/src/main/java/tc/oc/api/message/NullMessageQueue.java diff --git a/API/api/src/main/java/tc/oc/api/message/AbstractMessageService.java b/API/api/src/main/java/tc/oc/api/message/AbstractMessageService.java new file mode 100644 index 0000000..48eb09e --- /dev/null +++ b/API/api/src/main/java/tc/oc/api/message/AbstractMessageService.java @@ -0,0 +1,166 @@ +package tc.oc.api.message; + +import java.util.HashSet; +import java.util.Set; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Type; +import java.util.Collection; +import java.util.concurrent.Executor; +import java.util.logging.Logger; +import javax.annotation.Nullable; +import javax.inject.Inject; + +import com.google.common.reflect.TypeToken; +import tc.oc.commons.core.exception.ExceptionHandler; +import tc.oc.commons.core.logging.Loggers; +import tc.oc.commons.core.reflect.Methods; +import tc.oc.commons.core.reflect.Types; +import tc.oc.commons.core.util.CachingTypeMap; + +public abstract class AbstractMessageService implements MessageService { + + protected static class RegisteredHandler { + final @Nullable MessageListener listener; + final MessageHandler handler; + final @Nullable Executor executor; + + private RegisteredHandler(@Nullable MessageListener listener, MessageHandler handler, @Nullable Executor executor) { + this.listener = listener; + this.handler = handler; + this.executor = executor; + } + } + + protected Logger logger; + @Inject protected MessageRegistry messageRegistry; + @Inject protected ExceptionHandler exceptionHandler; + + protected final Set> handlers = new HashSet<>(); + public final CachingTypeMap> handlersByType = CachingTypeMap.create(); + protected volatile boolean suspended; + + @Inject void init(Loggers loggers) { + logger = loggers.get(getClass()); + } + + @Override + public void subscribe(TypeToken messageType, MessageHandler handler, @Nullable Executor executor) { + subscribe(messageType, null, handler, executor); + } + + private void subscribe(TypeToken messageType, @Nullable MessageListener listener, MessageHandler handler, @Nullable Executor executor) { + logger.fine("Subscribing handler " + handler); + synchronized(handlers) { + final RegisteredHandler registered = new RegisteredHandler<>(listener, handler, executor); + handlers.add(registered); + handlersByType.put(messageType, registered); + handlersByType.invalidate(); + } + } + + private TypeToken getMessageType(TypeToken decl, Method method) { + if(method.getParameterTypes().length != 1) { + throw new IllegalStateException("Message handler method must take 1 parameter"); + } + + final TypeToken base = new TypeToken(){}; + + for(Type param : method.getGenericParameterTypes()) { + final TypeToken paramToken = decl.resolveType(param); + Types.assertFullySpecified(paramToken); + if(base.isAssignableFrom(paramToken)) { + messageRegistry.typeName(paramToken.getRawType()); // Verify message type is registered + return paramToken; + } + } + + throw new IllegalStateException("Message handler has no message parameter"); + } + + @Override + public void subscribe(final MessageListener listener, @Nullable Executor executor) { + logger.fine("Subscribing listener " + listener); + + final TypeToken listenerType = TypeToken.of(listener.getClass()); + Methods.declaredMethodsInAncestors(listener.getClass()).forEach(method -> { + final MessageListener.HandleMessage annot = method.getAnnotation(MessageListener.HandleMessage.class); + if(annot != null) { + method.setAccessible(true); + final TypeToken messageType = getMessageType(listenerType, method); + + logger.fine(" dispatching " + messageType.getRawType().getSimpleName() + " to method " + method.getName()); + + MessageHandler handler = new MessageHandler() { + @Override + public void handleDelivery(Message message, TypeToken type) { + try { + method.invoke(listener, message); + } catch(IllegalAccessException e) { + throw new IllegalStateException(e); + } catch(InvocationTargetException e) { + if(e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else { + throw new IllegalStateException(e); + } + } + } + + @Override + public String toString() { + return listener + "." + method.getName(); + } + }; + + subscribe(messageType, listener, handler, executor); + } + }); + } + + @Override + public void unsubscribe(MessageHandler handler) { + synchronized(handlers) { + handlers.removeIf(registered -> registered.handler == handler); + handlersByType.entries().removeIf(registered -> registered.getValue().handler == handler); + } + } + + @Override + public void unsubscribe(MessageListener listener) { + if(listener == null) return; + synchronized(handlers) { + handlers.removeIf(registered -> registered.listener == listener); + handlersByType.entries().removeIf(registered -> registered.getValue().listener == listener); + } + } + + protected void dispatchMessage(Message message, TypeToken type) { + final Collection> matchingHandlers; + synchronized (handlers) { + matchingHandlers = handlersByType.allAssignableFrom(type); + } + if (matchingHandlers.isEmpty()) return; + + for (final RegisteredHandler handler : matchingHandlers) { + if (suspended && handler.listener != null && + !handler.listener.listenWhileSuspended()) continue; + + logger.fine("Dispatching " + type.getType() + " to " + handler.handler.getClass()); + if (handler.executor == null) { + exceptionHandler.run(() -> handler.handler.handleDelivery(message, type)); + } else { + handler.executor.execute(() -> { + synchronized (handlers) { + // Double check from the handler's executor that it is still registered. + // This makes it much less likely to dispatch a message to a handler + // after it unsubs. It should work perfectly if the handler unsubs on + // the same thread it handles messages on. + if (!handlers.contains(handler)) return; + } + exceptionHandler.run(() -> handler.handler.handleDelivery(message, type)); + }); + } + } + } +} diff --git a/API/api/src/main/java/tc/oc/api/message/LocalMessageService.java b/API/api/src/main/java/tc/oc/api/message/LocalMessageService.java new file mode 100644 index 0000000..6779266 --- /dev/null +++ b/API/api/src/main/java/tc/oc/api/message/LocalMessageService.java @@ -0,0 +1,21 @@ +package tc.oc.api.message; + +import com.google.common.reflect.TypeToken; + +import javax.inject.Singleton; + +@Singleton +public class LocalMessageService extends AbstractMessageService { + + @Override + public void bind(Class type) {} + + public void receive(Message message, TypeToken type) { + dispatchMessage(message, type); + } + + public void receive(Message message) { + dispatchMessage(message, TypeToken.of(message.getClass())); + } + +} diff --git a/API/api/src/main/java/tc/oc/api/message/MessageHandler.java b/API/api/src/main/java/tc/oc/api/message/MessageHandler.java index 78c58d2..ca70607 100644 --- a/API/api/src/main/java/tc/oc/api/message/MessageHandler.java +++ b/API/api/src/main/java/tc/oc/api/message/MessageHandler.java @@ -1,9 +1,7 @@ package tc.oc.api.message; import com.google.common.reflect.TypeToken; -import tc.oc.api.queue.Delivery; -import tc.oc.api.queue.Metadata; public interface MessageHandler { - void handleDelivery(T message, TypeToken type, Metadata properties, Delivery delivery); + void handleDelivery(T message, TypeToken type); } diff --git a/API/api/src/main/java/tc/oc/api/message/MessageQueue.java b/API/api/src/main/java/tc/oc/api/message/MessageService.java similarity index 96% rename from API/api/src/main/java/tc/oc/api/message/MessageQueue.java rename to API/api/src/main/java/tc/oc/api/message/MessageService.java index ce96671..039af82 100644 --- a/API/api/src/main/java/tc/oc/api/message/MessageQueue.java +++ b/API/api/src/main/java/tc/oc/api/message/MessageService.java @@ -5,7 +5,7 @@ import javax.annotation.Nullable; import com.google.common.reflect.TypeToken; -public interface MessageQueue { +public interface MessageService { /** * Tell the queue to receive messages of the given type @@ -35,4 +35,5 @@ public interface MessageQueue { void unsubscribe(MessageHandler handler); void unsubscribe(MessageListener listener); + } diff --git a/API/api/src/main/java/tc/oc/api/message/MessagesManifest.java b/API/api/src/main/java/tc/oc/api/message/MessagesManifest.java index ce1b6d8..96f09d9 100644 --- a/API/api/src/main/java/tc/oc/api/message/MessagesManifest.java +++ b/API/api/src/main/java/tc/oc/api/message/MessagesManifest.java @@ -23,8 +23,10 @@ public class MessagesManifest extends HybridManifest { public void configure() { bindAndExpose(MessageRegistry.class); - publicBinder().forOptional(MessageQueue.class) - .setDefault().to(NullMessageQueue.class); + publicBinder().forOptional(MessageService.class) + .setDefault().to(LocalMessageService.class); + + bindAndExpose(LocalMessageService.class); final MessageBinder messages = new MessageBinder(publicBinder()); diff --git a/API/api/src/main/java/tc/oc/api/message/NullMessageQueue.java b/API/api/src/main/java/tc/oc/api/message/NullMessageQueue.java deleted file mode 100644 index f500b94..0000000 --- a/API/api/src/main/java/tc/oc/api/message/NullMessageQueue.java +++ /dev/null @@ -1,24 +0,0 @@ -package tc.oc.api.message; - -import java.util.concurrent.Executor; -import javax.annotation.Nullable; - -import com.google.common.reflect.TypeToken; - -public class NullMessageQueue implements MessageQueue { - - @Override - public void bind(Class type) {} - - @Override - public void subscribe(TypeToken messageType, MessageHandler handler, @Nullable Executor executor) {} - - @Override - public void subscribe(MessageListener listener, @Nullable Executor executor) {} - - @Override - public void unsubscribe(MessageHandler handler) {} - - @Override - public void unsubscribe(MessageListener listener) {} -} diff --git a/API/api/src/main/java/tc/oc/api/model/ModelStore.java b/API/api/src/main/java/tc/oc/api/model/ModelStore.java index 7c16cf4..11940a6 100644 --- a/API/api/src/main/java/tc/oc/api/model/ModelStore.java +++ b/API/api/src/main/java/tc/oc/api/model/ModelStore.java @@ -27,7 +27,7 @@ import tc.oc.api.docs.virtual.DeletableModel; import tc.oc.api.docs.virtual.Model; import tc.oc.api.document.DocumentGenerator; import tc.oc.api.message.MessageListener; -import tc.oc.api.message.MessageQueue; +import tc.oc.api.message.MessageService; import tc.oc.api.message.types.FindMultiResponse; import tc.oc.api.message.types.FindRequest; import tc.oc.api.message.types.ModelDelete; @@ -42,7 +42,8 @@ public abstract class ModelStore implements MessageListener, Co protected Logger logger; protected @Inject QueryService queryService; - protected @Inject MessageQueue primaryQueue; + protected @Inject + MessageService primaryQueue; protected @Inject @ModelSync ExecutorService modelSync; protected @Inject ModelDispatcher dispatcher; diff --git a/API/api/src/main/java/tc/oc/api/queue/Queue.java b/API/api/src/main/java/tc/oc/api/queue/Queue.java index 3846e35..9d0075d 100644 --- a/API/api/src/main/java/tc/oc/api/queue/Queue.java +++ b/API/api/src/main/java/tc/oc/api/queue/Queue.java @@ -1,19 +1,12 @@ package tc.oc.api.queue; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Type; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.logging.Level; -import java.util.logging.Logger; import javax.annotation.Nullable; import javax.inject.Inject; @@ -28,15 +21,11 @@ import tc.oc.api.connectable.Connectable; import tc.oc.api.message.Message; import tc.oc.api.message.MessageHandler; import tc.oc.api.message.MessageListener; -import tc.oc.api.message.MessageQueue; -import tc.oc.api.message.MessageRegistry; +import tc.oc.api.message.MessageService; import tc.oc.api.message.NoSuchMessageException; +import tc.oc.api.message.AbstractMessageService; import tc.oc.api.serialization.Pretty; -import tc.oc.commons.core.exception.ExceptionHandler; -import tc.oc.commons.core.logging.Loggers; -import tc.oc.commons.core.reflect.Methods; -import tc.oc.commons.core.reflect.Types; -import tc.oc.commons.core.util.CachingTypeMap; +import tc.oc.commons.core.util.Threadable; import tc.oc.minecraft.suspend.Suspendable; /** @@ -78,37 +67,20 @@ import tc.oc.minecraft.suspend.Suspendable; * * MY_QUEUE.subscribe(new MyWorker(), syncExecutor); */ -public class Queue implements MessageQueue, Connectable, Suspendable { +public class Queue extends AbstractMessageService implements MessageService, Connectable, Suspendable { - private static class RegisteredHandler { - final @Nullable MessageListener listener; - final MessageHandler handler; - final @Nullable Executor executor; - - private RegisteredHandler(@Nullable MessageListener listener, MessageHandler handler, @Nullable Executor executor) { - this.listener = listener; - this.handler = handler; - this.executor = executor; - } - } - - protected Logger logger; - @Inject protected MessageRegistry messageRegistry; @Inject protected Gson gson; @Inject @Pretty protected Gson prettyGson; @Inject protected QueueClient client; @Inject protected Exchange.Topic topic; - @Inject protected ExceptionHandler exceptionHandler; + + protected static Threadable METADATA = new Threadable<>(); protected final Consume consume; @Nullable String consumerTag; private MultiDispatcher dispatcher; - private final Set> handlers = new HashSet<>(); - private final CachingTypeMap> handlersByType = CachingTypeMap.create(); - private volatile boolean suspended; - public Consume consume() { return consume; } public String name() { return consume.name(); } public QueueClient client() { return client; } @@ -117,10 +89,6 @@ public class Queue implements MessageQueue, Connectable, Suspendable { this.consume = consume; } - @Inject void init(Loggers loggers) { - logger = loggers.get(getClass()); - } - @Override public void connect() throws IOException { logger.fine("Declaring queue"); @@ -169,112 +137,6 @@ public class Queue implements MessageQueue, Connectable, Suspendable { bind(topic, messageRegistry.typeName(type)); } - @Override - public void subscribe(TypeToken messageType, MessageHandler handler, @Nullable Executor executor) { - subscribe(messageType, null, handler, executor); - } - - private void subscribe(TypeToken messageType, @Nullable MessageListener listener, MessageHandler handler, @Nullable Executor executor) { - logger.fine("Subscribing handler " + handler); - synchronized(handlers) { - final RegisteredHandler registered = new RegisteredHandler<>(listener, handler, executor); - handlers.add(registered); - handlersByType.put(messageType, registered); - handlersByType.invalidate(); - } - } - - private TypeToken getMessageType(TypeToken decl, Method method) { - if(method.getParameterTypes().length < 1 || method.getParameterTypes().length > 3) { - throw new IllegalStateException("Message handler method must take 1 to 3 parameters"); - } - - final TypeToken base = new TypeToken(){}; - - for(Type param : method.getGenericParameterTypes()) { - final TypeToken paramToken = decl.resolveType(param); - Types.assertFullySpecified(paramToken); - if(base.isAssignableFrom(paramToken)) { - messageRegistry.typeName(paramToken.getRawType()); // Verify message type is registered - return paramToken; - } - } - - throw new IllegalStateException("Message handler has no message parameter"); - } - - @Override - public void subscribe(final MessageListener listener, @Nullable Executor executor) { - logger.fine("Subscribing listener " + listener); - - final TypeToken listenerType = TypeToken.of(listener.getClass()); - Methods.declaredMethodsInAncestors(listener.getClass()).forEach(method -> { - final MessageListener.HandleMessage annot = method.getAnnotation(MessageListener.HandleMessage.class); - if(annot != null) { - method.setAccessible(true); - final TypeToken messageType = getMessageType(listenerType, method); - - logger.fine(" dispatching " + messageType.getRawType().getSimpleName() + " to method " + method.getName()); - - MessageHandler handler = new MessageHandler() { - @Override - public void handleDelivery(Message message, TypeToken type, Metadata properties, Delivery delivery) { - try { - if(annot.protocolVersion() != -1 && annot.protocolVersion() != properties.protocolVersion()) { - return; - } - - final Class[] paramTypes = method.getParameterTypes(); - Object[] params = new Object[paramTypes.length]; - for(int i = 0; i < paramTypes.length; i++) { - if(paramTypes[i].isAssignableFrom(message.getClass())) { - params[i] = message; - } else if(paramTypes[i].isAssignableFrom(Metadata.class)) { - params[i] = properties; - } else if(paramTypes[i].isAssignableFrom(Delivery.class)) { - params[i] = delivery; - } - } - method.invoke(listener, params); - } catch(IllegalAccessException e) { - throw new IllegalStateException(e); - } catch(InvocationTargetException e) { - if(e.getCause() instanceof RuntimeException) { - throw (RuntimeException) e.getCause(); - } else { - throw new IllegalStateException(e); - } - } - } - - @Override - public String toString() { - return listener + "." + method.getName(); - } - }; - - subscribe(messageType, listener, handler, executor); - } - }); - } - - @Override - public void unsubscribe(MessageHandler handler) { - synchronized(handlers) { - handlers.removeIf(registered -> registered.handler == handler); - handlersByType.entries().removeIf(registered -> registered.getValue().handler == handler); - } - } - - @Override - public void unsubscribe(MessageListener listener) { - if(listener == null) return; - synchronized(handlers) { - handlers.removeIf(registered -> registered.listener == listener); - handlersByType.entries().removeIf(registered -> registered.getValue().listener == listener); - } - } - private class MultiDispatcher extends DefaultConsumer { private SettableFuture cancelled; @@ -329,27 +191,8 @@ public class Queue implements MessageQueue, Connectable, Suspendable { if(logger.isLoggable(Level.FINE)) { logger.fine("Received message " + properties.getType() + "\nMetadata: " + properties + "\n" + prettyGson.toJson(json)); } - - for(final RegisteredHandler handler : matchingHandlers) { - if(suspended && handler.listener != null && - !handler.listener.listenWhileSuspended()) continue; - - logger.fine("Dispatching " + amqProperties.getType() + " to " + handler.handler.getClass()); - if(handler.executor == null) { - exceptionHandler.run(() -> handler.handler.handleDelivery(message, type, properties, delivery)); - } else { - handler.executor.execute(() -> { - synchronized(handlers) { - // Double check from the handler's executor that it is still registered. - // This makes it much less likely to dispatch a message to a handler - // after it unsubs. It should work perfectly if the handler unsubs on - // the same thread it handles messages on. - if(!handlers.contains(handler)) return; - } - exceptionHandler.run(() -> handler.handler.handleDelivery(message, type, properties, delivery)); - }); - } - } + + METADATA.let(properties, () -> dispatchMessage(message, type)); } catch(Throwable t) { logger.log(Level.SEVERE, "Exception dispatching AMQP message", t); // Don't let any exceptions through to the AMQP driver or it will close the channel diff --git a/API/api/src/main/java/tc/oc/api/queue/QueueManifest.java b/API/api/src/main/java/tc/oc/api/queue/QueueManifest.java index facc75a..65120ff 100644 --- a/API/api/src/main/java/tc/oc/api/queue/QueueManifest.java +++ b/API/api/src/main/java/tc/oc/api/queue/QueueManifest.java @@ -1,6 +1,6 @@ package tc.oc.api.queue; -import tc.oc.api.message.MessageQueue; +import tc.oc.api.message.MessageService; import tc.oc.commons.core.inject.HybridManifest; import tc.oc.minecraft.suspend.SuspendableBinder; @@ -17,7 +17,7 @@ public class QueueManifest extends HybridManifest { bindAndExpose(Exchange.Topic.class).asEagerSingleton(); bindAndExpose(PrimaryQueue.class).asEagerSingleton(); - publicBinder().forOptional(MessageQueue.class) + publicBinder().forOptional(MessageService.class) .setBinding().to(PrimaryQueue.class); final SuspendableBinder suspendables = new SuspendableBinder(publicBinder()); diff --git a/API/api/src/main/java/tc/oc/api/queue/Transaction.java b/API/api/src/main/java/tc/oc/api/queue/Transaction.java index ee12b11..20a9dd8 100644 --- a/API/api/src/main/java/tc/oc/api/queue/Transaction.java +++ b/API/api/src/main/java/tc/oc/api/queue/Transaction.java @@ -123,7 +123,8 @@ public class Transaction extends TimeoutFuture { this.requestType = request.getClass(); this.messageHandler = new MessageHandler() { - @Override public void handleDelivery(Message message, TypeToken type, Metadata replyProps, Delivery delivery) { + @Override public void handleDelivery(Message message, TypeToken type) { + Metadata replyProps = Queue.METADATA.need(); if(requestId.equals(replyProps.getCorrelationId())) { Transaction.this.replyQueue.unsubscribe(messageHandler); diff --git a/API/minecraft/src/main/java/tc/oc/api/minecraft/MinecraftServiceImpl.java b/API/minecraft/src/main/java/tc/oc/api/minecraft/MinecraftServiceImpl.java index b1a78b9..74e3b23 100644 --- a/API/minecraft/src/main/java/tc/oc/api/minecraft/MinecraftServiceImpl.java +++ b/API/minecraft/src/main/java/tc/oc/api/minecraft/MinecraftServiceImpl.java @@ -18,7 +18,7 @@ import tc.oc.api.docs.Server; import tc.oc.api.docs.virtual.ServerDoc; import tc.oc.api.exceptions.ApiNotConnected; import tc.oc.api.message.MessageListener; -import tc.oc.api.message.MessageQueue; +import tc.oc.api.message.MessageService; import tc.oc.api.message.types.ModelUpdate; import tc.oc.api.minecraft.config.MinecraftApiConfiguration; import tc.oc.api.minecraft.servers.LocalServerDocument; @@ -40,7 +40,7 @@ public class MinecraftServiceImpl implements MinecraftService, MessageListener, private final ServerService serverService; private final MinecraftApiConfiguration apiConfiguration; private final StartupServerDocument startupDocument; - private final MessageQueue serverQueue; + private final MessageService serverQueue; private final Server everfreshLocalServer; private @Nullable Server server; @@ -50,7 +50,7 @@ public class MinecraftServiceImpl implements MinecraftService, MessageListener, SyncExecutor syncExecutor, ServerService serverService, MinecraftApiConfiguration apiConfiguration, - MessageQueue serverQueue, + MessageService serverQueue, LocalServerDocument localServerDocument, StartupServerDocument startupDocument) { diff --git a/Commons/bukkit/src/main/java/tc/oc/commons/bukkit/punishment/PunishmentEnforcer.java b/Commons/bukkit/src/main/java/tc/oc/commons/bukkit/punishment/PunishmentEnforcer.java index 32aba19..8646185 100644 --- a/Commons/bukkit/src/main/java/tc/oc/commons/bukkit/punishment/PunishmentEnforcer.java +++ b/Commons/bukkit/src/main/java/tc/oc/commons/bukkit/punishment/PunishmentEnforcer.java @@ -12,7 +12,7 @@ import tc.oc.api.docs.Server; import tc.oc.api.docs.virtual.PunishmentDoc; import tc.oc.api.docs.virtual.ServerDoc; import tc.oc.api.message.MessageListener; -import tc.oc.api.message.MessageQueue; +import tc.oc.api.message.MessageService; import tc.oc.api.message.types.ModelUpdate; import tc.oc.api.model.UpdateService; import tc.oc.api.util.Permissions; @@ -35,7 +35,7 @@ public class PunishmentEnforcer implements Enableable, MessageListener { private final PunishmentFormatter punishmentFormatter; private final UpdateService punishmentService; - private final MessageQueue queue; + private final MessageService queue; private final Flexecutor executor; private final Server localServer; private final OnlinePlayers players; @@ -46,7 +46,7 @@ public class PunishmentEnforcer implements Enableable, MessageListener { @Inject PunishmentEnforcer(PunishmentFormatter punishmentFormatter, UpdateService punishmentService, - MessageQueue queue, + MessageService queue, @Sync Flexecutor executor, Server localServer, OnlinePlayers players, diff --git a/Commons/bukkit/src/main/java/tc/oc/commons/bukkit/report/ReportAnnouncer.java b/Commons/bukkit/src/main/java/tc/oc/commons/bukkit/report/ReportAnnouncer.java index 33ea089..f1c97a9 100644 --- a/Commons/bukkit/src/main/java/tc/oc/commons/bukkit/report/ReportAnnouncer.java +++ b/Commons/bukkit/src/main/java/tc/oc/commons/bukkit/report/ReportAnnouncer.java @@ -8,7 +8,7 @@ import net.md_5.bungee.api.chat.BaseComponent; import tc.oc.api.docs.Report; import tc.oc.api.docs.Server; import tc.oc.api.message.MessageListener; -import tc.oc.api.message.MessageQueue; +import tc.oc.api.message.MessageService; import tc.oc.api.message.types.ModelUpdate; import tc.oc.commons.bukkit.channels.AdminChannel; import tc.oc.commons.bukkit.chat.Audiences; @@ -20,13 +20,13 @@ public class ReportAnnouncer implements PluginFacet, MessageListener { private final ReportConfiguration config; private final ReportFormatter reportFormatter; - private final MessageQueue primaryQueue; + private final MessageService primaryQueue; private final MainThreadExecutor executor; private final Server localServer; private final AdminChannel adminChannel; private final Audiences audiences; - @Inject ReportAnnouncer(ReportConfiguration config, ReportFormatter reportFormatter, MessageQueue primaryQueue, MainThreadExecutor executor, Server localServer, AdminChannel adminChannel, Audiences audiences) { + @Inject ReportAnnouncer(ReportConfiguration config, ReportFormatter reportFormatter, MessageService primaryQueue, MainThreadExecutor executor, Server localServer, AdminChannel adminChannel, Audiences audiences) { this.config = config; this.reportFormatter = reportFormatter; this.primaryQueue = primaryQueue; diff --git a/Commons/bukkit/src/main/java/tc/oc/commons/bukkit/teleport/TeleportListener.java b/Commons/bukkit/src/main/java/tc/oc/commons/bukkit/teleport/TeleportListener.java index e2af2e0..ea5cbca 100644 --- a/Commons/bukkit/src/main/java/tc/oc/commons/bukkit/teleport/TeleportListener.java +++ b/Commons/bukkit/src/main/java/tc/oc/commons/bukkit/teleport/TeleportListener.java @@ -16,7 +16,7 @@ import org.bukkit.event.Listener; import org.bukkit.event.player.PlayerJoinEvent; import tc.oc.api.bukkit.users.OnlinePlayers; import tc.oc.api.message.MessageListener; -import tc.oc.api.message.MessageQueue; +import tc.oc.api.message.MessageService; import tc.oc.api.message.types.PlayerTeleportRequest; import tc.oc.commons.bukkit.permissions.PermissionRegistry; import tc.oc.commons.core.logging.Loggers; @@ -51,7 +51,7 @@ public class TeleportListener implements MessageListener, Listener, PluginFacet, } private final Logger logger; - private final MessageQueue primaryQueue; + private final MessageService primaryQueue; private final Teleporter teleporter; private final SyncExecutor syncExecutor; private final Scheduler scheduler; @@ -60,7 +60,7 @@ public class TeleportListener implements MessageListener, Listener, PluginFacet, private final Map requests = new HashMap<>(); - @Inject TeleportListener(Loggers loggers, MessageQueue primaryQueue, Teleporter teleporter, SyncExecutor syncExecutor, Scheduler scheduler, PermissionRegistry permissionRegistry, OnlinePlayers onlinePlayers) { + @Inject TeleportListener(Loggers loggers, MessageService primaryQueue, Teleporter teleporter, SyncExecutor syncExecutor, Scheduler scheduler, PermissionRegistry permissionRegistry, OnlinePlayers onlinePlayers) { this.logger = loggers.get(getClass()); this.primaryQueue = primaryQueue; this.onlinePlayers = onlinePlayers; diff --git a/Commons/bukkit/src/main/java/tc/oc/commons/bukkit/users/JoinMessageAnnouncer.java b/Commons/bukkit/src/main/java/tc/oc/commons/bukkit/users/JoinMessageAnnouncer.java index b242556..5b8d5a3 100644 --- a/Commons/bukkit/src/main/java/tc/oc/commons/bukkit/users/JoinMessageAnnouncer.java +++ b/Commons/bukkit/src/main/java/tc/oc/commons/bukkit/users/JoinMessageAnnouncer.java @@ -29,7 +29,7 @@ import tc.oc.api.docs.User; import tc.oc.api.docs.UserId; import tc.oc.api.docs.virtual.ServerDoc; import tc.oc.api.message.MessageListener; -import tc.oc.api.message.MessageQueue; +import tc.oc.api.message.MessageService; import tc.oc.api.minecraft.MinecraftService; import tc.oc.api.servers.ServerStore; import tc.oc.api.sessions.SessionChange; @@ -72,7 +72,7 @@ import static tc.oc.commons.core.util.Utils.notEqual; */ public class JoinMessageAnnouncer implements MessageListener, Listener, PluginFacet { - private final MessageQueue queue; + private final MessageService queue; private final OnlineFriends onlineFriends; private final IdentityProvider identityProvider; private final SettingManagerProvider playerSettings; @@ -91,7 +91,7 @@ public class JoinMessageAnnouncer implements MessageListener, Listener, PluginFa private final Cache pendingJoins = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build(); private final Cache pendingQuits = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build(); - @Inject JoinMessageAnnouncer(MessageQueue queue, + @Inject JoinMessageAnnouncer(MessageService queue, OnlineFriends onlineFriends, IdentityProvider identityProvider, SettingManagerProvider playerSettings, diff --git a/Commons/bukkit/src/main/java/tc/oc/commons/bukkit/whisper/WhisperDispatcher.java b/Commons/bukkit/src/main/java/tc/oc/commons/bukkit/whisper/WhisperDispatcher.java index 13a8529..bd41e1e 100644 --- a/Commons/bukkit/src/main/java/tc/oc/commons/bukkit/whisper/WhisperDispatcher.java +++ b/Commons/bukkit/src/main/java/tc/oc/commons/bukkit/whisper/WhisperDispatcher.java @@ -7,6 +7,7 @@ import javax.inject.Inject; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; +import org.bukkit.Bukkit; import org.bukkit.command.CommandSender; import org.bukkit.entity.Player; import org.bukkit.event.EventHandler; @@ -19,7 +20,7 @@ import tc.oc.api.docs.Server; import tc.oc.api.docs.Whisper; import tc.oc.api.docs.virtual.WhisperDoc; import tc.oc.api.message.MessageListener; -import tc.oc.api.message.MessageQueue; +import tc.oc.api.message.MessageService; import tc.oc.api.message.types.ModelUpdate; import tc.oc.api.model.IdFactory; import tc.oc.api.users.UserService; @@ -39,7 +40,7 @@ public class WhisperDispatcher implements WhisperSender, Listener, MessageListen // at the same time they received it. private static final Duration MARK_READ_DELAY = Duration.ofSeconds(3); - private final MessageQueue queue; + private final MessageService queue; private final OnlinePlayers onlinePlayers; private final MainThreadExecutor executor; private final Scheduler scheduler; @@ -50,7 +51,7 @@ public class WhisperDispatcher implements WhisperSender, Listener, MessageListen private final SettingManagerProvider playerSettings; private final UserService userService; - @Inject WhisperDispatcher(MessageQueue queue, + @Inject WhisperDispatcher(MessageService queue, OnlinePlayers onlinePlayers, MainThreadExecutor executor, Scheduler scheduler, diff --git a/Commons/bungee/src/main/java/tc/oc/commons/bungee/listeners/TeleportListener.java b/Commons/bungee/src/main/java/tc/oc/commons/bungee/listeners/TeleportListener.java index b1ed388..b7d0280 100644 --- a/Commons/bungee/src/main/java/tc/oc/commons/bungee/listeners/TeleportListener.java +++ b/Commons/bungee/src/main/java/tc/oc/commons/bungee/listeners/TeleportListener.java @@ -10,7 +10,7 @@ import net.md_5.bungee.api.connection.ProxiedPlayer; import tc.oc.api.docs.Server; import tc.oc.api.docs.virtual.ServerDoc; import tc.oc.api.message.MessageListener; -import tc.oc.api.message.MessageQueue; +import tc.oc.api.message.MessageService; import tc.oc.api.message.types.PlayerTeleportRequest; import tc.oc.api.model.ModelSync; import tc.oc.commons.bungee.servers.ServerTracker; @@ -25,10 +25,10 @@ public class TeleportListener implements MessageListener, PluginFacet { private final ProxyServer proxy; private final Logger logger; private final ServerTracker serverTracker; - private final MessageQueue primaryQueue; + private final MessageService primaryQueue; private final ExecutorService executor; - @Inject TeleportListener(Loggers loggers, ProxyServer proxy, ServerTracker serverTracker, MessageQueue primaryQueue, @ModelSync ExecutorService executor) { + @Inject TeleportListener(Loggers loggers, ProxyServer proxy, ServerTracker serverTracker, MessageService primaryQueue, @ModelSync ExecutorService executor) { this.proxy = proxy; this.executor = executor; this.logger = loggers.get(getClass()); diff --git a/Util/core/src/main/java/tc/oc/commons/core/inject/TestModule.java b/Util/core/src/main/java/tc/oc/commons/core/inject/TestModule.java index 9e2265c..f270540 100644 --- a/Util/core/src/main/java/tc/oc/commons/core/inject/TestModule.java +++ b/Util/core/src/main/java/tc/oc/commons/core/inject/TestModule.java @@ -13,13 +13,11 @@ import tc.oc.minecraft.api.configuration.Configuration; /** * TODO: Should this be in the test source root? That seems to make it unavailable to downstream modules. */ -public class TestModule extends AbstractModule { +public class TestModule extends HybridManifest { @Override protected void configure() { - bind(Loggers.class).to(SimpleLoggerFactory.class); + bindAndExpose(Loggers.class).to(SimpleLoggerFactory.class); bind(Configuration.class).to(YamlConfiguration.class); - bind(ExceptionHandler.class).to(LoggingExceptionHandler.class).in(Singleton.class); - - + bindAndExpose(ExceptionHandler.class).to(LoggingExceptionHandler.class).in(Singleton.class); } } From 11b89e0dcb8249a772e1c5fff8bfcba24250fc17 Mon Sep 17 00:00:00 2001 From: Pablete1234 Date: Wed, 22 Feb 2017 08:22:59 +0100 Subject: [PATCH 2/2] Add local message system --- .../oc/api/whispers/LocalWhisperService.java | 35 +++++++++++++++++++ .../oc/api/whispers/NullWhisperService.java | 17 --------- .../oc/api/whispers/WhisperModelManifest.java | 2 +- 3 files changed, 36 insertions(+), 18 deletions(-) create mode 100644 API/api/src/main/java/tc/oc/api/whispers/LocalWhisperService.java delete mode 100644 API/api/src/main/java/tc/oc/api/whispers/NullWhisperService.java diff --git a/API/api/src/main/java/tc/oc/api/whispers/LocalWhisperService.java b/API/api/src/main/java/tc/oc/api/whispers/LocalWhisperService.java new file mode 100644 index 0000000..192c837 --- /dev/null +++ b/API/api/src/main/java/tc/oc/api/whispers/LocalWhisperService.java @@ -0,0 +1,35 @@ +package tc.oc.api.whispers; + +import com.google.common.reflect.TypeToken; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.inject.Inject; +import tc.oc.api.docs.PlayerId; +import tc.oc.api.docs.Whisper; +import tc.oc.api.docs.virtual.WhisperDoc; +import tc.oc.api.exceptions.NotFound; +import tc.oc.api.message.LocalMessageService; +import tc.oc.api.message.types.ModelUpdate; +import tc.oc.api.model.NullModelService; + +public class LocalWhisperService extends NullModelService implements WhisperService { + + @Inject private LocalMessageService queue; + + @Override + public ListenableFuture forReply(PlayerId user) { + return Futures.immediateFailedFuture(new NotFound()); + } + + @Override + public ListenableFuture update(WhisperDoc.Partial partial) { + // Receives an Out object from WhisperDispatcher, and it implements Whisper, so we can just cast it. + if (partial instanceof Whisper) { + Whisper whisper = (Whisper) partial; + queue.receive((ModelUpdate) () -> whisper, new TypeToken>(){}); + return Futures.immediateFuture(whisper); + } + return Futures.immediateFailedFuture(new NotFound()); + } + +} diff --git a/API/api/src/main/java/tc/oc/api/whispers/NullWhisperService.java b/API/api/src/main/java/tc/oc/api/whispers/NullWhisperService.java deleted file mode 100644 index 6018511..0000000 --- a/API/api/src/main/java/tc/oc/api/whispers/NullWhisperService.java +++ /dev/null @@ -1,17 +0,0 @@ -package tc.oc.api.whispers; - -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import tc.oc.api.docs.PlayerId; -import tc.oc.api.docs.Whisper; -import tc.oc.api.docs.virtual.WhisperDoc; -import tc.oc.api.exceptions.NotFound; -import tc.oc.api.model.NullModelService; - -public class NullWhisperService extends NullModelService implements WhisperService { - - @Override - public ListenableFuture forReply(PlayerId user) { - return Futures.immediateFailedFuture(new NotFound()); - } -} diff --git a/API/api/src/main/java/tc/oc/api/whispers/WhisperModelManifest.java b/API/api/src/main/java/tc/oc/api/whispers/WhisperModelManifest.java index 7f7aac7..7d4500d 100644 --- a/API/api/src/main/java/tc/oc/api/whispers/WhisperModelManifest.java +++ b/API/api/src/main/java/tc/oc/api/whispers/WhisperModelManifest.java @@ -15,7 +15,7 @@ public class WhisperModelManifest extends HybridManifest implements ModelBinders }); OptionalBinder.newOptionalBinder(publicBinder(), WhisperService.class) - .setDefault().to(NullWhisperService.class); + .setDefault().to(LocalWhisperService.class); } }