From a7b3b5effef290d0c6cd0b7a65d50440594bae50 Mon Sep 17 00:00:00 2001 From: Chrriis Date: Thu, 3 May 2012 22:53:12 +0200 Subject: [PATCH] [#1177] Add SQL Console module to jOOQ - Communication interface implementation for re-entrant calls. --- .../messaging/CommunicationInterface.java | 6 +- .../console/remote/messaging/Message.java | 13 +- .../remote/messaging/MessagingInterface.java | 123 +++++++++++------- 3 files changed, 88 insertions(+), 54 deletions(-) diff --git a/jOOQ-console/src/main/java/org/jooq/debug/console/remote/messaging/CommunicationInterface.java b/jOOQ-console/src/main/java/org/jooq/debug/console/remote/messaging/CommunicationInterface.java index 24d0b18d8f..6ea30df582 100644 --- a/jOOQ-console/src/main/java/org/jooq/debug/console/remote/messaging/CommunicationInterface.java +++ b/jOOQ-console/src/main/java/org/jooq/debug/console/remote/messaging/CommunicationInterface.java @@ -115,7 +115,7 @@ public class CommunicationInterface { if(socket == null) { throw new IllegalStateException("Failed to connect to " + ip + "!"); } - messagingInterface = new MessagingInterface(this, socket); + messagingInterface = new MessagingInterface(this, socket, true); notifyOpen(); } @@ -133,6 +133,7 @@ public class CommunicationInterface { checkOpen(); if(message instanceof LocalMessage) { LocalMessage localMessage = (LocalMessage)message; + localMessage.setCommunicationInterface(this); return localMessage.runCommand(); } return messagingInterface.syncSend(message); @@ -145,6 +146,7 @@ public class CommunicationInterface { checkOpen(); if(message instanceof LocalMessage) { LocalMessage localMessage = (LocalMessage)message; + localMessage.setCommunicationInterface(this); localMessage.runCommand(); return; } @@ -172,7 +174,7 @@ public class CommunicationInterface { @Override public void run() { CommunicationInterface communicationInterface = communicationInterfaceFactory.createCommunicationInterface(port); - communicationInterface.messagingInterface = new MessagingInterface(communicationInterface, socket); + communicationInterface.messagingInterface = new MessagingInterface(communicationInterface, socket, false); communicationInterface.notifyOpen(); } }.start(); diff --git a/jOOQ-console/src/main/java/org/jooq/debug/console/remote/messaging/Message.java b/jOOQ-console/src/main/java/org/jooq/debug/console/remote/messaging/Message.java index cfd5648f45..21afbf18c0 100644 --- a/jOOQ-console/src/main/java/org/jooq/debug/console/remote/messaging/Message.java +++ b/jOOQ-console/src/main/java/org/jooq/debug/console/remote/messaging/Message.java @@ -51,7 +51,7 @@ public abstract class Message implements Serializable { private int id = nextID++; private boolean isSyncExec; private long threadID; - private boolean isOriginatingSide; + private boolean isProcessorToOriginator; /** * Create an empty message. @@ -62,9 +62,6 @@ public abstract class Message implements Serializable { private transient CommunicationInterface communicationInterface; void setCommunicationInterface(CommunicationInterface communicationInterface) { - if(communicationInterface == null) { - System.err.println("in"); - } this.communicationInterface = communicationInterface; } @@ -88,12 +85,12 @@ public abstract class Message implements Serializable { return threadID; } - public void setOriginatingSide(boolean isOriginatingSide) { - this.isOriginatingSide = isOriginatingSide; + void setProcessorToOriginator(boolean isProcessorToOriginator) { + this.isProcessorToOriginator = isProcessorToOriginator; } - public boolean isOriginatingSide() { - return isOriginatingSide; + boolean isProcessorToOriginator() { + return isProcessorToOriginator; } boolean isSyncExec() { diff --git a/jOOQ-console/src/main/java/org/jooq/debug/console/remote/messaging/MessagingInterface.java b/jOOQ-console/src/main/java/org/jooq/debug/console/remote/messaging/MessagingInterface.java index e1e4715726..dd308e4108 100644 --- a/jOOQ-console/src/main/java/org/jooq/debug/console/remote/messaging/MessagingInterface.java +++ b/jOOQ-console/src/main/java/org/jooq/debug/console/remote/messaging/MessagingInterface.java @@ -112,7 +112,7 @@ public class MessagingInterface { private List messageList = new LinkedList(); public MessageProcessingThread(long originatorThreadID) { - super("Communication Interface Message Dispatcher [" + originatorThreadID + "]"); + setName("Communication Interface Message Dispatcher-" + getId() + " [" + originatorThreadID + "]"); this.originatorThreadID = originatorThreadID; } @@ -126,6 +126,16 @@ public class MessagingInterface { return originatorThreadID; } + private boolean isWaitingOnSyncCall; + + public boolean isWaitingOnSyncCall() { + return isWaitingOnSyncCall; + } + + public void setWaitingOnSyncCall(boolean isWaitingOnSyncCall) { + this.isWaitingOnSyncCall = isWaitingOnSyncCall; + } + @Override public void run() { while(true) { @@ -162,9 +172,11 @@ public class MessagingInterface { private Map originatorThreadIDToThreadMap = new HashMap(); private CommunicationInterface communicationInterface; + private boolean isClient; - public MessagingInterface(final CommunicationInterface communicationInterface, final Socket socket) { + public MessagingInterface(final CommunicationInterface communicationInterface, final Socket socket, boolean isClient) { this.communicationInterface = communicationInterface; + this.isClient = isClient; try { oos = new ObjectOutputStream(new BufferedOutputStream(socket.getOutputStream()) { @Override @@ -183,7 +195,7 @@ public class MessagingInterface { } catch(IOException e) { throw new RuntimeException(e); } - Thread receiverThread = new Thread("MessagingInterface Receiver") { + Thread receiverThread = new Thread("MessagingInterface Receiver (" + (isClient? "client": "server") + ")") { @Override public void run() { while(isAlive) { @@ -215,14 +227,37 @@ public class MessagingInterface { } if(message != null) { long threadID = message.getThreadID(); - boolean isOriginatingSide = message.isOriginatingSide(); - if(isOriginatingSide) { +// boolean isOriginatingSide = message instanceof CM_asyncExecResponse; + boolean isProcessorToOriginator = message.isProcessorToOriginator(); + if(!isProcessorToOriginator) { + MessageProcessingThread messageProcessingThread; + boolean isNew = false; + synchronized (originatorThreadIDToThreadMap) { + messageProcessingThread = originatorThreadIDToThreadMap.get(threadID); + if(messageProcessingThread == null) { + messageProcessingThread = new MessageProcessingThread(threadID); + originatorThreadIDToThreadMap.put(threadID, messageProcessingThread); + isNew = true; + messageProcessingThread.addMessage(message); + } else { + if(messageProcessingThread.isWaitingOnSyncCall()) { + threadID = messageProcessingThread.getId(); + isProcessorToOriginator = true; + } else { + messageProcessingThread.addMessage(message); + } + } + } + if(isNew) { + messageProcessingThread.start(); + } + } + if(isProcessorToOriginator) { ThreadInfo threadInfo; synchronized (idToThreadInfo) { threadInfo = idToThreadInfo.get(threadID); } if(threadInfo != null) { - // TODO: process message by original thread. if(message instanceof CommandResultMessage) { throw new IllegalStateException("I need to indicate if command result message."); } @@ -233,21 +268,6 @@ public class MessagingInterface { } else { System.err.println("What to do?"); } - } else { - MessageProcessingThread messageProcessingThread; - boolean isNew = false; - synchronized (originatorThreadIDToThreadMap) { - messageProcessingThread = originatorThreadIDToThreadMap.get(threadID); - if(messageProcessingThread == null) { - messageProcessingThread = new MessageProcessingThread(threadID); - originatorThreadIDToThreadMap.put(threadID, messageProcessingThread); - isNew = true; - } - messageProcessingThread.addMessage(message); - } - if(isNew) { - messageProcessingThread.start(); - } } } } @@ -271,7 +291,7 @@ public class MessagingInterface { private CommandResultMessage runMessage(Message message) { if(IS_DEBUGGING_MESSAGES) { - System.err.println(">RUN: " + message.getID() + ", " + message); + System.err.println("[" + (isClient? "client": "server") + "] >RUN: " + message.getID() + ", " + message); } CommandResultMessage commandResultMessage; if(message instanceof CommandMessage) { @@ -280,6 +300,7 @@ public class MessagingInterface { Throwable throwable = null; if(message.isValid()) { try { + commandMessage.setCommunicationInterface(communicationInterface); result = commandMessage.runCommand(); } catch(Throwable t) { throwable = t; @@ -301,7 +322,7 @@ public class MessagingInterface { } } if(IS_DEBUGGING_MESSAGES) { - System.err.println(" idToThreadInfo = new HashMap(); private void printFailedInvocation(Message message) { - System.err.println("Failed messaging: " + message); + System.err.println("[" + (isClient? "client": "server") + "] Failed messaging: " + message); } public Object syncSend(Message message) { Thread thread = Thread.currentThread(); long threadID = thread.getId(); ThreadInfo threadInfo = new ThreadInfo(); - boolean isAdded; + ThreadInfo previousThreadInfo; synchronized (idToThreadInfo) { - isAdded = idToThreadInfo.put(threadID, threadInfo) == null; + previousThreadInfo = idToThreadInfo.put(threadID, threadInfo); + } + if(thread instanceof MessageProcessingThread) { + synchronized (originatorThreadIDToThreadMap) { + ((MessageProcessingThread)thread).setWaitingOnSyncCall(true); + } } CM_asyncExec asyncExec = new CM_asyncExec(); asyncExec.setArgs(threadID, message); @@ -400,27 +430,32 @@ public class MessagingInterface { } } Message value = threadInfo.getMessage(); + threadInfo.clearMessage(); if(value instanceof CommandResultMessage) { commandResultMessage = (CommandResultMessage)value; } else { - // TODO: is that correct? - commandResultMessage = runMessage(value); -// message.setCommunicationInterface(this.communicationInterface); -// runMessage(message); + runMessage(value); } } } - if(isAdded) { - synchronized (idToThreadInfo) { + synchronized (idToThreadInfo) { + if(previousThreadInfo != null) { + idToThreadInfo.put(threadID, previousThreadInfo); + } else { idToThreadInfo.remove(threadID); } } + if(previousThreadInfo == null &&thread instanceof MessageProcessingThread) { + synchronized (originatorThreadIDToThreadMap) { + ((MessageProcessingThread)thread).setWaitingOnSyncCall(false); + } + } return processCommandResult(commandResultMessage); } private Object processCommandResult(CommandResultMessage commandResultMessage) { if(IS_DEBUGGING_MESSAGES) { - System.err.println("