diff --git a/jOOQ-console/src/main/java/org/jooq/debug/console/remote/DebuggerClient.java b/jOOQ-console/src/main/java/org/jooq/debug/console/remote/DebuggerClient.java index 41250274a0..bdef3d2de6 100644 --- a/jOOQ-console/src/main/java/org/jooq/debug/console/remote/DebuggerClient.java +++ b/jOOQ-console/src/main/java/org/jooq/debug/console/remote/DebuggerClient.java @@ -107,7 +107,7 @@ public class DebuggerClient implements Debugger { if(loggingListener != null) { loggingListener.logQueries((QueryLoggingData)args[0]); } - return null; + return 1; } } diff --git a/jOOQ-console/src/main/java/org/jooq/debug/console/remote/messaging/Message.java b/jOOQ-console/src/main/java/org/jooq/debug/console/remote/messaging/Message.java index 7569c3b892..cfd5648f45 100644 --- a/jOOQ-console/src/main/java/org/jooq/debug/console/remote/messaging/Message.java +++ b/jOOQ-console/src/main/java/org/jooq/debug/console/remote/messaging/Message.java @@ -50,6 +50,8 @@ public abstract class Message implements Serializable { private int id = nextID++; private boolean isSyncExec; + private long threadID; + private boolean isOriginatingSide; /** * Create an empty message. @@ -60,6 +62,9 @@ public abstract class Message implements Serializable { private transient CommunicationInterface communicationInterface; void setCommunicationInterface(CommunicationInterface communicationInterface) { + if(communicationInterface == null) { + System.err.println("in"); + } this.communicationInterface = communicationInterface; } @@ -75,6 +80,22 @@ public abstract class Message implements Serializable { this.isSyncExec = isSyncExec; } + void setThreadID(long threadID) { + this.threadID = threadID; + } + + long getThreadID() { + return threadID; + } + + public void setOriginatingSide(boolean isOriginatingSide) { + this.isOriginatingSide = isOriginatingSide; + } + + public boolean isOriginatingSide() { + return isOriginatingSide; + } + boolean isSyncExec() { return isSyncExec; } diff --git a/jOOQ-console/src/main/java/org/jooq/debug/console/remote/messaging/MessagingInterface.java b/jOOQ-console/src/main/java/org/jooq/debug/console/remote/messaging/MessagingInterface.java index 5b7a5e88e2..fe20e7ec5a 100644 --- a/jOOQ-console/src/main/java/org/jooq/debug/console/remote/messaging/MessagingInterface.java +++ b/jOOQ-console/src/main/java/org/jooq/debug/console/remote/messaging/MessagingInterface.java @@ -42,8 +42,10 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.Socket; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; /** @@ -104,6 +106,61 @@ public class MessagingInterface { } } + private class MessageProcessingThread extends Thread { + + private long originatorThreadID; + private List messageList = new LinkedList(); + + public MessageProcessingThread(long originatorThreadID) { + super("Communication Interface Message Dispatcher [" + originatorThreadID + "]"); + this.originatorThreadID = originatorThreadID; + } + + public void addMessage(Message message) { + synchronized (messageList) { + messageList.add(message); + } + } + + public long getOriginatorThreadID() { + return originatorThreadID; + } + + @Override + public void run() { + while(true) { + Message message; + synchronized(messageList) { + if(messageList.isEmpty()) { + message = null; + } else { + message = messageList.remove(0); + } + } + // When there are no more messages to process, we try to de-register the current thread. + // We need to make sure that no message is posted while we do this. + if(message == null) { + synchronized (originatorThreadIDToThreadMap) { + synchronized(messageList) { + if(messageList.isEmpty()) { + originatorThreadIDToThreadMap.remove(originatorThreadID); + } else { + message = messageList.remove(0); + } + } + } + } + if(message == null) { + break; + } + runMessage(message); + } + } + + } + + private Map originatorThreadIDToThreadMap = new HashMap(); + private CommunicationInterface communicationInterface; public MessagingInterface(final CommunicationInterface communicationInterface, final Socket socket) { @@ -148,23 +205,50 @@ public class MessagingInterface { receivedMessageList.clear(); RECEIVER_LOCK.notify(); } - for(int instanceID: syncThreadRegistry.getInstanceIDs()) { - Thread thread = (Thread)syncThreadRegistry.get(instanceID); - if(thread != null) { - synchronized(thread) { - thread.notify(); + synchronized (idToThreadInfo) { + for(ThreadInfo threadInfo: idToThreadInfo.values()) { + synchronized (threadInfo) { + threadInfo.notify(); } } } } if(message != null) { - final Message message_ = message; - new Thread("Communication Interface Async") { - @Override - public void run() { - runMessage(message_); + long threadID = message.getThreadID(); + boolean isOriginatingSide = message.isOriginatingSide(); + if(isOriginatingSide) { + ThreadInfo threadInfo; + synchronized (idToThreadInfo) { + threadInfo = idToThreadInfo.get(threadID); } - }.start(); + if(threadInfo != null) { + // TODO: process message by original thread. + if(message instanceof CommandResultMessage) { + throw new IllegalStateException("I need to indicate if command result message."); + } + synchronized (threadInfo) { + threadInfo.setMessage(message); + threadInfo.notify(); + } + } else { + System.err.println("What to do?"); + } + } else { + MessageProcessingThread messageProcessingThread; + boolean isNew = false; + synchronized (originatorThreadIDToThreadMap) { + messageProcessingThread = originatorThreadIDToThreadMap.get(threadID); + if(messageProcessingThread == null) { + messageProcessingThread = new MessageProcessingThread(threadID); + originatorThreadIDToThreadMap.put(threadID, messageProcessingThread); + isNew = true; + } + messageProcessingThread.addMessage(message); + } + if(isNew) { + messageProcessingThread.start(); + } + } } } try { @@ -229,15 +313,21 @@ public class MessagingInterface { @Override public Object run(Object[] args) { MessagingInterface messagingInterface = getCommunicationInterface().getMessagingInterface(); - int instanceID = (Integer)args[0]; - Thread thread = (Thread)messagingInterface.syncThreadRegistry.get(instanceID); - messagingInterface.syncThreadRegistry.remove(instanceID); - if(thread == null) { + long threadID = (Long)args[0]; + CommandResultMessage commandResultMessage = (CommandResultMessage)args[1]; + if(!commandResultMessage.isOriginatingSide()) { + return messagingInterface.processCommandResult(commandResultMessage); + } + ThreadInfo threadInfo; + synchronized (messagingInterface.idToThreadInfo) { + threadInfo = messagingInterface.idToThreadInfo.get(threadID); + } + if(threadInfo == null) { return null; } - synchronized(thread) { - messagingInterface.syncThreadRegistry.add(args[1], instanceID); - thread.notify(); + synchronized(threadInfo) { + threadInfo.setMessage(commandResultMessage); + threadInfo.notify(); } return null; } @@ -250,6 +340,7 @@ public class MessagingInterface { Message message = (Message)args[1]; message.setSyncExec(false); CommunicationInterface communicationInterface = getCommunicationInterface(); + message.setCommunicationInterface(communicationInterface); MessagingInterface messagingInterface = communicationInterface.getMessagingInterface(); CM_asyncExecResponse asyncExecResponse = new CM_asyncExecResponse(); asyncExecResponse.setArgs(args[0], messagingInterface.runMessage(message)); @@ -258,7 +349,26 @@ public class MessagingInterface { } } - private ObjectRegistry syncThreadRegistry = new ObjectRegistry(); + private static class ThreadInfo { + + private boolean isValuePresent; + private Message message; + + public Message getMessage() { + return message; + } + + public void setMessage(Message message) { + isValuePresent = true; + this.message = message; + } + + public boolean isValuePresent() { + return isValuePresent; + } + } + + private Map idToThreadInfo = new HashMap(); private void printFailedInvocation(Message message) { System.err.println("Failed messaging: " + message); @@ -266,26 +376,46 @@ public class MessagingInterface { public Object syncSend(Message message) { Thread thread = Thread.currentThread(); - final int instanceID = syncThreadRegistry.add(Thread.currentThread()); + long threadID = thread.getId(); + ThreadInfo threadInfo = new ThreadInfo(); + boolean isAdded; + synchronized (idToThreadInfo) { + isAdded = idToThreadInfo.put(threadID, threadInfo) != null; + } CM_asyncExec asyncExec = new CM_asyncExec(); - asyncExec.setArgs(instanceID, message); + asyncExec.setArgs(threadID, message); asyncSend(asyncExec); - synchronized(thread) { - while(syncThreadRegistry.get(instanceID) instanceof Thread) { - try { - thread.wait(); - } catch(Exception e) { + CommandResultMessage commandResultMessage = null; + synchronized(threadInfo) { + while(commandResultMessage == null) { + while(!threadInfo.isValuePresent()) { + try { + threadInfo.wait(); + } catch(Exception e) { + } + if(!isAlive()) { + idToThreadInfo.remove(threadID); + printFailedInvocation(message); + return null; + } } - if(!isAlive()) { - syncThreadRegistry.remove(instanceID); - printFailedInvocation(message); - return null; + Message value = threadInfo.getMessage(); + if(value instanceof CommandResultMessage) { + commandResultMessage = (CommandResultMessage)value; + } else { + // TODO: is that correct? + commandResultMessage = runMessage(value); +// message.setCommunicationInterface(this.communicationInterface); +// runMessage(message); } } - CommandResultMessage commandResultMessage = (CommandResultMessage)syncThreadRegistry.get(instanceID); - syncThreadRegistry.remove(instanceID); - return processCommandResult(commandResultMessage); } + if(isAdded) { + synchronized (idToThreadInfo) { + idToThreadInfo.remove(threadID); + } + } + return processCommandResult(commandResultMessage); } private Object processCommandResult(CommandResultMessage commandResultMessage) { @@ -294,9 +424,9 @@ public class MessagingInterface { } Throwable exception = commandResultMessage.getException(); if(exception != null) { - // if(exception instanceof RuntimeException) { - // throw (RuntimeException)exception; - // } +// if(exception instanceof RuntimeException) { +// throw (RuntimeException)exception; +// } throw new RuntimeException(exception); } return commandResultMessage.getResult(); @@ -304,6 +434,15 @@ public class MessagingInterface { public void asyncSend(Message message) { message.setSyncExec(false); + Thread thread = Thread.currentThread(); + // If the message was sent by the other side, all returning messages need to know which originating thread they are bound to. + if(thread instanceof MessageProcessingThread) { + message.setThreadID(((MessageProcessingThread) thread).getOriginatorThreadID()); + message.setOriginatingSide(true); + } else { + message.setThreadID(thread.getId()); + message.setOriginatingSide(false); + } try { writeMessage(message); } catch(Exception e) {