[#1177] Add SQL Console module to jOOQ - client/server communication

channel is replaced.
This commit is contained in:
Chrriis 2012-05-02 00:10:51 +02:00
parent ad8d4a159c
commit 5c4ff85e78
14 changed files with 1459 additions and 95 deletions

View File

@ -91,7 +91,7 @@ import javax.swing.event.DocumentListener;
import org.jooq.debug.Debugger;
import org.jooq.debug.DebuggerRegistry;
import org.jooq.debug.LocalDebugger;
import org.jooq.debug.console.remote.RemoteDebuggerClient;
import org.jooq.debug.console.remote.DebuggerClient;
/**
* @author Christopher Deckers
@ -475,7 +475,7 @@ public class Console extends JFrame {
}
final Debugger debugger;
try {
debugger = new RemoteDebuggerClient(args[0], Integer.parseInt(args[1]));
debugger = new DebuggerClient(args[0], Integer.parseInt(args[1]));
} catch(Exception e) {
e.printStackTrace();
return;

View File

@ -0,0 +1,48 @@
/**
* Copyright (c) 2009-2012, Lukas Eder, lukas.eder@gmail.com
* Christopher Deckers, chrriis@gmail.com
* All rights reserved.
*
* This software is licensed to you under the Apache License, Version 2.0
* (the "License"); You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* . Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* . Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* . Neither the name "jOOQ" nor the names of its contributors may be
* used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package org.jooq.debug.console.remote;
import org.jooq.debug.console.remote.messaging.CommandMessage;
@SuppressWarnings("serial")
abstract class ClientDebuggerCommandMessage extends CommandMessage {
protected DebuggerClient getDebugger() {
return (DebuggerClient)((DebuggerCommmunicationInterface)getCommunicationInterface()).getDebugger();
}
}

View File

@ -0,0 +1,126 @@
/**
* Copyright (c) 2009-2012, Lukas Eder, lukas.eder@gmail.com
* Christopher Deckers, chrriis@gmail.com
* All rights reserved.
*
* This software is licensed to you under the Apache License, Version 2.0
* (the "License"); You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* . Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* . Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* . Neither the name "jOOQ" nor the names of its contributors may be
* used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package org.jooq.debug.console.remote;
import org.jooq.debug.Debugger;
import org.jooq.debug.LoggingListener;
import org.jooq.debug.QueryLoggingData;
import org.jooq.debug.ResultSetLoggingData;
import org.jooq.debug.StatementExecutor;
import org.jooq.debug.StatementMatcher;
import org.jooq.debug.console.remote.messaging.CommunicationInterface;
import org.jooq.debug.console.remote.messaging.CommunicationInterfaceFactory;
/**
* @author Christopher Deckers
*/
public class DebuggerClient implements Debugger {
private CommunicationInterface communicationInterface;
public DebuggerClient(String ip, int port) throws Exception {
communicationInterface = CommunicationInterface.openClientCommunicationChannel(new CommunicationInterfaceFactory() {
@Override
public CommunicationInterface createCommunicationInterface(int port_) {
return new DebuggerCommmunicationInterface(DebuggerClient.this, port_);
}
}, ip, port);
}
private LoggingListener loggingListener;
@Override
public void setLoggingListener(LoggingListener loggingListener) {
this.loggingListener = loggingListener;
new DebuggerServer.CMS_setLoggingActive().asyncExec(communicationInterface, loggingListener != null);
}
@Override
public LoggingListener getLoggingListener() {
return loggingListener;
}
private StatementMatcher[] loggingStatementMatchers;
@Override
public void setLoggingStatementMatchers(StatementMatcher[] loggingStatementMatchers) {
this.loggingStatementMatchers = loggingStatementMatchers;
new DebuggerServer.CMS_setLoggingStatementMatchers().asyncExec(communicationInterface, (Object)loggingStatementMatchers);
}
@Override
public StatementMatcher[] getLoggingStatementMatchers() {
return loggingStatementMatchers;
}
@Override
public boolean isExecutionSupported() {
// TODO: implement
return false;
}
@Override
public StatementExecutor createStatementExecutor() {
// TODO: implement
return null;
}
@SuppressWarnings("serial")
static class CMC_logQueries extends ClientDebuggerCommandMessage {
@Override
public Object run(Object[] args) {
LoggingListener loggingListener = getDebugger().getLoggingListener();
if(loggingListener != null) {
loggingListener.logQueries((QueryLoggingData)args[0]);
}
return null;
}
}
@SuppressWarnings("serial")
static class CMC_logResultSet extends ClientDebuggerCommandMessage {
@Override
public Object run(Object[] args) {
LoggingListener loggingListener = getDebugger().getLoggingListener();
if(loggingListener != null) {
loggingListener.logResultSet((Integer)args[0], (ResultSetLoggingData)args[1]);
}
return null;
}
}
}

View File

@ -0,0 +1,55 @@
/**
* Copyright (c) 2009-2012, Lukas Eder, lukas.eder@gmail.com
* Christopher Deckers, chrriis@gmail.com
* All rights reserved.
*
* This software is licensed to you under the Apache License, Version 2.0
* (the "License"); You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* . Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* . Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* . Neither the name "jOOQ" nor the names of its contributors may be
* used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package org.jooq.debug.console.remote;
import org.jooq.debug.Debugger;
import org.jooq.debug.console.remote.messaging.CommunicationInterface;
class DebuggerCommmunicationInterface extends CommunicationInterface {
private Debugger debugger;
public DebuggerCommmunicationInterface(Debugger debugger, int port) {
super(port);
this.debugger = debugger;
}
public Debugger getDebugger() {
return debugger;
}
}

View File

@ -0,0 +1,94 @@
/**
* Copyright (c) 2009-2012, Lukas Eder, lukas.eder@gmail.com
* Christopher Deckers, chrriis@gmail.com
* All rights reserved.
*
* This software is licensed to you under the Apache License, Version 2.0
* (the "License"); You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* . Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* . Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* . Neither the name "jOOQ" nor the names of its contributors may be
* used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package org.jooq.debug.console.remote;
import org.jooq.debug.LocalDebugger;
import org.jooq.debug.LoggingListener;
import org.jooq.debug.QueryLoggingData;
import org.jooq.debug.ResultSetLoggingData;
import org.jooq.debug.StatementMatcher;
import org.jooq.debug.console.DatabaseDescriptor;
import org.jooq.debug.console.remote.messaging.CommunicationInterface;
class DebuggerServer extends LocalDebugger {
public DebuggerServer(DatabaseDescriptor databaseDescriptor) {
super(databaseDescriptor);
}
private CommunicationInterface communicationInterface;
void setCommunicationInterface(CommunicationInterface communicationInterface) {
this.communicationInterface = communicationInterface;
}
private void setLoggingActive(boolean isActive) {
if(isActive) {
setLoggingListener(new LoggingListener() {
@Override
public void logQueries(QueryLoggingData queryLoggingData) {
new DebuggerClient.CMC_logQueries().asyncExec(communicationInterface, queryLoggingData);
}
@Override
public void logResultSet(int sqlQueryDebuggerDataID, ResultSetLoggingData resultSetLoggingData) {
new DebuggerClient.CMC_logResultSet().asyncExec(communicationInterface, sqlQueryDebuggerDataID, resultSetLoggingData);
}
});
} else {
setLoggingListener(null);
}
}
@SuppressWarnings("serial")
static class CMS_setLoggingActive extends ServerDebuggerCommandMessage {
@Override
public Object run(Object[] args) {
getDebugger().setLoggingActive((Boolean)args[0]);
return null;
}
}
@SuppressWarnings("serial")
static class CMS_setLoggingStatementMatchers extends ServerDebuggerCommandMessage {
@Override
public Object run(Object[] args) {
getDebugger().setLoggingStatementMatchers((StatementMatcher[])args[0]);
return null;
}
}
}

View File

@ -36,112 +36,50 @@
*/
package org.jooq.debug.console.remote;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import org.jooq.debug.Debugger;
import org.jooq.debug.DebuggerRegistry;
import org.jooq.debug.LocalDebugger;
import org.jooq.debug.LoggingListener;
import org.jooq.debug.QueryLoggingData;
import org.jooq.debug.ResultSetLoggingData;
import org.jooq.debug.console.DatabaseDescriptor;
import org.jooq.debug.console.remote.messaging.CommunicationInterface;
import org.jooq.debug.console.remote.messaging.CommunicationInterfaceFactory;
/**
* @author Christopher Deckers
*/
public class RemoteDebuggerServer {
private final Object LOCK = new Object();
private ServerSocket serverSocket;
private final Object LOCK = new Object();
private ServerSocket serverSocket;
public RemoteDebuggerServer(final int port) {
Thread serverThread = new Thread("SQL Remote Debugger Server on port " + port) {
@Override
public void run() {
try {
synchronized(LOCK) {
serverSocket = new ServerSocket(port);
}
while(true) {
ServerSocket serverSocket_;
synchronized(LOCK) {
serverSocket_ = serverSocket;
}
if(serverSocket_ != null) {
Socket socket = serverSocket_.accept();
startServerToClientThread(socket, port);
}
}
} catch(Exception e) {
e.printStackTrace();
}
}
};
serverThread.setDaemon(true);
serverThread.start();
}
// TODO: pass databaseDescriptor.
final DatabaseDescriptor databaseDescriptor = null;
try {
synchronized(LOCK) {
serverSocket = CommunicationInterface.openServerCommunicationChannel(new CommunicationInterfaceFactory() {
@Override
public CommunicationInterface createCommunicationInterface(int port_) {
final DebuggerServer serverDebugger = new DebuggerServer(databaseDescriptor);
DebuggerCommmunicationInterface commmunicationInterface = new DebuggerCommmunicationInterface(serverDebugger, port_) {
@Override
protected void processOpened() {
DebuggerRegistry.addSqlQueryDebugger(serverDebugger);
}
private void startServerToClientThread(final Socket socket, int port) {
Thread clientThread = new Thread("SQL Remote Debugger Server-Client on port " + port) {
@Override
public void run() {
Debugger debugger = null;
boolean isLogging = false;
try {
ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(socket.getInputStream()));
final ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(socket.getOutputStream()));
// TODO: find how to pass a database descriptor for remote edition.
debugger = new LocalDebugger(null);
DebuggerRegistry.addSqlQueryDebugger(debugger);
for(Message o; (o=(Message)in.readObject()) != null; ) {
if(o instanceof ServerLoggingActivationMessage) {
isLogging = ((ServerLoggingActivationMessage) o).isLogging();
if(isLogging) {
debugger.setLoggingListener(new LoggingListener() {
@Override
public void logQueries(QueryLoggingData queryLoggingData) {
try {
out.writeObject(new ClientDebugQueriesMessage(queryLoggingData));
out.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void logResultSet(int sqlQueryDebuggerDataID, ResultSetLoggingData resultSetLoggingData) {
try {
out.writeObject(new ClientDebugResultSetMessage(sqlQueryDebuggerDataID, resultSetLoggingData));
out.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
});
} else {
debugger.setLoggingListener(null);
}
} else if(o instanceof ServerLoggingStatementMatchersMessage) {
debugger.setLoggingStatementMatchers(((ServerLoggingStatementMatchersMessage) o).getStatementMatchers());
}
}
} catch(Exception e) {
if(isLogging) {
e.printStackTrace();
}
} finally {
if(debugger != null) {
DebuggerRegistry.removeSqlQueryDebugger(debugger);
}
}
}
};
clientThread.setDaemon(true);
clientThread.start();
@Override
protected void processClosed() {
DebuggerRegistry.removeSqlQueryDebugger(serverDebugger);
}
};
serverDebugger.setCommunicationInterface(commmunicationInterface);
return commmunicationInterface;
}
}, port);
}
} catch(Exception e) {
throw new RuntimeException(e);
}
}
public void close() {

View File

@ -0,0 +1,48 @@
/**
* Copyright (c) 2009-2012, Lukas Eder, lukas.eder@gmail.com
* Christopher Deckers, chrriis@gmail.com
* All rights reserved.
*
* This software is licensed to you under the Apache License, Version 2.0
* (the "License"); You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* . Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* . Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* . Neither the name "jOOQ" nor the names of its contributors may be
* used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package org.jooq.debug.console.remote;
import org.jooq.debug.console.remote.messaging.CommandMessage;
@SuppressWarnings("serial")
abstract class ServerDebuggerCommandMessage extends CommandMessage {
protected DebuggerServer getDebugger() {
return (DebuggerServer)((DebuggerCommmunicationInterface)getCommunicationInterface()).getDebugger();
}
}

View File

@ -0,0 +1,120 @@
/**
* Copyright (c) 2009-2012, Lukas Eder, lukas.eder@gmail.com
* Christopher Deckers, chrriis@gmail.com
* All rights reserved.
*
* This software is licensed to you under the Apache License, Version 2.0
* (the "License"); You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* . Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* . Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* . Neither the name "jOOQ" nor the names of its contributors may be
* used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package org.jooq.debug.console.remote.messaging;
import java.util.Arrays;
/**
* A type of message that executes a command with optional arguments and returns a result.
* @author Christopher Deckers
*/
@SuppressWarnings("serial")
public abstract class CommandMessage extends Message {
public CommandMessage() {
}
private Object[] args;
/**
* Set the arguments that will be used when the message is run.
* @param args the arguments, which must be serializable.
*/
void setArgs(Object... args) {
if(args.length == 0) {
args = null;
}
this.args = args;
}
/**
* Execute that message asynchronously with the given arguments.
* @param args the arguments, which must be serializable.
*/
public void asyncExec(CommunicationInterface communicationInterface, Object... args) {
setArgs(args);
asyncSend(communicationInterface);
}
/**
* Execute that message synchronously with the given arguments and return the result.
* @param args the arguments, which must be serializable.
* @return the result of the execution.
*/
public Object syncExec(CommunicationInterface communicationInterface, Object... args) {
setArgs(args);
return syncSend(communicationInterface);
}
private static final Object[] EMPTY_ARGS = new Object[0];
Object runCommand() throws Exception {
return run(args == null? EMPTY_ARGS: args);
}
/**
* Run the message.
* @param args the arguments that were specified for that command, or an empty array if none were specified.
* @return the result that may be passed back to the caller.
* @throws Exception any exception that may happen, and which would be passed back if it is a synchronous execution.
*/
public abstract Object run(Object[] args) throws Exception;
@Override
public String toString() {
String s = super.toString();
if(args == null || args.length == 0) {
return s + "()";
}
StringBuilder sb = new StringBuilder();
sb.append(s).append('(');
for(int i=0; i<args.length; i++) {
Object arg = args[i];
if(i > 0) {
sb.append(", ");
}
if(arg != null && arg.getClass().isArray()) {
sb.append(Arrays.deepToString((Object[])arg));
} else {
sb.append(arg);
}
}
sb.append(')');
return sb.toString();
}
}

View File

@ -0,0 +1,192 @@
/**
* Copyright (c) 2009-2012, Lukas Eder, lukas.eder@gmail.com
* Christopher Deckers, chrriis@gmail.com
* All rights reserved.
*
* This software is licensed to you under the Apache License, Version 2.0
* (the "License"); You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* . Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* . Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* . Neither the name "jOOQ" nor the names of its contributors may be
* used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package org.jooq.debug.console.remote.messaging;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
/**
* The native interface, which establishes the link between a peer VM and this local side.
* @author Christopher Deckers
*/
public class CommunicationInterface {
private final boolean IS_SYNCING_MESSAGES = Boolean.parseBoolean(System.getProperty("communication.interface.syncmessages"));
private volatile boolean isOpen;
public boolean isOpen() {
return isOpen;
}
private void checkOpen() {
if(!isOpen()) {
throw new IllegalStateException("The interface is not open!");
}
}
public void close() {
isOpen = false;
if(messagingInterface != null) {
messagingInterface.destroy();
messagingInterface = null;
}
}
public static CommunicationInterface openClientCommunicationChannel(CommunicationInterfaceFactory communicationInterfaceFactory, String ip, int port) throws Exception {
CommunicationInterface communicationInterface = communicationInterfaceFactory.createCommunicationInterface(port);
communicationInterface.createClientCommunicationChannel(ip);
return communicationInterface;
}
void notifyOpen() {
isOpen = true;
processOpened();
}
protected void processOpened() {
}
void notifyKilled() {
isOpen = false;
messagingInterface = null;
processClosed();
}
protected void processClosed() {
}
private int port;
private void createClientCommunicationChannel(String ip) throws Exception {
// Create the interface to communicate with the process handling the other side
Socket socket = null;
// 2 attempts
for(int i=1; i>=0; i--) {
try {
socket = new Socket(ip, port);
break;
} catch(IOException e) {
if(i == 0) {
throw new RuntimeException(e);
}
}
try {
Thread.sleep(100);
} catch(Exception e) {
}
}
if(socket == null) {
throw new IllegalStateException("Failed to connect to " + ip + "!");
}
messagingInterface = new MessagingInterface(this, socket);
notifyOpen();
}
public CommunicationInterface(int port) {
this.port = port;
}
private volatile MessagingInterface messagingInterface;
MessagingInterface getMessagingInterface() {
return messagingInterface;
}
Object syncSend(final Message message) {
checkOpen();
if(message instanceof LocalMessage) {
LocalMessage localMessage = (LocalMessage)message;
return localMessage.runCommand();
}
return messagingInterface.syncSend(message);
}
void asyncSend(final Message message) {
if(IS_SYNCING_MESSAGES) {
syncSend(message);
} else {
checkOpen();
if(message instanceof LocalMessage) {
LocalMessage localMessage = (LocalMessage)message;
localMessage.runCommand();
return;
}
messagingInterface.asyncSend(message);
}
}
public static ServerSocket openServerCommunicationChannel(final CommunicationInterfaceFactory communicationInterfaceFactory, final int port) throws Exception {
final ServerSocket serverSocket;
try {
serverSocket = new ServerSocket();
// serverSocket.setReuseAddress(true);
serverSocket.bind(new InetSocketAddress(port));
} catch(IOException e) {
throw e;
}
Thread serverThread = new Thread("Communication channel server on port " + port) {
@Override
public void run() {
while(!serverSocket.isClosed()) {
final Socket socket;
try {
socket = serverSocket.accept();
new Thread("Communication channel - client connection") {
@Override
public void run() {
CommunicationInterface communicationInterface = communicationInterfaceFactory.createCommunicationInterface(port);
communicationInterface.messagingInterface = new MessagingInterface(communicationInterface, socket);
communicationInterface.notifyOpen();
}
}.start();
} catch(Exception e) {
if(!serverSocket.isClosed()) {
e.printStackTrace();
}
}
}
}
};
serverThread.setDaemon(true);
serverThread.start();
return serverSocket;
}
}

View File

@ -0,0 +1,44 @@
/**
* Copyright (c) 2009-2012, Lukas Eder, lukas.eder@gmail.com
* Christopher Deckers, chrriis@gmail.com
* All rights reserved.
*
* This software is licensed to you under the Apache License, Version 2.0
* (the "License"); You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* . Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* . Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* . Neither the name "jOOQ" nor the names of its contributors may be
* used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package org.jooq.debug.console.remote.messaging;
public interface CommunicationInterfaceFactory {
public CommunicationInterface createCommunicationInterface(int port);
}

View File

@ -0,0 +1,64 @@
/**
* Copyright (c) 2009-2012, Lukas Eder, lukas.eder@gmail.com
* Christopher Deckers, chrriis@gmail.com
* All rights reserved.
*
* This software is licensed to you under the Apache License, Version 2.0
* (the "License"); You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* . Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* . Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* . Neither the name "jOOQ" nor the names of its contributors may be
* used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package org.jooq.debug.console.remote.messaging;
/**
* A local message is a special message that is not sent through the messaging interface. It is normally used to sequence a local command among remote commands.
* @author Christopher Deckers
*/
@SuppressWarnings("serial")
public abstract class LocalMessage extends CommandMessage {
public LocalMessage() {
}
@Override
Object runCommand() {
try {
return super.runCommand();
} catch(RuntimeException e) {
throw e;
} catch(Exception e) {
e.printStackTrace();
}
return null;
}
@Override
public abstract Object run(Object[] args);
}

View File

@ -0,0 +1,114 @@
/**
* Copyright (c) 2009-2012, Lukas Eder, lukas.eder@gmail.com
* Christopher Deckers, chrriis@gmail.com
* All rights reserved.
*
* This software is licensed to you under the Apache License, Version 2.0
* (the "License"); You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* . Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* . Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* . Neither the name "jOOQ" nor the names of its contributors may be
* used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package org.jooq.debug.console.remote.messaging;
import java.io.Serializable;
/**
* The superclass of all the messages that are exchanged at the communication interface.
* @author Christopher Deckers
*/
@SuppressWarnings("serial")
public abstract class Message implements Serializable {
private static int nextID = 1;
private int id = nextID++;
private boolean isSyncExec;
/**
* Create an empty message.
*/
public Message() {
}
private transient CommunicationInterface communicationInterface;
void setCommunicationInterface(CommunicationInterface communicationInterface) {
this.communicationInterface = communicationInterface;
}
protected CommunicationInterface getCommunicationInterface() {
return communicationInterface;
}
int getID() {
return id;
}
void setSyncExec(boolean isSyncExec) {
this.isSyncExec = isSyncExec;
}
boolean isSyncExec() {
return isSyncExec;
}
/**
* Send that message asynchronously.
*/
public void asyncSend(CommunicationInterface communicationInterface) {
communicationInterface.asyncSend(this);
}
/**
* Send that message synchronously, potentially returning a result if the message type allows that.
* @return the result if any.
*/
public Object syncSend(CommunicationInterface communicationInterface) {
return communicationInterface.syncSend(this);
}
/**
* Indicate whether the message is valid. This is called before interpreting it to give a chance for the message to prevent its interpretation.
* @return true if the message is valid and should be interpreted, false otherwise.
*/
protected boolean isValid() {
return true;
}
@Override
public String toString() {
String name = getClass().getName();
if(name.startsWith("chrriis.dj.nativeswing.")) {
name = name.substring("chrriis.dj.nativeswing.".length());
}
return name;
}
}

View File

@ -0,0 +1,360 @@
/**
* Copyright (c) 2009-2012, Lukas Eder, lukas.eder@gmail.com
* Christopher Deckers, chrriis@gmail.com
* All rights reserved.
*
* This software is licensed to you under the Apache License, Version 2.0
* (the "License"); You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* . Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* . Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* . Neither the name "jOOQ" nor the names of its contributors may be
* used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package org.jooq.debug.console.remote.messaging;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.util.LinkedList;
import java.util.List;
/**
* @author Christopher Deckers
*/
public class MessagingInterface {
private static final boolean IS_DEBUGGING_MESSAGES = Boolean.parseBoolean(System.getProperty("communication.interface.debug.printmessages"));
@SuppressWarnings("serial")
private static class CommandResultMessage extends Message {
private int originalID;
private Object result;
private Throwable exception;
CommandResultMessage(int originalID, Object result, Throwable exception) {
this.originalID = originalID;
this.result = result;
this.exception = exception;
}
int getOriginalID() {
return originalID;
}
public Object getResult() {
return result;
}
public Throwable getException() {
return exception;
}
@Override
public String toString() {
return super.toString() + "(" + originalID + ")";
}
}
private Object RECEIVER_LOCK = new Object();
private ObjectOutputStream oos;
private ObjectInputStream ois;
private boolean isAlive = true;
public boolean isAlive() {
return isAlive;
}
public void destroy() {
isAlive = false;
try {
ois.close();
} catch(Exception e) {
}
}
private CommunicationInterface communicationInterface;
public MessagingInterface(final CommunicationInterface communicationInterface, final Socket socket) {
this.communicationInterface = communicationInterface;
try {
oos = new ObjectOutputStream(new BufferedOutputStream(socket.getOutputStream()) {
@Override
public synchronized void write(int b) throws IOException {
super.write(b);
oosByteCount++;
}
@Override
public synchronized void write(byte[] b, int off, int len) throws IOException {
super.write(b, off, len);
oosByteCount += len;
}
});
oos.flush();
ois = new ObjectInputStream(new BufferedInputStream(socket.getInputStream()));
} catch(IOException e) {
throw new RuntimeException(e);
}
Thread receiverThread = new Thread("MessagingInterface Receiver") {
@Override
public void run() {
while(isAlive) {
Message message = null;
try {
message = readMessage();
} catch(Exception e) {
if(isAlive) {
isAlive = false;
// e.printStackTrace();
try {
communicationInterface.notifyKilled();
} catch(Exception ex) {
ex.printStackTrace();
}
}
// Unlock all locked sync calls
synchronized(RECEIVER_LOCK) {
receivedMessageList.clear();
RECEIVER_LOCK.notify();
}
for(int instanceID: syncThreadRegistry.getInstanceIDs()) {
Thread thread = (Thread)syncThreadRegistry.get(instanceID);
if(thread != null) {
synchronized(thread) {
thread.notify();
}
}
}
}
if(message != null) {
final Message message_ = message;
new Thread("Communication Interface Async") {
@Override
public void run() {
runMessage(message_);
}
}.start();
}
}
try {
oos.close();
} catch(Exception e) {
}
try {
ois.close();
} catch(Exception e) {
}
try {
socket.close();
} catch(Exception e) {
}
}
};
receiverThread.setDaemon(true);
receiverThread.start();
}
private CommandResultMessage runMessage(Message message) {
if(IS_DEBUGGING_MESSAGES) {
System.err.println(">RUN: " + message.getID() + ", " + message);
}
CommandResultMessage commandResultMessage;
if(message instanceof CommandMessage) {
CommandMessage commandMessage = (CommandMessage)message;
Object result = null;
Throwable throwable = null;
if(message.isValid()) {
try {
result = commandMessage.runCommand();
} catch(Throwable t) {
throwable = t;
}
}
if(commandMessage.isSyncExec()) {
commandResultMessage = new CommandResultMessage(commandMessage.getID(), result, throwable);
asyncSend(commandResultMessage);
} else {
if(throwable != null) {
throwable.printStackTrace();
}
commandResultMessage = new CommandResultMessage(message.getID(), result, throwable);
}
} else {
commandResultMessage = new CommandResultMessage(message.getID(), null, null);
if(message.isSyncExec()) {
asyncSend(commandResultMessage);
}
}
if(IS_DEBUGGING_MESSAGES) {
System.err.println("<RUN: " + message.getID());
}
return commandResultMessage;
}
private List<Message> receivedMessageList = new LinkedList<Message>();
@SuppressWarnings("serial")
private static class CM_asyncExecResponse extends CommandMessage {
@Override
public Object run(Object[] args) {
MessagingInterface messagingInterface = getCommunicationInterface().getMessagingInterface();
int instanceID = (Integer)args[0];
Thread thread = (Thread)messagingInterface.syncThreadRegistry.get(instanceID);
messagingInterface.syncThreadRegistry.remove(instanceID);
if(thread == null) {
return null;
}
synchronized(thread) {
messagingInterface.syncThreadRegistry.add(args[1], instanceID);
thread.notify();
}
return null;
}
}
@SuppressWarnings("serial")
private static class CM_asyncExec extends CommandMessage {
@Override
public Object run(Object[] args) {
Message message = (Message)args[1];
message.setSyncExec(false);
CommunicationInterface communicationInterface = getCommunicationInterface();
MessagingInterface messagingInterface = communicationInterface.getMessagingInterface();
CM_asyncExecResponse asyncExecResponse = new CM_asyncExecResponse();
asyncExecResponse.setArgs(args[0], messagingInterface.runMessage(message));
messagingInterface.asyncSend(asyncExecResponse);
return null;
}
}
private ObjectRegistry syncThreadRegistry = new ObjectRegistry();
private void printFailedInvocation(Message message) {
System.err.println("Failed messaging: " + message);
}
public Object syncSend(Message message) {
Thread thread = Thread.currentThread();
final int instanceID = syncThreadRegistry.add(Thread.currentThread());
CM_asyncExec asyncExec = new CM_asyncExec();
asyncExec.setArgs(instanceID, message);
asyncSend(asyncExec);
synchronized(thread) {
while(syncThreadRegistry.get(instanceID) instanceof Thread) {
try {
thread.wait();
} catch(Exception e) {
}
if(!isAlive()) {
syncThreadRegistry.remove(instanceID);
printFailedInvocation(message);
return null;
}
}
CommandResultMessage commandResultMessage = (CommandResultMessage)syncThreadRegistry.get(instanceID);
syncThreadRegistry.remove(instanceID);
return processCommandResult(commandResultMessage);
}
}
private Object processCommandResult(CommandResultMessage commandResultMessage) {
if(IS_DEBUGGING_MESSAGES) {
System.err.println("<USE: " + commandResultMessage.getID());
}
Throwable exception = commandResultMessage.getException();
if(exception != null) {
// if(exception instanceof RuntimeException) {
// throw (RuntimeException)exception;
// }
throw new RuntimeException(exception);
}
return commandResultMessage.getResult();
}
public void asyncSend(Message message) {
message.setSyncExec(false);
try {
writeMessage(message);
} catch(Exception e) {
throw new IllegalStateException(e);
}
}
private static final int OOS_RESET_THRESHOLD;
static {
String maxByteCountProperty = System.getProperty("communication.interface.streamresetthreshold");
if(maxByteCountProperty != null) {
OOS_RESET_THRESHOLD = Integer.parseInt(maxByteCountProperty);
} else {
OOS_RESET_THRESHOLD = 500000;
}
}
private int oosByteCount;
private void writeMessage(Message message) throws IOException {
if(!isAlive()) {
printFailedInvocation(message);
return;
}
if(IS_DEBUGGING_MESSAGES) {
System.err.println((message.isSyncExec()? "SYNDS": "SYNDA") + ": " + message.getID() + ", " + message);
}
synchronized(oos) {
oos.writeUnshared(message);
oos.flush();
// Messages are cached, so we need to reset() from time to time to clean the cache, or else we get an OutOfMemoryError.
if(oosByteCount > OOS_RESET_THRESHOLD) {
oos.reset();
oosByteCount = 0;
}
}
}
private Message readMessage() throws IOException, ClassNotFoundException {
Object o = ois.readUnshared();
if(o instanceof Message) {
Message message = (Message)o;
if(IS_DEBUGGING_MESSAGES) {
System.err.println("RECV: " + message.getID() + ", " + message);
}
message.setCommunicationInterface(communicationInterface);
return message;
}
System.err.println("Unknown message: " + o);
return null;
}
}

View File

@ -0,0 +1,161 @@
/**
* Copyright (c) 2009-2012, Lukas Eder, lukas.eder@gmail.com
* Christopher Deckers, chrriis@gmail.com
* All rights reserved.
*
* This software is licensed to you under the Apache License, Version 2.0
* (the "License"); You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* . Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* . Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* . Neither the name "jOOQ" nor the names of its contributors may be
* used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package org.jooq.debug.console.remote.messaging;
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
/**
* A convenient class to register objects to an ID.
* @author Christopher Deckers
*/
class ObjectRegistry {
private Thread cleanUpThread;
private synchronized void startThread() {
if(cleanUpThread != null) {
return;
}
cleanUpThread = new Thread("Registry cleanup thread") {
@Override
public void run() {
while(true) {
try {
sleep(5000);
} catch(Exception e) {
}
synchronized(ObjectRegistry.this) {
for(Integer instanceID: instanceIDToObjectReferenceMap.keySet().toArray(new Integer[0])) {
if(instanceIDToObjectReferenceMap.get(instanceID).get() == null) {
instanceIDToObjectReferenceMap.remove(instanceID);
}
}
if(instanceIDToObjectReferenceMap.isEmpty()) {
cleanUpThread = null;
return;
}
}
}
}
};
cleanUpThread.setDaemon(true);
cleanUpThread.start();
}
private int nextInstanceID = 1;
private Map<Integer, WeakReference<Object>> instanceIDToObjectReferenceMap = new HashMap<Integer, WeakReference<Object>>();
/**
* Construct an object registry.
*/
public ObjectRegistry() {
}
/**
* Add an object to the registry.
* @param o the object to add.
* @return an unused instance ID that is strictly greater than 0.
*/
public synchronized int add(Object o) {
while(true) {
int instanceID = nextInstanceID++;
if(!instanceIDToObjectReferenceMap.containsKey(instanceID)) {
if(o == null) {
return instanceID;
}
instanceIDToObjectReferenceMap.put(instanceID, new WeakReference<Object>(o));
startThread();
return instanceID;
}
}
}
/**
* Add an object to the registry, specifying its ID, which throws an exception if the ID is already in use.
* @param o the object to add.
* @param instanceID the ID to associate the object to.
*/
public synchronized void add(Object o, int instanceID) {
Object o2 = get(instanceID);
if(o2 != null && o2 != o) {
throw new IllegalStateException("An object is already registered with the id \"" + instanceID + "\" for object: " + o);
}
instanceIDToObjectReferenceMap.put(instanceID, new WeakReference<Object>(o));
startThread();
}
/**
* Get an object using its ID.
* @return the object, or null.
*/
public synchronized Object get(int instanceID) {
WeakReference<Object> weakReference = instanceIDToObjectReferenceMap.get(instanceID);
if(weakReference == null) {
return null;
}
Object o = weakReference.get();
if(o == null) {
instanceIDToObjectReferenceMap.remove(instanceID);
}
return o;
}
/**
* Remove an object from the registry using its instance ID.
* @param instanceID the ID of the object to remove.
*/
public synchronized void remove(int instanceID) {
instanceIDToObjectReferenceMap.remove(instanceID);
}
/**
* Get all the instance IDs that are used in this registry.
* @return the instance IDs.
*/
public int[] getInstanceIDs() {
Object[] instanceIDObjects = instanceIDToObjectReferenceMap.keySet().toArray();
int[] instanceIDs = new int[instanceIDObjects.length];
for(int i=0; i<instanceIDObjects.length; i++) {
instanceIDs[i] = (Integer)instanceIDObjects[i];
}
return instanceIDs;
}
}