From 40ef76037aeac4ee2b9d857a092e5ea026c0bb5c Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Wed, 14 May 2014 13:23:26 +0200 Subject: Refactored Proxy update notification server into its own class and thread, so executions return before proxy messages are sent. Another deadlock suspect. --- .../java/com/c2kernel/entity/proxy/ItemProxy.java | 2 +- .../entity/proxy/ProxyClientConnection.java | 3 +- .../com/c2kernel/entity/proxy/ProxyManager.java | 72 +------------- .../com/c2kernel/entity/proxy/ProxyServer.java | 106 +++++++++++++++++++++ 4 files changed, 110 insertions(+), 73 deletions(-) create mode 100644 src/main/java/com/c2kernel/entity/proxy/ProxyServer.java (limited to 'src/main/java/com/c2kernel/entity') diff --git a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java index 355acd8..f3a2f44 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java +++ b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java @@ -392,7 +392,7 @@ public class ItemProxy public void notify(ProxyMessage message) { Logger.msg(4, "EntityProxy.notify() - Received change notification for "+message.getPath()+" on "+mSystemKey); synchronized (this){ - if (!message.getServer().equals(ProxyManager.serverName)) + if (Gateway.getProxyServer()== null || !message.getServer().equals(Gateway.getProxyServer().getServerName())) Gateway.getStorage().clearCache(mSystemKey, message.getPath()); for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { MemberSubscription newSub = e.next(); diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java index 5abdb16..3a7e129 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java @@ -11,6 +11,7 @@ import java.util.ArrayList; import java.util.Iterator; import com.c2kernel.common.InvalidDataException; +import com.c2kernel.process.Gateway; import com.c2kernel.utils.Logger; import com.c2kernel.utils.server.SocketHandler; @@ -36,7 +37,7 @@ public class ProxyClientConnection implements SocketHandler { public ProxyClientConnection() { super(); thisClientId = ++clientId; - ProxyManager.registerProxyClient(this); + Gateway.getProxyServer().registerProxyClient(this); Logger.msg(1, "Proxy Client Connection Handler "+thisClientId+" ready."); } diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java b/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java index d19e38f..b217f3e 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java @@ -26,7 +26,6 @@ import com.c2kernel.process.Gateway; import com.c2kernel.property.Property; import com.c2kernel.utils.Logger; import com.c2kernel.utils.SoftCache; -import com.c2kernel.utils.server.SimpleTCPIPServer; public class ProxyManager @@ -35,11 +34,6 @@ public class ProxyManager HashMap treeSubscribers = new HashMap(); HashMap connections = new HashMap(); - // server objects - static ArrayList proxyClients = new ArrayList(); - static SimpleTCPIPServer proxyServer = null; - static String serverName = null; - /** * Create a proxy manager to listen for proxy events and reap unused proxies */ @@ -268,70 +262,6 @@ public class ProxyManager } - /************************************************************************** - * Static Proxy Server methods - **************************************************************************/ - - /** - * Initialises the Proxy event UDP server listening on 'Host.Proxy.port' from c2kprops - * @param c2kProps - */ - public static void initServer() - { - Logger.msg(5, "ProxyManager::initServer - Starting....."); - int port = Gateway.getProperties().getInt("ItemServer.Proxy.port", 0); - serverName = Gateway.getProperties().getProperty("ItemServer.name"); - if (port == 0) { - Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of changes."); - return; - } - - // set up the proxy server - try { - Logger.msg(5, "ProxyManager::initServer - Initialising proxy informer on port "+port); - proxyServer = new SimpleTCPIPServer(port, ProxyClientConnection.class, 200); - proxyServer.startListening(); - } catch (Exception ex) { - Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of changes."); - Logger.error(ex); - } - } - - public static void sendProxyEvent(ProxyMessage message) { - if (proxyServer != null && message.getPath() != null) - synchronized(proxyClients) { - for (ProxyClientConnection client : proxyClients) { - client.sendMessage(message); - } - } - } - - public static void reportConnections(int logLevel) { - synchronized(proxyClients) { - Logger.msg(logLevel, "Currently connected proxy clients:"); - for (ProxyClientConnection client : proxyClients) { - Logger.msg(logLevel, " "+client); - } - } - } - - public static void shutdownServer() { - if (proxyServer != null) { - Logger.msg(1, "ProxyManager: Closing Server."); - proxyServer.stopListening(); - } - } - - public static void registerProxyClient(ProxyClientConnection client) { - synchronized(proxyClients) { - proxyClients.add(client); - } - } - - public static void unRegisterProxyClient(ProxyClientConnection client) { - synchronized(proxyClients) { - proxyClients.remove(client); - } - } + } diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyServer.java b/src/main/java/com/c2kernel/entity/proxy/ProxyServer.java new file mode 100644 index 0000000..c576cda --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyServer.java @@ -0,0 +1,106 @@ +package com.c2kernel.entity.proxy; + +import java.util.ArrayList; +import java.util.concurrent.LinkedBlockingQueue; + +import com.c2kernel.process.Gateway; +import com.c2kernel.utils.Logger; +import com.c2kernel.utils.server.SimpleTCPIPServer; + +public class ProxyServer implements Runnable { + + // server objects + ArrayList proxyClients; + SimpleTCPIPServer proxyListener = null; + String serverName = null; + boolean keepRunning = true; + LinkedBlockingQueue messageQueue; + + public ProxyServer(String serverName) { + Logger.msg(5, "ProxyManager::initServer - Starting....."); + int port = Gateway.getProperties().getInt("ItemServer.Proxy.port", 0); + this.serverName = serverName; + this.proxyClients = new ArrayList(); + this.messageQueue = new LinkedBlockingQueue(); + + if (port == 0) { + Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of changes."); + return; + } + + // set up the proxy server + try { + Logger.msg(5, "ProxyManager::initServer - Initialising proxy informer on port "+port); + proxyListener = new SimpleTCPIPServer(port, ProxyClientConnection.class, 200); + proxyListener.startListening(); + } catch (Exception ex) { + Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of changes."); + Logger.error(ex); + } + // start the message queue delivery thread + new Thread(this).start(); + } + + @Override + public void run() { + + while(keepRunning) { + ProxyMessage message = messageQueue.poll(); + if (message != null) { + synchronized(proxyClients) { + for (ProxyClientConnection client : proxyClients) { + client.sendMessage(message); + } + } + } else + try { + synchronized(this) { wait(); } + } catch (InterruptedException e) { } + } + + } + + public String getServerName() { + return serverName; + } + + public void sendProxyEvent(ProxyMessage message) { + try { + synchronized(this) { + messageQueue.put(message); + notify(); + } + } catch (InterruptedException e) { } + } + + public void reportConnections(int logLevel) { + synchronized(proxyClients) { + Logger.msg(logLevel, "Currently connected proxy clients:"); + for (ProxyClientConnection client : proxyClients) { + Logger.msg(logLevel, " "+client); + } + } + } + + public void shutdownServer() { + Logger.msg(1, "ProxyManager: Closing Server."); + proxyListener.stopListening(); + synchronized(this) { + keepRunning = false; + notify(); + } + } + + public void registerProxyClient(ProxyClientConnection client) { + synchronized(proxyClients) { + proxyClients.add(client); + } + } + + public void unRegisterProxyClient(ProxyClientConnection client) { + synchronized(proxyClients) { + proxyClients.remove(client); + } + } + +} -- cgit v1.2.3