From a1f0ecbb6a2bea6aa214322c412af2f3c5ce124b Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Wed, 7 May 2014 17:33:13 +0200 Subject: Agent now extends Item, so they can have workflows. All traces of the old 'Entity' superclasses should be removed, including proxies and paths. Very large change, breaks API compatibility with CRISTAL 2.x. Fixes #135 --- src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java') diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java index 9687f22..5abdb16 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java @@ -36,7 +36,7 @@ public class ProxyClientConnection implements SocketHandler { public ProxyClientConnection() { super(); thisClientId = ++clientId; - EntityProxyManager.registerProxyClient(this); + ProxyManager.registerProxyClient(this); Logger.msg(1, "Proxy Client Connection Handler "+thisClientId+" ready."); } -- cgit v1.2.3 From 40ef76037aeac4ee2b9d857a092e5ea026c0bb5c Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Wed, 14 May 2014 13:23:26 +0200 Subject: Refactored Proxy update notification server into its own class and thread, so executions return before proxy messages are sent. Another deadlock suspect. --- .../java/com/c2kernel/entity/proxy/ItemProxy.java | 2 +- .../entity/proxy/ProxyClientConnection.java | 3 +- .../com/c2kernel/entity/proxy/ProxyManager.java | 72 +------------- .../com/c2kernel/entity/proxy/ProxyServer.java | 106 +++++++++++++++++++++ src/main/java/com/c2kernel/lookup/LDAPLookup.java | 5 +- .../persistency/ClusterStorageManager.java | 5 +- src/main/java/com/c2kernel/process/Gateway.java | 21 ++-- 7 files changed, 129 insertions(+), 85 deletions(-) create mode 100644 src/main/java/com/c2kernel/entity/proxy/ProxyServer.java (limited to 'src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java') diff --git a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java index 355acd8..f3a2f44 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java +++ b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java @@ -392,7 +392,7 @@ public class ItemProxy public void notify(ProxyMessage message) { Logger.msg(4, "EntityProxy.notify() - Received change notification for "+message.getPath()+" on "+mSystemKey); synchronized (this){ - if (!message.getServer().equals(ProxyManager.serverName)) + if (Gateway.getProxyServer()== null || !message.getServer().equals(Gateway.getProxyServer().getServerName())) Gateway.getStorage().clearCache(mSystemKey, message.getPath()); for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { MemberSubscription newSub = e.next(); diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java index 5abdb16..3a7e129 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java @@ -11,6 +11,7 @@ import java.util.ArrayList; import java.util.Iterator; import com.c2kernel.common.InvalidDataException; +import com.c2kernel.process.Gateway; import com.c2kernel.utils.Logger; import com.c2kernel.utils.server.SocketHandler; @@ -36,7 +37,7 @@ public class ProxyClientConnection implements SocketHandler { public ProxyClientConnection() { super(); thisClientId = ++clientId; - ProxyManager.registerProxyClient(this); + Gateway.getProxyServer().registerProxyClient(this); Logger.msg(1, "Proxy Client Connection Handler "+thisClientId+" ready."); } diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java b/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java index d19e38f..b217f3e 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java @@ -26,7 +26,6 @@ import com.c2kernel.process.Gateway; import com.c2kernel.property.Property; import com.c2kernel.utils.Logger; import com.c2kernel.utils.SoftCache; -import com.c2kernel.utils.server.SimpleTCPIPServer; public class ProxyManager @@ -35,11 +34,6 @@ public class ProxyManager HashMap treeSubscribers = new HashMap(); HashMap connections = new HashMap(); - // server objects - static ArrayList proxyClients = new ArrayList(); - static SimpleTCPIPServer proxyServer = null; - static String serverName = null; - /** * Create a proxy manager to listen for proxy events and reap unused proxies */ @@ -268,70 +262,6 @@ public class ProxyManager } - /************************************************************************** - * Static Proxy Server methods - **************************************************************************/ - - /** - * Initialises the Proxy event UDP server listening on 'Host.Proxy.port' from c2kprops - * @param c2kProps - */ - public static void initServer() - { - Logger.msg(5, "ProxyManager::initServer - Starting....."); - int port = Gateway.getProperties().getInt("ItemServer.Proxy.port", 0); - serverName = Gateway.getProperties().getProperty("ItemServer.name"); - if (port == 0) { - Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of changes."); - return; - } - - // set up the proxy server - try { - Logger.msg(5, "ProxyManager::initServer - Initialising proxy informer on port "+port); - proxyServer = new SimpleTCPIPServer(port, ProxyClientConnection.class, 200); - proxyServer.startListening(); - } catch (Exception ex) { - Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of changes."); - Logger.error(ex); - } - } - - public static void sendProxyEvent(ProxyMessage message) { - if (proxyServer != null && message.getPath() != null) - synchronized(proxyClients) { - for (ProxyClientConnection client : proxyClients) { - client.sendMessage(message); - } - } - } - - public static void reportConnections(int logLevel) { - synchronized(proxyClients) { - Logger.msg(logLevel, "Currently connected proxy clients:"); - for (ProxyClientConnection client : proxyClients) { - Logger.msg(logLevel, " "+client); - } - } - } - - public static void shutdownServer() { - if (proxyServer != null) { - Logger.msg(1, "ProxyManager: Closing Server."); - proxyServer.stopListening(); - } - } - - public static void registerProxyClient(ProxyClientConnection client) { - synchronized(proxyClients) { - proxyClients.add(client); - } - } - - public static void unRegisterProxyClient(ProxyClientConnection client) { - synchronized(proxyClients) { - proxyClients.remove(client); - } - } + } diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyServer.java b/src/main/java/com/c2kernel/entity/proxy/ProxyServer.java new file mode 100644 index 0000000..c576cda --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyServer.java @@ -0,0 +1,106 @@ +package com.c2kernel.entity.proxy; + +import java.util.ArrayList; +import java.util.concurrent.LinkedBlockingQueue; + +import com.c2kernel.process.Gateway; +import com.c2kernel.utils.Logger; +import com.c2kernel.utils.server.SimpleTCPIPServer; + +public class ProxyServer implements Runnable { + + // server objects + ArrayList proxyClients; + SimpleTCPIPServer proxyListener = null; + String serverName = null; + boolean keepRunning = true; + LinkedBlockingQueue messageQueue; + + public ProxyServer(String serverName) { + Logger.msg(5, "ProxyManager::initServer - Starting....."); + int port = Gateway.getProperties().getInt("ItemServer.Proxy.port", 0); + this.serverName = serverName; + this.proxyClients = new ArrayList(); + this.messageQueue = new LinkedBlockingQueue(); + + if (port == 0) { + Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of changes."); + return; + } + + // set up the proxy server + try { + Logger.msg(5, "ProxyManager::initServer - Initialising proxy informer on port "+port); + proxyListener = new SimpleTCPIPServer(port, ProxyClientConnection.class, 200); + proxyListener.startListening(); + } catch (Exception ex) { + Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of changes."); + Logger.error(ex); + } + // start the message queue delivery thread + new Thread(this).start(); + } + + @Override + public void run() { + + while(keepRunning) { + ProxyMessage message = messageQueue.poll(); + if (message != null) { + synchronized(proxyClients) { + for (ProxyClientConnection client : proxyClients) { + client.sendMessage(message); + } + } + } else + try { + synchronized(this) { wait(); } + } catch (InterruptedException e) { } + } + + } + + public String getServerName() { + return serverName; + } + + public void sendProxyEvent(ProxyMessage message) { + try { + synchronized(this) { + messageQueue.put(message); + notify(); + } + } catch (InterruptedException e) { } + } + + public void reportConnections(int logLevel) { + synchronized(proxyClients) { + Logger.msg(logLevel, "Currently connected proxy clients:"); + for (ProxyClientConnection client : proxyClients) { + Logger.msg(logLevel, " "+client); + } + } + } + + public void shutdownServer() { + Logger.msg(1, "ProxyManager: Closing Server."); + proxyListener.stopListening(); + synchronized(this) { + keepRunning = false; + notify(); + } + } + + public void registerProxyClient(ProxyClientConnection client) { + synchronized(proxyClients) { + proxyClients.add(client); + } + } + + public void unRegisterProxyClient(ProxyClientConnection client) { + synchronized(proxyClients) { + proxyClients.remove(client); + } + } + +} diff --git a/src/main/java/com/c2kernel/lookup/LDAPLookup.java b/src/main/java/com/c2kernel/lookup/LDAPLookup.java index 116362e..eae803b 100644 --- a/src/main/java/com/c2kernel/lookup/LDAPLookup.java +++ b/src/main/java/com/c2kernel/lookup/LDAPLookup.java @@ -12,7 +12,6 @@ import com.c2kernel.common.ObjectCannotBeUpdated; import com.c2kernel.common.ObjectNotFoundException; import com.c2kernel.entity.TraceableEntity; import com.c2kernel.entity.agent.ActiveEntity; -import com.c2kernel.entity.proxy.ProxyManager; import com.c2kernel.entity.proxy.ProxyMessage; import com.c2kernel.process.Gateway; import com.c2kernel.property.PropertyDescription; @@ -231,7 +230,7 @@ public class LDAPLookup LDAPEntry newEntry = new LDAPEntry(path.getFullDN(),attrSet); LDAPLookupUtils.addEntry(getConnection(),newEntry); if (path instanceof DomainPath) - ProxyManager.sendProxyEvent(new ProxyMessage(ProxyMessage.NA, path.toString(), ProxyMessage.ADDED)); + Gateway.getProxyServer().sendProxyEvent(new ProxyMessage(ProxyMessage.NA, path.toString(), ProxyMessage.ADDED)); return newEntry; } catch (LDAPException ex) { if (ex.getResultCode() == LDAPException.ENTRY_ALREADY_EXISTS) @@ -251,7 +250,7 @@ public class LDAPLookup throw new ObjectCannotBeUpdated(ex.getLDAPErrorMessage(), ""); } if (path instanceof DomainPath) { - ProxyManager.sendProxyEvent(new ProxyMessage(ProxyMessage.NA, path.toString(), ProxyMessage.DELETED)); + Gateway.getProxyServer().sendProxyEvent(new ProxyMessage(ProxyMessage.NA, path.toString(), ProxyMessage.DELETED)); } } diff --git a/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java b/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java index d0c3f77..20857c6 100644 --- a/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java +++ b/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java @@ -10,7 +10,6 @@ import java.util.StringTokenizer; import com.c2kernel.common.ObjectNotFoundException; import com.c2kernel.entity.C2KLocalObject; import com.c2kernel.entity.agent.JobList; -import com.c2kernel.entity.proxy.ProxyManager; import com.c2kernel.entity.proxy.ProxyMessage; import com.c2kernel.events.History; import com.c2kernel.persistency.outcome.Outcome; @@ -291,7 +290,7 @@ public class ClusterStorageManager { if (Logger.doLog(9)) dumpCacheContents(9); // transmit proxy event - ProxyManager.sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.ADDED)); + Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.ADDED)); } /** Deletes a cluster from all writers */ @@ -317,7 +316,7 @@ public class ClusterStorageManager { // transmit proxy event - ProxyManager.sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.DELETED)); + Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.DELETED)); } public void clearCache(Integer sysKeyIntObj, String path) { diff --git a/src/main/java/com/c2kernel/process/Gateway.java b/src/main/java/com/c2kernel/process/Gateway.java index 6c7b68d..01cc202 100644 --- a/src/main/java/com/c2kernel/process/Gateway.java +++ b/src/main/java/com/c2kernel/process/Gateway.java @@ -14,6 +14,7 @@ import com.c2kernel.common.ObjectNotFoundException; import com.c2kernel.entity.CorbaServer; import com.c2kernel.entity.proxy.AgentProxy; import com.c2kernel.entity.proxy.ProxyManager; +import com.c2kernel.entity.proxy.ProxyServer; import com.c2kernel.lookup.AgentPath; import com.c2kernel.lookup.LDAPLookup; import com.c2kernel.lookup.LDAPProperties; @@ -57,7 +58,8 @@ public class Gateway static private boolean orbDestroyed = false; static private LDAPLookup mLDAPLookup; static private TransactionManager mStorage; - static private ProxyManager mProxyManager; + static private ProxyManager mProxyManager; + static private ProxyServer mProxyServer; static private CorbaServer mCorbaServer; static private CastorXMLUtility mMarshaller; static private AgentProxy mCurrentUser = null; @@ -151,13 +153,13 @@ public class Gateway mLDAPLookup.install(); // start entity proxy server - ProxyManager.initServer(); + mProxyServer = new ProxyServer(mC2KProps.getProperty("ItemServer.name")); // Init ORB - set various config - String serverName = getProperty("ItemServer.name"); + String serverName = mC2KProps.getProperty("ItemServer.name"); if (serverName != null) mC2KProps.put("com.sun.CORBA.ORBServerHost", serverName); - String serverPort = getProperty("ItemServer.iiop", "1500"); + String serverPort = mC2KProps.getProperty("ItemServer.iiop", "1500"); mC2KProps.put("com.sun.CORBA.ORBServerPort", serverPort); //TODO: externalize this (or replace corba completely) mC2KProps.put("com.sun.CORBA.POA.ORBServerId", "1"); @@ -356,11 +358,13 @@ public class Gateway mLDAPLookup.disconnect(); mLDAPLookup = null; - // shut down proxy manager + // shut down proxy manager & server + if (mProxyServer != null) + mProxyServer.shutdownServer(); if (mProxyManager != null) mProxyManager.shutdown(); mProxyManager = null; - ProxyManager.shutdownServer(); + // close log consoles Logger.closeConsole(); @@ -410,6 +414,11 @@ public class Gateway return mProxyManager; } + + public static ProxyServer getProxyServer() { + return mProxyServer; + } + static public String getCentreId() { return getProperty("LocalCentre"); } -- cgit v1.2.3