[#1177] Add SQL Console module to jOOQ - Communication interface

implementation for re-entrant calls.
This commit is contained in:
Chrriis 2012-05-03 22:53:12 +02:00
parent 6d8d46dd58
commit a7b3b5effe
3 changed files with 88 additions and 54 deletions

View File

@ -115,7 +115,7 @@ public class CommunicationInterface {
if(socket == null) {
throw new IllegalStateException("Failed to connect to " + ip + "!");
}
messagingInterface = new MessagingInterface(this, socket);
messagingInterface = new MessagingInterface(this, socket, true);
notifyOpen();
}
@ -133,6 +133,7 @@ public class CommunicationInterface {
checkOpen();
if(message instanceof LocalMessage) {
LocalMessage localMessage = (LocalMessage)message;
localMessage.setCommunicationInterface(this);
return localMessage.runCommand();
}
return messagingInterface.syncSend(message);
@ -145,6 +146,7 @@ public class CommunicationInterface {
checkOpen();
if(message instanceof LocalMessage) {
LocalMessage localMessage = (LocalMessage)message;
localMessage.setCommunicationInterface(this);
localMessage.runCommand();
return;
}
@ -172,7 +174,7 @@ public class CommunicationInterface {
@Override
public void run() {
CommunicationInterface communicationInterface = communicationInterfaceFactory.createCommunicationInterface(port);
communicationInterface.messagingInterface = new MessagingInterface(communicationInterface, socket);
communicationInterface.messagingInterface = new MessagingInterface(communicationInterface, socket, false);
communicationInterface.notifyOpen();
}
}.start();

View File

@ -51,7 +51,7 @@ public abstract class Message implements Serializable {
private int id = nextID++;
private boolean isSyncExec;
private long threadID;
private boolean isOriginatingSide;
private boolean isProcessorToOriginator;
/**
* Create an empty message.
@ -62,9 +62,6 @@ public abstract class Message implements Serializable {
private transient CommunicationInterface communicationInterface;
void setCommunicationInterface(CommunicationInterface communicationInterface) {
if(communicationInterface == null) {
System.err.println("in");
}
this.communicationInterface = communicationInterface;
}
@ -88,12 +85,12 @@ public abstract class Message implements Serializable {
return threadID;
}
public void setOriginatingSide(boolean isOriginatingSide) {
this.isOriginatingSide = isOriginatingSide;
void setProcessorToOriginator(boolean isProcessorToOriginator) {
this.isProcessorToOriginator = isProcessorToOriginator;
}
public boolean isOriginatingSide() {
return isOriginatingSide;
boolean isProcessorToOriginator() {
return isProcessorToOriginator;
}
boolean isSyncExec() {

View File

@ -112,7 +112,7 @@ public class MessagingInterface {
private List<Message> messageList = new LinkedList<Message>();
public MessageProcessingThread(long originatorThreadID) {
super("Communication Interface Message Dispatcher [" + originatorThreadID + "]");
setName("Communication Interface Message Dispatcher-" + getId() + " [" + originatorThreadID + "]");
this.originatorThreadID = originatorThreadID;
}
@ -126,6 +126,16 @@ public class MessagingInterface {
return originatorThreadID;
}
private boolean isWaitingOnSyncCall;
public boolean isWaitingOnSyncCall() {
return isWaitingOnSyncCall;
}
public void setWaitingOnSyncCall(boolean isWaitingOnSyncCall) {
this.isWaitingOnSyncCall = isWaitingOnSyncCall;
}
@Override
public void run() {
while(true) {
@ -162,9 +172,11 @@ public class MessagingInterface {
private Map<Long, MessageProcessingThread> originatorThreadIDToThreadMap = new HashMap<Long, MessagingInterface.MessageProcessingThread>();
private CommunicationInterface communicationInterface;
private boolean isClient;
public MessagingInterface(final CommunicationInterface communicationInterface, final Socket socket) {
public MessagingInterface(final CommunicationInterface communicationInterface, final Socket socket, boolean isClient) {
this.communicationInterface = communicationInterface;
this.isClient = isClient;
try {
oos = new ObjectOutputStream(new BufferedOutputStream(socket.getOutputStream()) {
@Override
@ -183,7 +195,7 @@ public class MessagingInterface {
} catch(IOException e) {
throw new RuntimeException(e);
}
Thread receiverThread = new Thread("MessagingInterface Receiver") {
Thread receiverThread = new Thread("MessagingInterface Receiver (" + (isClient? "client": "server") + ")") {
@Override
public void run() {
while(isAlive) {
@ -215,14 +227,37 @@ public class MessagingInterface {
}
if(message != null) {
long threadID = message.getThreadID();
boolean isOriginatingSide = message.isOriginatingSide();
if(isOriginatingSide) {
// boolean isOriginatingSide = message instanceof CM_asyncExecResponse;
boolean isProcessorToOriginator = message.isProcessorToOriginator();
if(!isProcessorToOriginator) {
MessageProcessingThread messageProcessingThread;
boolean isNew = false;
synchronized (originatorThreadIDToThreadMap) {
messageProcessingThread = originatorThreadIDToThreadMap.get(threadID);
if(messageProcessingThread == null) {
messageProcessingThread = new MessageProcessingThread(threadID);
originatorThreadIDToThreadMap.put(threadID, messageProcessingThread);
isNew = true;
messageProcessingThread.addMessage(message);
} else {
if(messageProcessingThread.isWaitingOnSyncCall()) {
threadID = messageProcessingThread.getId();
isProcessorToOriginator = true;
} else {
messageProcessingThread.addMessage(message);
}
}
}
if(isNew) {
messageProcessingThread.start();
}
}
if(isProcessorToOriginator) {
ThreadInfo threadInfo;
synchronized (idToThreadInfo) {
threadInfo = idToThreadInfo.get(threadID);
}
if(threadInfo != null) {
// TODO: process message by original thread.
if(message instanceof CommandResultMessage) {
throw new IllegalStateException("I need to indicate if command result message.");
}
@ -233,21 +268,6 @@ public class MessagingInterface {
} else {
System.err.println("What to do?");
}
} else {
MessageProcessingThread messageProcessingThread;
boolean isNew = false;
synchronized (originatorThreadIDToThreadMap) {
messageProcessingThread = originatorThreadIDToThreadMap.get(threadID);
if(messageProcessingThread == null) {
messageProcessingThread = new MessageProcessingThread(threadID);
originatorThreadIDToThreadMap.put(threadID, messageProcessingThread);
isNew = true;
}
messageProcessingThread.addMessage(message);
}
if(isNew) {
messageProcessingThread.start();
}
}
}
}
@ -271,7 +291,7 @@ public class MessagingInterface {
private CommandResultMessage runMessage(Message message) {
if(IS_DEBUGGING_MESSAGES) {
System.err.println(">RUN: " + message.getID() + ", " + message);
System.err.println("[" + (isClient? "client": "server") + "] >RUN: " + message.getID() + ", " + message);
}
CommandResultMessage commandResultMessage;
if(message instanceof CommandMessage) {
@ -280,6 +300,7 @@ public class MessagingInterface {
Throwable throwable = null;
if(message.isValid()) {
try {
commandMessage.setCommunicationInterface(communicationInterface);
result = commandMessage.runCommand();
} catch(Throwable t) {
throwable = t;
@ -301,7 +322,7 @@ public class MessagingInterface {
}
}
if(IS_DEBUGGING_MESSAGES) {
System.err.println("<RUN: " + message.getID());
System.err.println("[" + (isClient? "client": "server") + "] <RUN: " + message.getID());
}
return commandResultMessage;
}
@ -315,14 +336,12 @@ public class MessagingInterface {
MessagingInterface messagingInterface = getCommunicationInterface().getMessagingInterface();
long threadID = (Long)args[0];
CommandResultMessage commandResultMessage = (CommandResultMessage)args[1];
if(!commandResultMessage.isOriginatingSide()) {
return messagingInterface.processCommandResult(commandResultMessage);
}
ThreadInfo threadInfo;
synchronized (messagingInterface.idToThreadInfo) {
threadInfo = messagingInterface.idToThreadInfo.get(threadID);
}
if(threadInfo == null) {
System.err.println("A sync call is missing.");
return null;
}
synchronized(threadInfo) {
@ -340,10 +359,11 @@ public class MessagingInterface {
Message message = (Message)args[1];
message.setSyncExec(false);
CommunicationInterface communicationInterface = getCommunicationInterface();
message.setCommunicationInterface(communicationInterface);
// message.setCommunicationInterface(communicationInterface);
MessagingInterface messagingInterface = communicationInterface.getMessagingInterface();
CM_asyncExecResponse asyncExecResponse = new CM_asyncExecResponse();
asyncExecResponse.setArgs(args[0], messagingInterface.runMessage(message));
CommandResultMessage commandResultMessage = messagingInterface.runMessage(message);
asyncExecResponse.setArgs(args[0], commandResultMessage);
messagingInterface.asyncSend(asyncExecResponse);
return null;
}
@ -363,6 +383,11 @@ public class MessagingInterface {
this.message = message;
}
public void clearMessage() {
isValuePresent = false;
message = null;
}
public boolean isValuePresent() {
return isValuePresent;
}
@ -371,16 +396,21 @@ public class MessagingInterface {
private Map<Long, ThreadInfo> idToThreadInfo = new HashMap<Long, ThreadInfo>();
private void printFailedInvocation(Message message) {
System.err.println("Failed messaging: " + message);
System.err.println("[" + (isClient? "client": "server") + "] Failed messaging: " + message);
}
public Object syncSend(Message message) {
Thread thread = Thread.currentThread();
long threadID = thread.getId();
ThreadInfo threadInfo = new ThreadInfo();
boolean isAdded;
ThreadInfo previousThreadInfo;
synchronized (idToThreadInfo) {
isAdded = idToThreadInfo.put(threadID, threadInfo) == null;
previousThreadInfo = idToThreadInfo.put(threadID, threadInfo);
}
if(thread instanceof MessageProcessingThread) {
synchronized (originatorThreadIDToThreadMap) {
((MessageProcessingThread)thread).setWaitingOnSyncCall(true);
}
}
CM_asyncExec asyncExec = new CM_asyncExec();
asyncExec.setArgs(threadID, message);
@ -400,27 +430,32 @@ public class MessagingInterface {
}
}
Message value = threadInfo.getMessage();
threadInfo.clearMessage();
if(value instanceof CommandResultMessage) {
commandResultMessage = (CommandResultMessage)value;
} else {
// TODO: is that correct?
commandResultMessage = runMessage(value);
// message.setCommunicationInterface(this.communicationInterface);
// runMessage(message);
runMessage(value);
}
}
}
if(isAdded) {
synchronized (idToThreadInfo) {
synchronized (idToThreadInfo) {
if(previousThreadInfo != null) {
idToThreadInfo.put(threadID, previousThreadInfo);
} else {
idToThreadInfo.remove(threadID);
}
}
if(previousThreadInfo == null &&thread instanceof MessageProcessingThread) {
synchronized (originatorThreadIDToThreadMap) {
((MessageProcessingThread)thread).setWaitingOnSyncCall(false);
}
}
return processCommandResult(commandResultMessage);
}
private Object processCommandResult(CommandResultMessage commandResultMessage) {
if(IS_DEBUGGING_MESSAGES) {
System.err.println("<USE: " + commandResultMessage.getID());
System.err.println("[" + (isClient? "client": "server") + "] <USE: " + commandResultMessage.getID());
}
Throwable exception = commandResultMessage.getException();
if(exception != null) {
@ -438,10 +473,10 @@ public class MessagingInterface {
// If the message was sent by the other side, all returning messages need to know which originating thread they are bound to.
if(thread instanceof MessageProcessingThread) {
message.setThreadID(((MessageProcessingThread) thread).getOriginatorThreadID());
message.setOriginatingSide(true);
message.setProcessorToOriginator(true);
} else {
message.setThreadID(thread.getId());
message.setOriginatingSide(false);
message.setProcessorToOriginator(false);
}
try {
writeMessage(message);
@ -469,7 +504,7 @@ public class MessagingInterface {
return;
}
if(IS_DEBUGGING_MESSAGES) {
System.err.println((message.isSyncExec()? "SYNDS": "SYNDA") + ": " + message.getID() + ", " + message);
System.err.println("[" + (isClient? "client": "server") + "] " + (message.isSyncExec()? "SENDS": "SENDA") + ": " + message.getID() + ", " + message);
}
synchronized(oos) {
oos.writeUnshared(message);
@ -487,12 +522,12 @@ public class MessagingInterface {
if(o instanceof Message) {
Message message = (Message)o;
if(IS_DEBUGGING_MESSAGES) {
System.err.println("RECV: " + message.getID() + ", " + message);
System.err.println("[" + (isClient? "client": "server") + "] RECV: " + message.getID() + ", " + message);
}
message.setCommunicationInterface(communicationInterface);
// message.setCommunicationInterface(communicationInterface);
return message;
}
System.err.println("Unknown message: " + o);
System.err.println("[" + (isClient? "client": "server") + "] Unknown message: " + o);
return null;
}