diff options
| author | Andrew Branson <andrew.branson@cern.ch> | 2014-05-14 13:23:26 +0200 |
|---|---|---|
| committer | Andrew Branson <andrew.branson@cern.ch> | 2014-05-14 13:23:26 +0200 |
| commit | 40ef76037aeac4ee2b9d857a092e5ea026c0bb5c (patch) | |
| tree | 9b3a11ee847e4efe4099143098ca95b66bfb9ba2 | |
| parent | b8f1b6c330ba5117a608ae8113bad6a07d471881 (diff) | |
Refactored Proxy update notification server into its own class and
thread, so executions return before proxy messages are sent. Another
deadlock suspect.
7 files changed, 129 insertions, 85 deletions
diff --git a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java index 355acd8..f3a2f44 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java +++ b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java @@ -392,7 +392,7 @@ public class ItemProxy public void notify(ProxyMessage message) {
Logger.msg(4, "EntityProxy.notify() - Received change notification for "+message.getPath()+" on "+mSystemKey);
synchronized (this){
- if (!message.getServer().equals(ProxyManager.serverName))
+ if (Gateway.getProxyServer()== null || !message.getServer().equals(Gateway.getProxyServer().getServerName()))
Gateway.getStorage().clearCache(mSystemKey, message.getPath());
for (Iterator<MemberSubscription<?>> e = mSubscriptions.keySet().iterator(); e.hasNext();) {
MemberSubscription<?> newSub = e.next();
diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java index 5abdb16..3a7e129 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java @@ -11,6 +11,7 @@ import java.util.ArrayList; import java.util.Iterator;
import com.c2kernel.common.InvalidDataException;
+import com.c2kernel.process.Gateway;
import com.c2kernel.utils.Logger;
import com.c2kernel.utils.server.SocketHandler;
@@ -36,7 +37,7 @@ public class ProxyClientConnection implements SocketHandler { public ProxyClientConnection() {
super();
thisClientId = ++clientId;
- ProxyManager.registerProxyClient(this);
+ Gateway.getProxyServer().registerProxyClient(this);
Logger.msg(1, "Proxy Client Connection Handler "+thisClientId+" ready.");
}
diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java b/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java index d19e38f..b217f3e 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java @@ -26,7 +26,6 @@ import com.c2kernel.process.Gateway; import com.c2kernel.property.Property;
import com.c2kernel.utils.Logger;
import com.c2kernel.utils.SoftCache;
-import com.c2kernel.utils.server.SimpleTCPIPServer;
public class ProxyManager
@@ -35,11 +34,6 @@ public class ProxyManager HashMap<DomainPathSubscriber, DomainPath> treeSubscribers = new HashMap<DomainPathSubscriber, DomainPath>();
HashMap<String, ProxyServerConnection> connections = new HashMap<String, ProxyServerConnection>();
- // server objects
- static ArrayList<ProxyClientConnection> proxyClients = new ArrayList<ProxyClientConnection>();
- static SimpleTCPIPServer proxyServer = null;
- static String serverName = null;
-
/**
* Create a proxy manager to listen for proxy events and reap unused proxies
*/
@@ -268,70 +262,6 @@ public class ProxyManager }
- /**************************************************************************
- * Static Proxy Server methods
- **************************************************************************/
-
- /**
- * Initialises the Proxy event UDP server listening on 'Host.Proxy.port' from c2kprops
- * @param c2kProps
- */
- public static void initServer()
- {
- Logger.msg(5, "ProxyManager::initServer - Starting.....");
- int port = Gateway.getProperties().getInt("ItemServer.Proxy.port", 0);
- serverName = Gateway.getProperties().getProperty("ItemServer.name");
- if (port == 0) {
- Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of changes.");
- return;
- }
-
- // set up the proxy server
- try {
- Logger.msg(5, "ProxyManager::initServer - Initialising proxy informer on port "+port);
- proxyServer = new SimpleTCPIPServer(port, ProxyClientConnection.class, 200);
- proxyServer.startListening();
- } catch (Exception ex) {
- Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of changes.");
- Logger.error(ex);
- }
- }
-
- public static void sendProxyEvent(ProxyMessage message) {
- if (proxyServer != null && message.getPath() != null)
- synchronized(proxyClients) {
- for (ProxyClientConnection client : proxyClients) {
- client.sendMessage(message);
- }
- }
- }
-
- public static void reportConnections(int logLevel) {
- synchronized(proxyClients) {
- Logger.msg(logLevel, "Currently connected proxy clients:");
- for (ProxyClientConnection client : proxyClients) {
- Logger.msg(logLevel, " "+client);
- }
- }
- }
-
- public static void shutdownServer() {
- if (proxyServer != null) {
- Logger.msg(1, "ProxyManager: Closing Server.");
- proxyServer.stopListening();
- }
- }
-
- public static void registerProxyClient(ProxyClientConnection client) {
- synchronized(proxyClients) {
- proxyClients.add(client);
- }
- }
-
- public static void unRegisterProxyClient(ProxyClientConnection client) {
- synchronized(proxyClients) {
- proxyClients.remove(client);
- }
- }
+
}
diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyServer.java b/src/main/java/com/c2kernel/entity/proxy/ProxyServer.java new file mode 100644 index 0000000..c576cda --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyServer.java @@ -0,0 +1,106 @@ +package com.c2kernel.entity.proxy;
+
+import java.util.ArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.c2kernel.process.Gateway;
+import com.c2kernel.utils.Logger;
+import com.c2kernel.utils.server.SimpleTCPIPServer;
+
+public class ProxyServer implements Runnable {
+
+ // server objects
+ ArrayList<ProxyClientConnection> proxyClients;
+ SimpleTCPIPServer proxyListener = null;
+ String serverName = null;
+ boolean keepRunning = true;
+ LinkedBlockingQueue<ProxyMessage> messageQueue;
+
+ public ProxyServer(String serverName) {
+ Logger.msg(5, "ProxyManager::initServer - Starting.....");
+ int port = Gateway.getProperties().getInt("ItemServer.Proxy.port", 0);
+ this.serverName = serverName;
+ this.proxyClients = new ArrayList<ProxyClientConnection>();
+ this.messageQueue = new LinkedBlockingQueue<ProxyMessage>();
+
+ if (port == 0) {
+ Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of changes.");
+ return;
+ }
+
+ // set up the proxy server
+ try {
+ Logger.msg(5, "ProxyManager::initServer - Initialising proxy informer on port "+port);
+ proxyListener = new SimpleTCPIPServer(port, ProxyClientConnection.class, 200);
+ proxyListener.startListening();
+ } catch (Exception ex) {
+ Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of changes.");
+ Logger.error(ex);
+ }
+ // start the message queue delivery thread
+ new Thread(this).start();
+ }
+
+ @Override
+ public void run() {
+
+ while(keepRunning) {
+ ProxyMessage message = messageQueue.poll();
+ if (message != null) {
+ synchronized(proxyClients) {
+ for (ProxyClientConnection client : proxyClients) {
+ client.sendMessage(message);
+ }
+ }
+ } else
+ try {
+ synchronized(this) { wait(); }
+ } catch (InterruptedException e) { }
+ }
+
+ }
+
+ public String getServerName() {
+ return serverName;
+ }
+
+ public void sendProxyEvent(ProxyMessage message) {
+ try {
+ synchronized(this) {
+ messageQueue.put(message);
+ notify();
+ }
+ } catch (InterruptedException e) { }
+ }
+
+ public void reportConnections(int logLevel) {
+ synchronized(proxyClients) {
+ Logger.msg(logLevel, "Currently connected proxy clients:");
+ for (ProxyClientConnection client : proxyClients) {
+ Logger.msg(logLevel, " "+client);
+ }
+ }
+ }
+
+ public void shutdownServer() {
+ Logger.msg(1, "ProxyManager: Closing Server.");
+ proxyListener.stopListening();
+ synchronized(this) {
+ keepRunning = false;
+ notify();
+ }
+ }
+
+ public void registerProxyClient(ProxyClientConnection client) {
+ synchronized(proxyClients) {
+ proxyClients.add(client);
+ }
+ }
+
+ public void unRegisterProxyClient(ProxyClientConnection client) {
+ synchronized(proxyClients) {
+ proxyClients.remove(client);
+ }
+ }
+
+}
diff --git a/src/main/java/com/c2kernel/lookup/LDAPLookup.java b/src/main/java/com/c2kernel/lookup/LDAPLookup.java index 116362e..eae803b 100644 --- a/src/main/java/com/c2kernel/lookup/LDAPLookup.java +++ b/src/main/java/com/c2kernel/lookup/LDAPLookup.java @@ -12,7 +12,6 @@ import com.c2kernel.common.ObjectCannotBeUpdated; import com.c2kernel.common.ObjectNotFoundException;
import com.c2kernel.entity.TraceableEntity;
import com.c2kernel.entity.agent.ActiveEntity;
-import com.c2kernel.entity.proxy.ProxyManager;
import com.c2kernel.entity.proxy.ProxyMessage;
import com.c2kernel.process.Gateway;
import com.c2kernel.property.PropertyDescription;
@@ -231,7 +230,7 @@ public class LDAPLookup LDAPEntry newEntry = new LDAPEntry(path.getFullDN(),attrSet);
LDAPLookupUtils.addEntry(getConnection(),newEntry);
if (path instanceof DomainPath)
- ProxyManager.sendProxyEvent(new ProxyMessage(ProxyMessage.NA, path.toString(), ProxyMessage.ADDED));
+ Gateway.getProxyServer().sendProxyEvent(new ProxyMessage(ProxyMessage.NA, path.toString(), ProxyMessage.ADDED));
return newEntry;
} catch (LDAPException ex) {
if (ex.getResultCode() == LDAPException.ENTRY_ALREADY_EXISTS)
@@ -251,7 +250,7 @@ public class LDAPLookup throw new ObjectCannotBeUpdated(ex.getLDAPErrorMessage(), "");
}
if (path instanceof DomainPath) {
- ProxyManager.sendProxyEvent(new ProxyMessage(ProxyMessage.NA, path.toString(), ProxyMessage.DELETED));
+ Gateway.getProxyServer().sendProxyEvent(new ProxyMessage(ProxyMessage.NA, path.toString(), ProxyMessage.DELETED));
}
}
diff --git a/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java b/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java index d0c3f77..20857c6 100644 --- a/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java +++ b/src/main/java/com/c2kernel/persistency/ClusterStorageManager.java @@ -10,7 +10,6 @@ import java.util.StringTokenizer; import com.c2kernel.common.ObjectNotFoundException;
import com.c2kernel.entity.C2KLocalObject;
import com.c2kernel.entity.agent.JobList;
-import com.c2kernel.entity.proxy.ProxyManager;
import com.c2kernel.entity.proxy.ProxyMessage;
import com.c2kernel.events.History;
import com.c2kernel.persistency.outcome.Outcome;
@@ -291,7 +290,7 @@ public class ClusterStorageManager { if (Logger.doLog(9)) dumpCacheContents(9);
// transmit proxy event
- ProxyManager.sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.ADDED));
+ Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.ADDED));
}
/** Deletes a cluster from all writers */
@@ -317,7 +316,7 @@ public class ClusterStorageManager { // transmit proxy event
- ProxyManager.sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.DELETED));
+ Gateway.getProxyServer().sendProxyEvent( new ProxyMessage(sysKeyIntObj.intValue(), path, ProxyMessage.DELETED));
}
public void clearCache(Integer sysKeyIntObj, String path) {
diff --git a/src/main/java/com/c2kernel/process/Gateway.java b/src/main/java/com/c2kernel/process/Gateway.java index 6c7b68d..01cc202 100644 --- a/src/main/java/com/c2kernel/process/Gateway.java +++ b/src/main/java/com/c2kernel/process/Gateway.java @@ -14,6 +14,7 @@ import com.c2kernel.common.ObjectNotFoundException; import com.c2kernel.entity.CorbaServer;
import com.c2kernel.entity.proxy.AgentProxy;
import com.c2kernel.entity.proxy.ProxyManager;
+import com.c2kernel.entity.proxy.ProxyServer;
import com.c2kernel.lookup.AgentPath;
import com.c2kernel.lookup.LDAPLookup;
import com.c2kernel.lookup.LDAPProperties;
@@ -57,7 +58,8 @@ public class Gateway static private boolean orbDestroyed = false;
static private LDAPLookup mLDAPLookup;
static private TransactionManager mStorage;
- static private ProxyManager mProxyManager;
+ static private ProxyManager mProxyManager;
+ static private ProxyServer mProxyServer;
static private CorbaServer mCorbaServer;
static private CastorXMLUtility mMarshaller;
static private AgentProxy mCurrentUser = null;
@@ -151,13 +153,13 @@ public class Gateway mLDAPLookup.install();
// start entity proxy server
- ProxyManager.initServer();
+ mProxyServer = new ProxyServer(mC2KProps.getProperty("ItemServer.name"));
// Init ORB - set various config
- String serverName = getProperty("ItemServer.name");
+ String serverName = mC2KProps.getProperty("ItemServer.name");
if (serverName != null)
mC2KProps.put("com.sun.CORBA.ORBServerHost", serverName);
- String serverPort = getProperty("ItemServer.iiop", "1500");
+ String serverPort = mC2KProps.getProperty("ItemServer.iiop", "1500");
mC2KProps.put("com.sun.CORBA.ORBServerPort", serverPort);
//TODO: externalize this (or replace corba completely)
mC2KProps.put("com.sun.CORBA.POA.ORBServerId", "1");
@@ -356,11 +358,13 @@ public class Gateway mLDAPLookup.disconnect();
mLDAPLookup = null;
- // shut down proxy manager
+ // shut down proxy manager & server
+ if (mProxyServer != null)
+ mProxyServer.shutdownServer();
if (mProxyManager != null)
mProxyManager.shutdown();
mProxyManager = null;
- ProxyManager.shutdownServer();
+
// close log consoles
Logger.closeConsole();
@@ -410,6 +414,11 @@ public class Gateway return mProxyManager;
}
+
+ public static ProxyServer getProxyServer() {
+ return mProxyServer;
+ }
+
static public String getCentreId() {
return getProperty("LocalCentre");
}
|
