diff options
Diffstat (limited to 'src/main/java/com/c2kernel/entity/proxy')
4 files changed, 110 insertions, 73 deletions
diff --git a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java index 355acd8..f3a2f44 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java +++ b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java @@ -392,7 +392,7 @@ public class ItemProxy public void notify(ProxyMessage message) {
Logger.msg(4, "EntityProxy.notify() - Received change notification for "+message.getPath()+" on "+mSystemKey);
synchronized (this){
- if (!message.getServer().equals(ProxyManager.serverName))
+ if (Gateway.getProxyServer()== null || !message.getServer().equals(Gateway.getProxyServer().getServerName()))
Gateway.getStorage().clearCache(mSystemKey, message.getPath());
for (Iterator<MemberSubscription<?>> e = mSubscriptions.keySet().iterator(); e.hasNext();) {
MemberSubscription<?> newSub = e.next();
diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java index 5abdb16..3a7e129 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java @@ -11,6 +11,7 @@ import java.util.ArrayList; import java.util.Iterator;
import com.c2kernel.common.InvalidDataException;
+import com.c2kernel.process.Gateway;
import com.c2kernel.utils.Logger;
import com.c2kernel.utils.server.SocketHandler;
@@ -36,7 +37,7 @@ public class ProxyClientConnection implements SocketHandler { public ProxyClientConnection() {
super();
thisClientId = ++clientId;
- ProxyManager.registerProxyClient(this);
+ Gateway.getProxyServer().registerProxyClient(this);
Logger.msg(1, "Proxy Client Connection Handler "+thisClientId+" ready.");
}
diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java b/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java index d19e38f..b217f3e 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java @@ -26,7 +26,6 @@ import com.c2kernel.process.Gateway; import com.c2kernel.property.Property;
import com.c2kernel.utils.Logger;
import com.c2kernel.utils.SoftCache;
-import com.c2kernel.utils.server.SimpleTCPIPServer;
public class ProxyManager
@@ -35,11 +34,6 @@ public class ProxyManager HashMap<DomainPathSubscriber, DomainPath> treeSubscribers = new HashMap<DomainPathSubscriber, DomainPath>();
HashMap<String, ProxyServerConnection> connections = new HashMap<String, ProxyServerConnection>();
- // server objects
- static ArrayList<ProxyClientConnection> proxyClients = new ArrayList<ProxyClientConnection>();
- static SimpleTCPIPServer proxyServer = null;
- static String serverName = null;
-
/**
* Create a proxy manager to listen for proxy events and reap unused proxies
*/
@@ -268,70 +262,6 @@ public class ProxyManager }
- /**************************************************************************
- * Static Proxy Server methods
- **************************************************************************/
-
- /**
- * Initialises the Proxy event UDP server listening on 'Host.Proxy.port' from c2kprops
- * @param c2kProps
- */
- public static void initServer()
- {
- Logger.msg(5, "ProxyManager::initServer - Starting.....");
- int port = Gateway.getProperties().getInt("ItemServer.Proxy.port", 0);
- serverName = Gateway.getProperties().getProperty("ItemServer.name");
- if (port == 0) {
- Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of changes.");
- return;
- }
-
- // set up the proxy server
- try {
- Logger.msg(5, "ProxyManager::initServer - Initialising proxy informer on port "+port);
- proxyServer = new SimpleTCPIPServer(port, ProxyClientConnection.class, 200);
- proxyServer.startListening();
- } catch (Exception ex) {
- Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of changes.");
- Logger.error(ex);
- }
- }
-
- public static void sendProxyEvent(ProxyMessage message) {
- if (proxyServer != null && message.getPath() != null)
- synchronized(proxyClients) {
- for (ProxyClientConnection client : proxyClients) {
- client.sendMessage(message);
- }
- }
- }
-
- public static void reportConnections(int logLevel) {
- synchronized(proxyClients) {
- Logger.msg(logLevel, "Currently connected proxy clients:");
- for (ProxyClientConnection client : proxyClients) {
- Logger.msg(logLevel, " "+client);
- }
- }
- }
-
- public static void shutdownServer() {
- if (proxyServer != null) {
- Logger.msg(1, "ProxyManager: Closing Server.");
- proxyServer.stopListening();
- }
- }
-
- public static void registerProxyClient(ProxyClientConnection client) {
- synchronized(proxyClients) {
- proxyClients.add(client);
- }
- }
-
- public static void unRegisterProxyClient(ProxyClientConnection client) {
- synchronized(proxyClients) {
- proxyClients.remove(client);
- }
- }
+
}
diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyServer.java b/src/main/java/com/c2kernel/entity/proxy/ProxyServer.java new file mode 100644 index 0000000..c576cda --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyServer.java @@ -0,0 +1,106 @@ +package com.c2kernel.entity.proxy;
+
+import java.util.ArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.c2kernel.process.Gateway;
+import com.c2kernel.utils.Logger;
+import com.c2kernel.utils.server.SimpleTCPIPServer;
+
+public class ProxyServer implements Runnable {
+
+ // server objects
+ ArrayList<ProxyClientConnection> proxyClients;
+ SimpleTCPIPServer proxyListener = null;
+ String serverName = null;
+ boolean keepRunning = true;
+ LinkedBlockingQueue<ProxyMessage> messageQueue;
+
+ public ProxyServer(String serverName) {
+ Logger.msg(5, "ProxyManager::initServer - Starting.....");
+ int port = Gateway.getProperties().getInt("ItemServer.Proxy.port", 0);
+ this.serverName = serverName;
+ this.proxyClients = new ArrayList<ProxyClientConnection>();
+ this.messageQueue = new LinkedBlockingQueue<ProxyMessage>();
+
+ if (port == 0) {
+ Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of changes.");
+ return;
+ }
+
+ // set up the proxy server
+ try {
+ Logger.msg(5, "ProxyManager::initServer - Initialising proxy informer on port "+port);
+ proxyListener = new SimpleTCPIPServer(port, ProxyClientConnection.class, 200);
+ proxyListener.startListening();
+ } catch (Exception ex) {
+ Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of changes.");
+ Logger.error(ex);
+ }
+ // start the message queue delivery thread
+ new Thread(this).start();
+ }
+
+ @Override
+ public void run() {
+
+ while(keepRunning) {
+ ProxyMessage message = messageQueue.poll();
+ if (message != null) {
+ synchronized(proxyClients) {
+ for (ProxyClientConnection client : proxyClients) {
+ client.sendMessage(message);
+ }
+ }
+ } else
+ try {
+ synchronized(this) { wait(); }
+ } catch (InterruptedException e) { }
+ }
+
+ }
+
+ public String getServerName() {
+ return serverName;
+ }
+
+ public void sendProxyEvent(ProxyMessage message) {
+ try {
+ synchronized(this) {
+ messageQueue.put(message);
+ notify();
+ }
+ } catch (InterruptedException e) { }
+ }
+
+ public void reportConnections(int logLevel) {
+ synchronized(proxyClients) {
+ Logger.msg(logLevel, "Currently connected proxy clients:");
+ for (ProxyClientConnection client : proxyClients) {
+ Logger.msg(logLevel, " "+client);
+ }
+ }
+ }
+
+ public void shutdownServer() {
+ Logger.msg(1, "ProxyManager: Closing Server.");
+ proxyListener.stopListening();
+ synchronized(this) {
+ keepRunning = false;
+ notify();
+ }
+ }
+
+ public void registerProxyClient(ProxyClientConnection client) {
+ synchronized(proxyClients) {
+ proxyClients.add(client);
+ }
+ }
+
+ public void unRegisterProxyClient(ProxyClientConnection client) {
+ synchronized(proxyClients) {
+ proxyClients.remove(client);
+ }
+ }
+
+}
|
