Add local queue system

This commit is contained in:
Pablete1234 2017-02-22 05:38:03 +01:00
parent 0562d80699
commit 467e73594f
No known key found for this signature in database
GPG Key ID: DAFF9A337EF9A5FA
18 changed files with 234 additions and 226 deletions

View File

@ -0,0 +1,166 @@
package tc.oc.api.message;
import java.util.HashSet;
import java.util.Set;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.inject.Inject;
import com.google.common.reflect.TypeToken;
import tc.oc.commons.core.exception.ExceptionHandler;
import tc.oc.commons.core.logging.Loggers;
import tc.oc.commons.core.reflect.Methods;
import tc.oc.commons.core.reflect.Types;
import tc.oc.commons.core.util.CachingTypeMap;
public abstract class AbstractMessageService implements MessageService {
protected static class RegisteredHandler<T extends Message> {
final @Nullable MessageListener listener;
final MessageHandler<T> handler;
final @Nullable Executor executor;
private RegisteredHandler(@Nullable MessageListener listener, MessageHandler<T> handler, @Nullable Executor executor) {
this.listener = listener;
this.handler = handler;
this.executor = executor;
}
}
protected Logger logger;
@Inject protected MessageRegistry messageRegistry;
@Inject protected ExceptionHandler exceptionHandler;
protected final Set<RegisteredHandler<?>> handlers = new HashSet<>();
public final CachingTypeMap<Message, RegisteredHandler<?>> handlersByType = CachingTypeMap.create();
protected volatile boolean suspended;
@Inject void init(Loggers loggers) {
logger = loggers.get(getClass());
}
@Override
public <T extends Message> void subscribe(TypeToken<T> messageType, MessageHandler<T> handler, @Nullable Executor executor) {
subscribe(messageType, null, handler, executor);
}
private <T extends Message> void subscribe(TypeToken<T> messageType, @Nullable MessageListener listener, MessageHandler<T> handler, @Nullable Executor executor) {
logger.fine("Subscribing handler " + handler);
synchronized(handlers) {
final RegisteredHandler<T> registered = new RegisteredHandler<>(listener, handler, executor);
handlers.add(registered);
handlersByType.put(messageType, registered);
handlersByType.invalidate();
}
}
private TypeToken<? extends Message> getMessageType(TypeToken decl, Method method) {
if(method.getParameterTypes().length != 1) {
throw new IllegalStateException("Message handler method must take 1 parameter");
}
final TypeToken<Message> base = new TypeToken<Message>(){};
for(Type param : method.getGenericParameterTypes()) {
final TypeToken paramToken = decl.resolveType(param);
Types.assertFullySpecified(paramToken);
if(base.isAssignableFrom(paramToken)) {
messageRegistry.typeName(paramToken.getRawType()); // Verify message type is registered
return paramToken;
}
}
throw new IllegalStateException("Message handler has no message parameter");
}
@Override
public void subscribe(final MessageListener listener, @Nullable Executor executor) {
logger.fine("Subscribing listener " + listener);
final TypeToken<? extends MessageListener> listenerType = TypeToken.of(listener.getClass());
Methods.declaredMethodsInAncestors(listener.getClass()).forEach(method -> {
final MessageListener.HandleMessage annot = method.getAnnotation(MessageListener.HandleMessage.class);
if(annot != null) {
method.setAccessible(true);
final TypeToken<? extends Message> messageType = getMessageType(listenerType, method);
logger.fine(" dispatching " + messageType.getRawType().getSimpleName() + " to method " + method.getName());
MessageHandler handler = new MessageHandler() {
@Override
public void handleDelivery(Message message, TypeToken type) {
try {
method.invoke(listener, message);
} catch(IllegalAccessException e) {
throw new IllegalStateException(e);
} catch(InvocationTargetException e) {
if(e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
throw new IllegalStateException(e);
}
}
}
@Override
public String toString() {
return listener + "." + method.getName();
}
};
subscribe(messageType, listener, handler, executor);
}
});
}
@Override
public void unsubscribe(MessageHandler<?> handler) {
synchronized(handlers) {
handlers.removeIf(registered -> registered.handler == handler);
handlersByType.entries().removeIf(registered -> registered.getValue().handler == handler);
}
}
@Override
public void unsubscribe(MessageListener listener) {
if(listener == null) return;
synchronized(handlers) {
handlers.removeIf(registered -> registered.listener == listener);
handlersByType.entries().removeIf(registered -> registered.getValue().listener == listener);
}
}
protected void dispatchMessage(Message message, TypeToken<? extends Message> type) {
final Collection<RegisteredHandler<?>> matchingHandlers;
synchronized (handlers) {
matchingHandlers = handlersByType.allAssignableFrom(type);
}
if (matchingHandlers.isEmpty()) return;
for (final RegisteredHandler handler : matchingHandlers) {
if (suspended && handler.listener != null &&
!handler.listener.listenWhileSuspended()) continue;
logger.fine("Dispatching " + type.getType() + " to " + handler.handler.getClass());
if (handler.executor == null) {
exceptionHandler.run(() -> handler.handler.handleDelivery(message, type));
} else {
handler.executor.execute(() -> {
synchronized (handlers) {
// Double check from the handler's executor that it is still registered.
// This makes it much less likely to dispatch a message to a handler
// after it unsubs. It should work perfectly if the handler unsubs on
// the same thread it handles messages on.
if (!handlers.contains(handler)) return;
}
exceptionHandler.run(() -> handler.handler.handleDelivery(message, type));
});
}
}
}
}

