From 40ef76037aeac4ee2b9d857a092e5ea026c0bb5c Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Wed, 14 May 2014 13:23:26 +0200 Subject: Refactored Proxy update notification server into its own class and thread, so executions return before proxy messages are sent. Another deadlock suspect. --- .../java/com/c2kernel/entity/proxy/ItemProxy.java | 2 +- .../entity/proxy/ProxyClientConnection.java | 3 +- .../com/c2kernel/entity/proxy/ProxyManager.java | 72 +------------- .../com/c2kernel/entity/proxy/ProxyServer.java | 106 +++++++++++++++++++++ src/main/java/com/c2kernel/lookup/LDAPLookup.java | 5 +- .../persistency/ClusterStorageManager.java | 5 +- src/main/java/com/c2kernel/process/Gateway.java | 21 ++-- 7 files changed, 129 insertions(+), 85 deletions(-) create mode 100644 src/main/java/com/c2kernel/entity/proxy/ProxyServer.java (limited to 'src') diff --git a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java index 355acd8..f3a2f44 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java +++ b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java @@ -392,7 +392,7 @@ public class ItemProxy public void notify(ProxyMessage message) { Logger.msg(4, "EntityProxy.notify() - Received change notification for "+message.getPath()+" on "+mSystemKey); synchronized (this){ - if (!message.getServer().equals(ProxyManager.serverName)) + if (Gateway.getProxyServer()== null || !message.getServer().equals(Gateway.getProxyServer().getServerName())) Gateway.getStorage().clearCache(mSystemKey, message.getPath()); for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { MemberSubscription newSub = e.next(); diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java index 5abdb16..3a7e129 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java @@ -11,6 +11,7 @@ import java.util.ArrayList; import java.util.Iterator; import com.c2kernel.common.InvalidDataException; +import com.c2kernel.process.Gateway; import com.c2kernel.utils.Logger; import com.c2kernel.utils.server.SocketHandler; @@ -36,7 +37,7 @@ public class ProxyClientConnection implements SocketHandler { public ProxyClientConnection() { super(); thisClientId = ++clientId; - ProxyManager.registerProxyClient(this); + Gateway.getProxyServer().registerProxyClient(this); Logger.msg(1, "Proxy Client Connection Handler "+thisClientId+" ready."); } diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java b/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java index d19e38f..b217f3e 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java @@ -26,7 +26,6 @@ import com.c2kernel.process.Gateway; import com.c2kernel.property.Property; import com.c2kernel.utils.Logger; import com.c2kernel.utils.SoftCache; -import com.c2kernel.utils.server.SimpleTCPIPServer; public class ProxyManager @@ -35,11 +34,6 @@ public class ProxyManager HashMap treeSubscribers = new HashMap(); HashMap connections = new HashMap(); - // server objects - static ArrayList proxyClients = new ArrayList(); - static SimpleTCPIPServer proxyServer = null; - static String serverName = null; - /** * Create a proxy manager to listen for proxy events and reap unused proxies */ @@ -268,70 +262,6 @@ public class ProxyManager } - /************************************************************************** - * Static Proxy Server methods - **************************************************************************/ - - /** - * Initialises the Proxy event UDP server listening on 'Host.Proxy.port' from c2kprops - * @param c2kProps - */ - public static void initServer() - { - Logger.msg(5, "ProxyManager::initServer - Starting....."); - int port = Gateway.getProperties().getInt("ItemServer.Proxy.port", 0); - serverName = Gateway.getProperties().getProperty("ItemServer.name"); - if (port == 0) { - Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of changes."); - return; - } - - // set up the proxy server - try { - Logger.msg(5, "ProxyManager::initServer - Initialising proxy informer on port "+port); - proxyServer = new SimpleTCPIPServer(port, ProxyClientConnection.class, 200); - proxyServer.startListening(); - } catch (Exception ex) { - Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of changes."); - Logger.error(ex); - } - } - - public static void sendProxyEvent(ProxyMessage message) { - if (proxyServer != null && message.getPath() != null) - synchronized(proxyClients) { - for (ProxyClientConnection client : proxyClients) { - client.sendMessage(message); - } - } - } - - public static void reportConnections(int logLevel) { - synchronized(proxyClients) { - Logger.msg(logLevel, "Currently connected proxy clients:"); - for (ProxyClientConnection client : proxyClients) { - Logger.msg(logLevel, " "+client); - } - } - } - - public static void shutdownServer() { - if (proxyServer != null) { - Logger.msg(1, "ProxyManager: Closing Server."); - proxyServer.stopListening(); - } - } - - public static void registerProxyClient(ProxyClientConnection client) { - synchronized(proxyClients) { - proxyClients.add(client); - } - } - - public static void unRegisterProxyClient(ProxyClientConnection client) { - synchronized(proxyClients) { - proxyClients.remove(client); - } - } + } diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyServer.java b/src/main/java/com/c2kernel/entity/proxy/ProxyServer.java new file mode 100644 index 0000000..c576cda --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyServer.java @@ -0,0 +1,106 @@ +package com.c2kernel.entity.proxy; + +import java.util.ArrayList; +import java.util.concurrent.LinkedBlockingQueue; + +import com.c2kernel.process.Gateway; +import com.c2kernel.utils.Logger; +import com.c2kernel.utils.server.SimpleTCPIPServer; + +public class ProxyServer implements Runnable { + + // server objects + ArrayList proxyClients; + SimpleTCPIPServer proxyListener = null; + String serverName = null; + boolean keepRunning = true; + LinkedBlockingQueue messageQueue; + + public ProxyServer(String serverName) { + Logger.msg(5, "ProxyManager::initServer - Starting....."); + int port = Gateway.getProperties().getInt("ItemServer.Proxy.port", 0); + this.serverName = serverName; + this.proxyClients = new ArrayList(); + this.messageQueue = new LinkedBlockingQueue(); + + if (port == 0) { + Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of changes."); + return; + } + + // set up the proxy server + try { + Logger.msg(5, "ProxyManager::initServer - Initialising proxy informer on port "+port); + proxyListener = new SimpleTCPIPServer(port, ProxyClientConnection.class, 200); + proxyListener.startListening(); + } catch (Exception ex) { + Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of changes."); + Logger.error(ex); + } + // start the message queue delivery thread + new Thread(this).start(); + } + + @Override + public void run() { + + while(keepRunning) { + ProxyMessage message = messageQueue.poll(); + if (message != null) { + synchronized(proxyClients) { + for (ProxyClientConnection client : proxyClients) { + client.sendMessage(message); + } + } + } else + try { + synchronized(this) { wait(); } + } catch (InterruptedException e) { } + } + + } + + public String getServerName() { + return serverName; + } + + public void sendProxyEvent(ProxyMessage message) { + try { + synchronized(this) { + messageQueue.put(message); + notify(); + } + } catch (InterruptedException e) { } + } + + public void reportConnections(int logLevel) { + synchronized(proxyClients) { + Logger.msg(logLevel, "Currently connected proxy clients:"); + for (ProxyClientConnection client : proxyClients) { + Logger.msg(logLevel, " "+client); + } + } + } + + public void shutdownServer() { + Logger.msg(1, "ProxyManager: Closing Server."); + proxyListener.stopListening(); + synchronized(this) { + keepRunning = false; + notify(); + } + } + + public void registerProxyClient(ProxyClientConnection client) { + synchronized(proxyClients) { + proxyClients.add(client); + } + } + + public void unRegisterProxyClient(ProxyClientConnection client) { + synchronized(proxyClients) { + proxyClients.remove(client); + } + } + +} diff --git a/src/main/java/com/c2kernel/lookup/LDAPLookup.java b/src/main/java/com/c2kernel/lookup/LDAPLookup.java index 116362e..eae803b 100644 --- a/src/main/java/com/c2kernel/lookup/LDAPLookup.java +++ b/src/main/java/com/c2kernel/lookup/LDAPLookup.java @@ -12,7 +12,6 @@ import com.c2kernel.common.ObjectCannotBeUpdated; import com.c2kernel.common.ObjectNotFoundException; import com.c2kernel.entity.TraceableEntity; import com.c2kernel.entity.agent.ActiveEntity; -import com.c2kernel.entity.proxy.ProxyManager; import com.c2kernel.entity.proxy.ProxyMessage; import com.c2kernel.process.Gateway; import com.c2kernel.property.PropertyDescription; @@ -231,7 +230,7 @@ public class LDAPLookup LDAPEntry newEntry = new LDAPEntry(path.getFullDN(),attrSet); LDAPLookupUtils.addEntry(getConnection(),newEntry); if (path instanceof DomainPath) - ProxyManager.sendProxyEvent(new ProxyMessage(ProxyMessage.NA, path.toString(), ProxyMessage.ADDED)); + Gateway.getProxyServer().sendProxyEvent(new ProxyMessage(ProxyMessage.NA, path.toString(), ProxyMessage.ADDED)); return newEntry; } catch (LDAPException ex) { if (ex.getResultCode() == LDAPException.ENTRY_ALREADY_EXISTS) @@ -251,7 +250,7 @@ public class LDAPLookup throw new ObjectCannotBeUpdated(ex.getLDAPErrorMessage(), ""); } if (path instanceof DomainPath) { - ProxyManager.sendProxyEvent(new ProxyMessage(ProxyMessage.NA, path.toString(), ProxyMessage.DELETED)); + Gateway.getProxyServer().sendProxyEvent(new ProxyMessage(ProxyMessage.NA, path.toString(), ProxyMessage.DELETED)); } } diff --git a/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java b/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java index d0c3f77..20857c6 100644 --- a/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java +++ b/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java @@ -10,7 +10,6 @@ import java.util.StringTokenizer; import com.c2kernel.common.ObjectNotFoundException; import com.c2kernel.entity.C2KLocalObject; import com.c2kernel.entity.agent.JobList; -import com.c2kernel.entity.proxy.ProxyManager; import com.c2kernel.entity.proxy.ProxyMessage; import com.c2kernel.events.History; import com.c2kernel.persistency.outcome.Outcome; @@ -291,7 +290,7 @@ public class ClusterStorageManager { if (Logger.doLog(9)) dumpCacheContents(9); // transmit proxy event - ProxyManager.sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.ADDED)); + Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.ADDED)); } /** Deletes a cluster from all writers */ @@ -317,7 +316,7 @@ public class ClusterStorageManager { // transmit proxy event - ProxyManager.sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.DELETED)); + Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.DELETED)); } public void clearCache(Integer sysKeyIntObj, String path) { diff --git a/src/main/java/com/c2kernel/process/Gateway.java b/src/main/java/com/c2kernel/process/Gateway.java index 6c7b68d..01cc202 100644 --- a/src/main/java/com/c2kernel/process/Gateway.java +++ b/src/main/java/com/c2kernel/process/Gateway.java @@ -14,6 +14,7 @@ import com.c2kernel.common.ObjectNotFoundException; import com.c2kernel.entity.CorbaServer; import com.c2kernel.entity.proxy.AgentProxy; import com.c2kernel.entity.proxy.ProxyManager; +import com.c2kernel.entity.proxy.ProxyServer; import com.c2kernel.lookup.AgentPath; import com.c2kernel.lookup.LDAPLookup; import com.c2kernel.lookup.LDAPProperties; @@ -57,7 +58,8 @@ public class Gateway static private boolean orbDestroyed = false; static private LDAPLookup mLDAPLookup; static private TransactionManager mStorage; - static private ProxyManager mProxyManager; + static private ProxyManager mProxyManager; + static private ProxyServer mProxyServer; static private CorbaServer mCorbaServer; static private CastorXMLUtility mMarshaller; static private AgentProxy mCurrentUser = null; @@ -151,13 +153,13 @@ public class Gateway mLDAPLookup.install(); // start entity proxy server - ProxyManager.initServer(); + mProxyServer = new ProxyServer(mC2KProps.getProperty("ItemServer.name")); // Init ORB - set various config - String serverName = getProperty("ItemServer.name"); + String serverName = mC2KProps.getProperty("ItemServer.name"); if (serverName != null) mC2KProps.put("com.sun.CORBA.ORBServerHost", serverName); - String serverPort = getProperty("ItemServer.iiop", "1500"); + String serverPort = mC2KProps.getProperty("ItemServer.iiop", "1500"); mC2KProps.put("com.sun.CORBA.ORBServerPort", serverPort); //TODO: externalize this (or replace corba completely) mC2KProps.put("com.sun.CORBA.POA.ORBServerId", "1"); @@ -356,11 +358,13 @@ public class Gateway mLDAPLookup.disconnect(); mLDAPLookup = null; - // shut down proxy manager + // shut down proxy manager & server + if (mProxyServer != null) + mProxyServer.shutdownServer(); if (mProxyManager != null) mProxyManager.shutdown(); mProxyManager = null; - ProxyManager.shutdownServer(); + // close log consoles Logger.closeConsole(); @@ -410,6 +414,11 @@ public class Gateway return mProxyManager; } + + public static ProxyServer getProxyServer() { + return mProxyServer; + } + static public String getCentreId() { return getProperty("LocalCentre"); } -- cgit v1.2.3