[#1177] Add SQL Console module to jOOQ - improved messaging interface.

This commit is contained in:
Chrriis 2012-05-02 23:35:04 +02:00
parent e3c4e5bc7d
commit e595e87763
3 changed files with 197 additions and 37 deletions

View File

@ -107,7 +107,7 @@ public class DebuggerClient implements Debugger {
if(loggingListener != null) {
loggingListener.logQueries((QueryLoggingData)args[0]);
}
return null;
return 1;
}
}

View File

@ -50,6 +50,8 @@ public abstract class Message implements Serializable {
private int id = nextID++;
private boolean isSyncExec;
private long threadID;
private boolean isOriginatingSide;
/**
* Create an empty message.
@ -60,6 +62,9 @@ public abstract class Message implements Serializable {
private transient CommunicationInterface communicationInterface;
void setCommunicationInterface(CommunicationInterface communicationInterface) {
if(communicationInterface == null) {
System.err.println("in");
}
this.communicationInterface = communicationInterface;
}
@ -75,6 +80,22 @@ public abstract class Message implements Serializable {
this.isSyncExec = isSyncExec;
}
void setThreadID(long threadID) {
this.threadID = threadID;
}
long getThreadID() {
return threadID;
}
public void setOriginatingSide(boolean isOriginatingSide) {
this.isOriginatingSide = isOriginatingSide;
}
public boolean isOriginatingSide() {
return isOriginatingSide;
}
boolean isSyncExec() {
return isSyncExec;
}

View File

@ -42,8 +42,10 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
@ -104,6 +106,61 @@ public class MessagingInterface {
}
}
private class MessageProcessingThread extends Thread {
private long originatorThreadID;
private List<Message> messageList = new LinkedList<Message>();
public MessageProcessingThread(long originatorThreadID) {
super("Communication Interface Message Dispatcher [" + originatorThreadID + "]");
this.originatorThreadID = originatorThreadID;
}
public void addMessage(Message message) {
synchronized (messageList) {
messageList.add(message);
}
}
public long getOriginatorThreadID() {
return originatorThreadID;
}
@Override
public void run() {
while(true) {
Message message;
synchronized(messageList) {
if(messageList.isEmpty()) {
message = null;
} else {
message = messageList.remove(0);
}
}
// When there are no more messages to process, we try to de-register the current thread.
// We need to make sure that no message is posted while we do this.
if(message == null) {
synchronized (originatorThreadIDToThreadMap) {
synchronized(messageList) {
if(messageList.isEmpty()) {
originatorThreadIDToThreadMap.remove(originatorThreadID);
} else {
message = messageList.remove(0);
}
}
}
}
if(message == null) {
break;
}
runMessage(message);
}
}
}
private Map<Long, MessageProcessingThread> originatorThreadIDToThreadMap = new HashMap<Long, MessagingInterface.MessageProcessingThread>();
private CommunicationInterface communicationInterface;
public MessagingInterface(final CommunicationInterface communicationInterface, final Socket socket) {
@ -148,23 +205,50 @@ public class MessagingInterface {
receivedMessageList.clear();
RECEIVER_LOCK.notify();
}
for(int instanceID: syncThreadRegistry.getInstanceIDs()) {
Thread thread = (Thread)syncThreadRegistry.get(instanceID);
if(thread != null) {
synchronized(thread) {
thread.notify();
synchronized (idToThreadInfo) {
for(ThreadInfo threadInfo: idToThreadInfo.values()) {
synchronized (threadInfo) {
threadInfo.notify();
}
}
}
}
if(message != null) {
final Message message_ = message;
new Thread("Communication Interface Async") {
@Override
public void run() {
runMessage(message_);
long threadID = message.getThreadID();
boolean isOriginatingSide = message.isOriginatingSide();
if(isOriginatingSide) {
ThreadInfo threadInfo;
synchronized (idToThreadInfo) {
threadInfo = idToThreadInfo.get(threadID);
}
}.start();
if(threadInfo != null) {
// TODO: process message by original thread.
if(message instanceof CommandResultMessage) {
throw new IllegalStateException("I need to indicate if command result message.");
}
synchronized (threadInfo) {
threadInfo.setMessage(message);
threadInfo.notify();
}
} else {
System.err.println("What to do?");
}
} else {
MessageProcessingThread messageProcessingThread;
boolean isNew = false;
synchronized (originatorThreadIDToThreadMap) {
messageProcessingThread = originatorThreadIDToThreadMap.get(threadID);
if(messageProcessingThread == null) {
messageProcessingThread = new MessageProcessingThread(threadID);
originatorThreadIDToThreadMap.put(threadID, messageProcessingThread);
isNew = true;
}
messageProcessingThread.addMessage(message);
}
if(isNew) {
messageProcessingThread.start();
}
}
}
}
try {
@ -229,15 +313,21 @@ public class MessagingInterface {
@Override
public Object run(Object[] args) {
MessagingInterface messagingInterface = getCommunicationInterface().getMessagingInterface();
int instanceID = (Integer)args[0];
Thread thread = (Thread)messagingInterface.syncThreadRegistry.get(instanceID);
messagingInterface.syncThreadRegistry.remove(instanceID);
if(thread == null) {
long threadID = (Long)args[0];
CommandResultMessage commandResultMessage = (CommandResultMessage)args[1];
if(!commandResultMessage.isOriginatingSide()) {
return messagingInterface.processCommandResult(commandResultMessage);
}
ThreadInfo threadInfo;
synchronized (messagingInterface.idToThreadInfo) {
threadInfo = messagingInterface.idToThreadInfo.get(threadID);
}
if(threadInfo == null) {
return null;
}
synchronized(thread) {
messagingInterface.syncThreadRegistry.add(args[1], instanceID);
thread.notify();
synchronized(threadInfo) {
threadInfo.setMessage(commandResultMessage);
threadInfo.notify();
}
return null;
}
@ -250,6 +340,7 @@ public class MessagingInterface {
Message message = (Message)args[1];
message.setSyncExec(false);
CommunicationInterface communicationInterface = getCommunicationInterface();
message.setCommunicationInterface(communicationInterface);
MessagingInterface messagingInterface = communicationInterface.getMessagingInterface();
CM_asyncExecResponse asyncExecResponse = new CM_asyncExecResponse();
asyncExecResponse.setArgs(args[0], messagingInterface.runMessage(message));
@ -258,7 +349,26 @@ public class MessagingInterface {
}
}
private ObjectRegistry syncThreadRegistry = new ObjectRegistry();
private static class ThreadInfo {
private boolean isValuePresent;
private Message message;
public Message getMessage() {
return message;
}
public void setMessage(Message message) {
isValuePresent = true;
this.message = message;
}
public boolean isValuePresent() {
return isValuePresent;
}
}
private Map<Long, ThreadInfo> idToThreadInfo = new HashMap<Long, ThreadInfo>();
private void printFailedInvocation(Message message) {
System.err.println("Failed messaging: " + message);
@ -266,26 +376,46 @@ public class MessagingInterface {
public Object syncSend(Message message) {
Thread thread = Thread.currentThread();
final int instanceID = syncThreadRegistry.add(Thread.currentThread());
long threadID = thread.getId();
ThreadInfo threadInfo = new ThreadInfo();
boolean isAdded;
synchronized (idToThreadInfo) {
isAdded = idToThreadInfo.put(threadID, threadInfo) != null;
}
CM_asyncExec asyncExec = new CM_asyncExec();
asyncExec.setArgs(instanceID, message);
asyncExec.setArgs(threadID, message);
asyncSend(asyncExec);
synchronized(thread) {
while(syncThreadRegistry.get(instanceID) instanceof Thread) {
try {
thread.wait();
} catch(Exception e) {
CommandResultMessage commandResultMessage = null;
synchronized(threadInfo) {
while(commandResultMessage == null) {
while(!threadInfo.isValuePresent()) {
try {
threadInfo.wait();
} catch(Exception e) {
}
if(!isAlive()) {
idToThreadInfo.remove(threadID);
printFailedInvocation(message);
return null;
}
}
if(!isAlive()) {
syncThreadRegistry.remove(instanceID);
printFailedInvocation(message);
return null;
Message value = threadInfo.getMessage();
if(value instanceof CommandResultMessage) {
commandResultMessage = (CommandResultMessage)value;
} else {
// TODO: is that correct?
commandResultMessage = runMessage(value);
// message.setCommunicationInterface(this.communicationInterface);
// runMessage(message);
}
}
CommandResultMessage commandResultMessage = (CommandResultMessage)syncThreadRegistry.get(instanceID);
syncThreadRegistry.remove(instanceID);
return processCommandResult(commandResultMessage);
}
if(isAdded) {
synchronized (idToThreadInfo) {
idToThreadInfo.remove(threadID);
}
}
return processCommandResult(commandResultMessage);
}
private Object processCommandResult(CommandResultMessage commandResultMessage) {
@ -294,9 +424,9 @@ public class MessagingInterface {
}
Throwable exception = commandResultMessage.getException();
if(exception != null) {
// if(exception instanceof RuntimeException) {
// throw (RuntimeException)exception;
// }
// if(exception instanceof RuntimeException) {
// throw (RuntimeException)exception;
// }
throw new RuntimeException(exception);
}
return commandResultMessage.getResult();
@ -304,6 +434,15 @@ public class MessagingInterface {
public void asyncSend(Message message) {
message.setSyncExec(false);
Thread thread = Thread.currentThread();
// If the message was sent by the other side, all returning messages need to know which originating thread they are bound to.
if(thread instanceof MessageProcessingThread) {
message.setThreadID(((MessageProcessingThread) thread).getOriginatorThreadID());
message.setOriginatingSide(true);
} else {
message.setThreadID(thread.getId());
message.setOriginatingSide(false);
}
try {
writeMessage(message);
} catch(Exception e) {