View File

@ -0,0 +1,21 @@
package tc.oc.api.message;
import com.google.common.reflect.TypeToken;
import javax.inject.Singleton;
@Singleton
public class LocalMessageService extends AbstractMessageService {
@Override
public void bind(Class<? extends Message> type) {}
public void receive(Message message, TypeToken<? extends Message> type) {
dispatchMessage(message, type);
}
public void receive(Message message) {
dispatchMessage(message, TypeToken.of(message.getClass()));
}
}

View File

@ -1,9 +1,7 @@
package tc.oc.api.message;
import com.google.common.reflect.TypeToken;
import tc.oc.api.queue.Delivery;
import tc.oc.api.queue.Metadata;
public interface MessageHandler<T extends Message> {
void handleDelivery(T message, TypeToken<? extends T> type, Metadata properties, Delivery delivery);
void handleDelivery(T message, TypeToken<? extends T> type);
}

View File

@ -5,7 +5,7 @@ import javax.annotation.Nullable;
import com.google.common.reflect.TypeToken;
public interface MessageQueue {
public interface MessageService {
/**
* Tell the queue to receive messages of the given type
@ -35,4 +35,5 @@ public interface MessageQueue {
void unsubscribe(MessageHandler<?> handler);
void unsubscribe(MessageListener listener);
}

View File

@ -23,8 +23,10 @@ public class MessagesManifest extends HybridManifest {
public void configure() {
bindAndExpose(MessageRegistry.class);
publicBinder().forOptional(MessageQueue.class)
.setDefault().to(NullMessageQueue.class);
publicBinder().forOptional(MessageService.class)
.setDefault().to(LocalMessageService.class);
bindAndExpose(LocalMessageService.class);
final MessageBinder messages = new MessageBinder(publicBinder());

View File

@ -1,24 +0,0 @@
package tc.oc.api.message;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import com.google.common.reflect.TypeToken;
public class NullMessageQueue implements MessageQueue {
@Override
public void bind(Class<? extends Message> type) {}
@Override
public <T extends Message> void subscribe(TypeToken<T> messageType, MessageHandler<T> handler, @Nullable Executor executor) {}
@Override
public void subscribe(MessageListener listener, @Nullable Executor executor) {}
@Override
public void unsubscribe(MessageHandler<?> handler) {}
@Override
public void unsubscribe(MessageListener listener) {}
}

View File

@ -27,7 +27,7 @@ import tc.oc.api.docs.virtual.DeletableModel;
import tc.oc.api.docs.virtual.Model;
import tc.oc.api.document.DocumentGenerator;
import tc.oc.api.message.MessageListener;
import tc.oc.api.message.MessageQueue;
import tc.oc.api.message.MessageService;
import tc.oc.api.message.types.FindMultiResponse;
import tc.oc.api.message.types.FindRequest;
import tc.oc.api.message.types.ModelDelete;
@ -42,7 +42,8 @@ public abstract class ModelStore<T extends Model> implements MessageListener, Co
protected Logger logger;
protected @Inject QueryService<T> queryService;
protected @Inject MessageQueue primaryQueue;
protected @Inject
MessageService primaryQueue;
protected @Inject @ModelSync ExecutorService modelSync;
protected @Inject ModelDispatcher dispatcher;

View File

@ -1,19 +1,12 @@
package tc.oc.api.queue;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.inject.Inject;
@ -28,15 +21,11 @@ import tc.oc.api.connectable.Connectable;
import tc.oc.api.message.Message;
import tc.oc.api.message.MessageHandler;
import tc.oc.api.message.MessageListener;
import tc.oc.api.message.MessageQueue;
import tc.oc.api.message.MessageRegistry;
import tc.oc.api.message.MessageService;
import tc.oc.api.message.NoSuchMessageException;
import tc.oc.api.message.AbstractMessageService;
import tc.oc.api.serialization.Pretty;
import tc.oc.commons.core.exception.ExceptionHandler;
import tc.oc.commons.core.logging.Loggers;
import tc.oc.commons.core.reflect.Methods;
import tc.oc.commons.core.reflect.Types;
import tc.oc.commons.core.util.CachingTypeMap;
import tc.oc.commons.core.util.Threadable;
import tc.oc.minecraft.suspend.Suspendable;
/**
@ -78,37 +67,20 @@ import tc.oc.minecraft.suspend.Suspendable;
*
* MY_QUEUE.subscribe(new MyWorker(), syncExecutor);
*/
public class Queue implements MessageQueue, Connectable, Suspendable {
public class Queue extends AbstractMessageService implements MessageService, Connectable, Suspendable {
private static class RegisteredHandler<T extends Message> {
final @Nullable MessageListener listener;
final MessageHandler<T> handler;
final @Nullable Executor executor;
private RegisteredHandler(@Nullable MessageListener listener, MessageHandler<T> handler, @Nullable Executor executor) {
this.listener = listener;
this.handler = handler;
this.executor = executor;
}
}
protected Logger logger;
@Inject protected MessageRegistry messageRegistry;
@Inject protected Gson gson;
@Inject @Pretty protected Gson prettyGson;
@Inject protected QueueClient client;
@Inject protected Exchange.Topic topic;
@Inject protected ExceptionHandler exceptionHandler;
protected static Threadable<Metadata> METADATA = new Threadable<>();
protected final Consume consume;
@Nullable String consumerTag;
private MultiDispatcher dispatcher;
private final Set<RegisteredHandler<?>> handlers = new HashSet<>();
private final CachingTypeMap<Message, RegisteredHandler<?>> handlersByType = CachingTypeMap.create();
private volatile boolean suspended;
public Consume consume() { return consume; }
public String name() { return consume.name(); }
public QueueClient client() { return client; }
@ -117,10 +89,6 @@ public class Queue implements MessageQueue, Connectable, Suspendable {
this.consume = consume;
}
@Inject void init(Loggers loggers) {
logger = loggers.get(getClass());
}
@Override
public void connect() throws IOException {
logger.fine("Declaring queue");
@ -169,112 +137,6 @@ public class Queue implements MessageQueue, Connectable, Suspendable {
bind(topic, messageRegistry.typeName(type));
}
@Override
public <T extends Message> void subscribe(TypeToken<T> messageType, MessageHandler<T> handler, @Nullable Executor executor) {
subscribe(messageType, null, handler, executor);
}
private <T extends Message> void subscribe(TypeToken<T> messageType, @Nullable MessageListener listener, MessageHandler<T> handler, @Nullable Executor executor) {
logger.fine("Subscribing handler " + handler);
synchronized(handlers) {
final RegisteredHandler<T> registered = new RegisteredHandler<>(listener, handler, executor);
handlers.add(registered);
handlersByType.put(messageType, registered);
handlersByType.invalidate();
}
}
private TypeToken<? extends Message> getMessageType(TypeToken decl, Method method) {
if(method.getParameterTypes().length < 1 || method.getParameterTypes().length > 3) {
throw new IllegalStateException("Message handler method must take 1 to 3 parameters");
}
final TypeToken<Message> base = new TypeToken<Message>(){};
for(Type param : method.getGenericParameterTypes()) {
final TypeToken paramToken = decl.resolveType(param);
Types.assertFullySpecified(paramToken);
if(base.isAssignableFrom(paramToken)) {
messageRegistry.typeName(paramToken.getRawType()); // Verify message type is registered
return paramToken;
}
}
throw new IllegalStateException("Message handler has no message parameter");
}
@Override
public void subscribe(final MessageListener listener, @Nullable Executor executor) {
logger.fine("Subscribing listener " + listener);
final TypeToken<? extends MessageListener> listenerType = TypeToken.of(listener.getClass());
Methods.declaredMethodsInAncestors(listener.getClass()).forEach(method -> {
final MessageListener.HandleMessage annot = method.getAnnotation(MessageListener.HandleMessage.class);
if(annot != null) {
method.setAccessible(true);
final TypeToken<? extends Message> messageType = getMessageType(listenerType, method);
logger.fine(" dispatching " + messageType.getRawType().getSimpleName() + " to method " + method.getName());
MessageHandler handler = new MessageHandler() {
@Override
public void handleDelivery(Message message, TypeToken type, Metadata properties, Delivery delivery) {
try {
if(annot.protocolVersion() != -1 && annot.protocolVersion() != properties.protocolVersion()) {
return;
}
final Class<?>[] paramTypes = method.getParameterTypes();
Object[] params = new Object[paramTypes.length];
for(int i = 0; i < paramTypes.length; i++) {
if(paramTypes[i].isAssignableFrom(message.getClass())) {
params[i] = message;
} else if(paramTypes[i].isAssignableFrom(Metadata.class)) {
params[i] = properties;
} else if(paramTypes[i].isAssignableFrom(Delivery.class)) {
params[i] = delivery;
}
}
method.invoke(listener, params);
} catch(IllegalAccessException e) {
throw new IllegalStateException(e);
} catch(InvocationTargetException e) {
if(e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
throw new IllegalStateException(e);
}
}
}
@Override
public String toString() {
return listener + "." + method.getName();
}
};
subscribe(messageType, listener, handler, executor);
}
});
}
@Override
public void unsubscribe(MessageHandler<?> handler) {
synchronized(handlers) {
handlers.removeIf(registered -> registered.handler == handler);
handlersByType.entries().removeIf(registered -> registered.getValue().handler == handler);
}
}
@Override
public void unsubscribe(MessageListener listener) {
if(listener == null) return;
synchronized(handlers) {
handlers.removeIf(registered -> registered.listener == listener);
handlersByType.entries().removeIf(registered -> registered.getValue().listener == listener);
}
}
private class MultiDispatcher extends DefaultConsumer {
private SettableFuture cancelled;
@ -329,27 +191,8 @@ public class Queue implements MessageQueue, Connectable, Suspendable {
if(logger.isLoggable(Level.FINE)) {
logger.fine("Received message " + properties.getType() + "\nMetadata: " + properties + "\n" + prettyGson.toJson(json));
}
for(final RegisteredHandler handler : matchingHandlers) {
if(suspended && handler.listener != null &&
!handler.listener.listenWhileSuspended()) continue;
logger.fine("Dispatching " + amqProperties.getType() + " to " + handler.handler.getClass());
if(handler.executor == null) {
exceptionHandler.run(() -> handler.handler.handleDelivery(message, type, properties, delivery));
} else {
handler.executor.execute(() -> {
synchronized(handlers) {
// Double check from the handler's executor that it is still registered.
// This makes it much less likely to dispatch a message to a handler
// after it unsubs. It should work perfectly if the handler unsubs on
// the same thread it handles messages on.
if(!handlers.contains(handler)) return;
}
exceptionHandler.run(() -> handler.handler.handleDelivery(message, type, properties, delivery));
});
}
}
METADATA.let(properties, () -> dispatchMessage(message, type));
} catch(Throwable t) {
logger.log(Level.SEVERE, "Exception dispatching AMQP message", t);
// Don't let any exceptions through to the AMQP driver or it will close the channel

View File

@ -1,6 +1,6 @@
package tc.oc.api.queue;
import tc.oc.api.message.MessageQueue;
import tc.oc.api.message.MessageService;
import tc.oc.commons.core.inject.HybridManifest;
import tc.oc.minecraft.suspend.SuspendableBinder;
@ -17,7 +17,7 @@ public class QueueManifest extends HybridManifest {
bindAndExpose(Exchange.Topic.class).asEagerSingleton();
bindAndExpose(PrimaryQueue.class).asEagerSingleton();
publicBinder().forOptional(MessageQueue.class)
publicBinder().forOptional(MessageService.class)
.setBinding().to(PrimaryQueue.class);
final SuspendableBinder suspendables = new SuspendableBinder(publicBinder());

View File

@ -123,7 +123,8 @@ public class Transaction<T extends Message> extends TimeoutFuture<T> {
this.requestType = request.getClass();
this.messageHandler = new MessageHandler<Message>() {
@Override public void handleDelivery(Message message, TypeToken<? extends Message> type, Metadata replyProps, Delivery delivery) {
@Override public void handleDelivery(Message message, TypeToken type) {
Metadata replyProps = Queue.METADATA.need();
if(requestId.equals(replyProps.getCorrelationId())) {
Transaction.this.replyQueue.unsubscribe(messageHandler);

View File

@ -18,7 +18,7 @@ import tc.oc.api.docs.Server;
import tc.oc.api.docs.virtual.ServerDoc;
import tc.oc.api.exceptions.ApiNotConnected;
import tc.oc.api.message.MessageListener;
import tc.oc.api.message.MessageQueue;
import tc.oc.api.message.MessageService;
import tc.oc.api.message.types.ModelUpdate;
import tc.oc.api.minecraft.config.MinecraftApiConfiguration;
import tc.oc.api.minecraft.servers.LocalServerDocument;
@ -40,7 +40,7 @@ public class MinecraftServiceImpl implements MinecraftService, MessageListener,
private final ServerService serverService;
private final MinecraftApiConfiguration apiConfiguration;
private final StartupServerDocument startupDocument;
private final MessageQueue serverQueue;
private final MessageService serverQueue;
private final Server everfreshLocalServer;
private @Nullable Server server;
@ -50,7 +50,7 @@ public class MinecraftServiceImpl implements MinecraftService, MessageListener,
SyncExecutor syncExecutor,
ServerService serverService,
MinecraftApiConfiguration apiConfiguration,
MessageQueue serverQueue,
MessageService serverQueue,
LocalServerDocument localServerDocument,
StartupServerDocument startupDocument) {

View File

@ -12,7 +12,7 @@ import tc.oc.api.docs.Server;
import tc.oc.api.docs.virtual.PunishmentDoc;
import tc.oc.api.docs.virtual.ServerDoc;
import tc.oc.api.message.MessageListener;
import tc.oc.api.message.MessageQueue;
import tc.oc.api.message.MessageService;
import tc.oc.api.message.types.ModelUpdate;
import tc.oc.api.model.UpdateService;
import tc.oc.api.util.Permissions;
@ -35,7 +35,7 @@ public class PunishmentEnforcer implements Enableable, MessageListener {
private final PunishmentFormatter punishmentFormatter;
private final UpdateService<PunishmentDoc.Partial> punishmentService;
private final MessageQueue queue;
private final MessageService queue;
private final Flexecutor executor;
private final Server localServer;
private final OnlinePlayers players;
@ -46,7 +46,7 @@ public class PunishmentEnforcer implements Enableable, MessageListener {
@Inject PunishmentEnforcer(PunishmentFormatter punishmentFormatter,
UpdateService<PunishmentDoc.Partial> punishmentService,
MessageQueue queue,
MessageService queue,
@Sync Flexecutor executor,
Server localServer,
OnlinePlayers players,

View File

@ -8,7 +8,7 @@ import net.md_5.bungee.api.chat.BaseComponent;
import tc.oc.api.docs.Report;
import tc.oc.api.docs.Server;
import tc.oc.api.message.MessageListener;
import tc.oc.api.message.MessageQueue;
import tc.oc.api.message.MessageService;
import tc.oc.api.message.types.ModelUpdate;
import tc.oc.commons.bukkit.channels.AdminChannel;
import tc.oc.commons.bukkit.chat.Audiences;
@ -20,13 +20,13 @@ public class ReportAnnouncer implements PluginFacet, MessageListener {
private final ReportConfiguration config;
private final ReportFormatter reportFormatter;
private final MessageQueue primaryQueue;
private final MessageService primaryQueue;
private final MainThreadExecutor executor;
private final Server localServer;
private final AdminChannel adminChannel;
private final Audiences audiences;
@Inject ReportAnnouncer(ReportConfiguration config, ReportFormatter reportFormatter, MessageQueue primaryQueue, MainThreadExecutor executor, Server localServer, AdminChannel adminChannel, Audiences audiences) {
@Inject ReportAnnouncer(ReportConfiguration config, ReportFormatter reportFormatter, MessageService primaryQueue, MainThreadExecutor executor, Server localServer, AdminChannel adminChannel, Audiences audiences) {
this.config = config;
this.reportFormatter = reportFormatter;
this.primaryQueue = primaryQueue;

View File

@ -16,7 +16,7 @@ import org.bukkit.event.Listener;
import org.bukkit.event.player.PlayerJoinEvent;
import tc.oc.api.bukkit.users.OnlinePlayers;
import tc.oc.api.message.MessageListener;
import tc.oc.api.message.MessageQueue;
import tc.oc.api.message.MessageService;
import tc.oc.api.message.types.PlayerTeleportRequest;
import tc.oc.commons.bukkit.permissions.PermissionRegistry;
import tc.oc.commons.core.logging.Loggers;
@ -51,7 +51,7 @@ public class TeleportListener implements MessageListener, Listener, PluginFacet,
}
private final Logger logger;
private final MessageQueue primaryQueue;
private final MessageService primaryQueue;
private final Teleporter teleporter;
private final SyncExecutor syncExecutor;
private final Scheduler scheduler;
@ -60,7 +60,7 @@ public class TeleportListener implements MessageListener, Listener, PluginFacet,
private final Map<UUID, Received> requests = new HashMap<>();
@Inject TeleportListener(Loggers loggers, MessageQueue primaryQueue, Teleporter teleporter, SyncExecutor syncExecutor, Scheduler scheduler, PermissionRegistry permissionRegistry, OnlinePlayers onlinePlayers) {
@Inject TeleportListener(Loggers loggers, MessageService primaryQueue, Teleporter teleporter, SyncExecutor syncExecutor, Scheduler scheduler, PermissionRegistry permissionRegistry, OnlinePlayers onlinePlayers) {
this.logger = loggers.get(getClass());
this.primaryQueue = primaryQueue;
this.onlinePlayers = onlinePlayers;

View File

@ -29,7 +29,7 @@ import tc.oc.api.docs.User;
import tc.oc.api.docs.UserId;
import tc.oc.api.docs.virtual.ServerDoc;
import tc.oc.api.message.MessageListener;
import tc.oc.api.message.MessageQueue;
import tc.oc.api.message.MessageService;
import tc.oc.api.minecraft.MinecraftService;
import tc.oc.api.servers.ServerStore;
import tc.oc.api.sessions.SessionChange;
@ -72,7 +72,7 @@ import static tc.oc.commons.core.util.Utils.notEqual;
*/
public class JoinMessageAnnouncer implements MessageListener, Listener, PluginFacet {
private final MessageQueue queue;
private final MessageService queue;
private final OnlineFriends onlineFriends;
private final IdentityProvider identityProvider;
private final SettingManagerProvider playerSettings;
@ -91,7 +91,7 @@ public class JoinMessageAnnouncer implements MessageListener, Listener, PluginFa
private final Cache<UserId, SessionChange> pendingJoins = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();
private final Cache<UserId, SessionChange> pendingQuits = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();
@Inject JoinMessageAnnouncer(MessageQueue queue,
@Inject JoinMessageAnnouncer(MessageService queue,
OnlineFriends onlineFriends,
IdentityProvider identityProvider,
SettingManagerProvider playerSettings,

View File

@ -7,6 +7,7 @@ import javax.inject.Inject;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import org.bukkit.Bukkit;
import org.bukkit.command.CommandSender;
import org.bukkit.entity.Player;
import org.bukkit.event.EventHandler;
@ -19,7 +20,7 @@ import tc.oc.api.docs.Server;
import tc.oc.api.docs.Whisper;
import tc.oc.api.docs.virtual.WhisperDoc;
import tc.oc.api.message.MessageListener;
import tc.oc.api.message.MessageQueue;
import tc.oc.api.message.MessageService;
import tc.oc.api.message.types.ModelUpdate;
import tc.oc.api.model.IdFactory;
import tc.oc.api.users.UserService;
@ -39,7 +40,7 @@ public class WhisperDispatcher implements WhisperSender, Listener, MessageListen
// at the same time they received it.
private static final Duration MARK_READ_DELAY = Duration.ofSeconds(3);
private final MessageQueue queue;
private final MessageService queue;
private final OnlinePlayers onlinePlayers;
private final MainThreadExecutor executor;
private final Scheduler scheduler;
@ -50,7 +51,7 @@ public class WhisperDispatcher implements WhisperSender, Listener, MessageListen
private final SettingManagerProvider playerSettings;
private final UserService userService;
@Inject WhisperDispatcher(MessageQueue queue,
@Inject WhisperDispatcher(MessageService queue,
OnlinePlayers onlinePlayers,
MainThreadExecutor executor,
Scheduler scheduler,

View File

@ -10,7 +10,7 @@ import net.md_5.bungee.api.connection.ProxiedPlayer;
import tc.oc.api.docs.Server;
import tc.oc.api.docs.virtual.ServerDoc;
import tc.oc.api.message.MessageListener;
import tc.oc.api.message.MessageQueue;
import tc.oc.api.message.MessageService;
import tc.oc.api.message.types.PlayerTeleportRequest;
import tc.oc.api.model.ModelSync;
import tc.oc.commons.bungee.servers.ServerTracker;
@ -25,10 +25,10 @@ public class TeleportListener implements MessageListener, PluginFacet {
private final ProxyServer proxy;
private final Logger logger;
private final ServerTracker serverTracker;
private final MessageQueue primaryQueue;
private final MessageService primaryQueue;
private final ExecutorService executor;
@Inject TeleportListener(Loggers loggers, ProxyServer proxy, ServerTracker serverTracker, MessageQueue primaryQueue, @ModelSync ExecutorService executor) {
@Inject TeleportListener(Loggers loggers, ProxyServer proxy, ServerTracker serverTracker, MessageService primaryQueue, @ModelSync ExecutorService executor) {
this.proxy = proxy;
this.executor = executor;
this.logger = loggers.get(getClass());

View File

@ -13,13 +13,11 @@ import tc.oc.minecraft.api.configuration.Configuration;
/**
* TODO: Should this be in the test source root? That seems to make it unavailable to downstream modules.
*/
public class TestModule extends AbstractModule {
public class TestModule extends HybridManifest {
@Override
protected void configure() {
bind(Loggers.class).to(SimpleLoggerFactory.class);
bindAndExpose(Loggers.class).to(SimpleLoggerFactory.class);
bind(Configuration.class).to(YamlConfiguration.class);
bind(ExceptionHandler.class).to(LoggingExceptionHandler.class).in(Singleton.class);
bindAndExpose(ExceptionHandler.class).to(LoggingExceptionHandler.class).in(Singleton.class);
}
}