summaryrefslogtreecommitdiff
path: root/source/com/c2kernel/entity/proxy/EntityProxyManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'source/com/c2kernel/entity/proxy/EntityProxyManager.java')
-rwxr-xr-xsource/com/c2kernel/entity/proxy/EntityProxyManager.java341
1 files changed, 341 insertions, 0 deletions
diff --git a/source/com/c2kernel/entity/proxy/EntityProxyManager.java b/source/com/c2kernel/entity/proxy/EntityProxyManager.java
new file mode 100755
index 0000000..386bc2c
--- /dev/null
+++ b/source/com/c2kernel/entity/proxy/EntityProxyManager.java
@@ -0,0 +1,341 @@
+/**************************************************************************
+ * EntityProxyFactory.java
+ *
+ * $Revision: 1.45 $
+ * $Date: 2005/05/10 11:40:09 $
+ *
+ * Copyright (C) 2001 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ **************************************************************************/
+
+package com.c2kernel.entity.proxy;
+
+import java.util.*;
+
+import com.c2kernel.common.InvalidDataException;
+import com.c2kernel.common.ObjectNotFoundException;
+import com.c2kernel.lookup.AgentPath;
+import com.c2kernel.lookup.DomainPath;
+import com.c2kernel.lookup.Path;
+import com.c2kernel.persistency.ClusterStorage;
+import com.c2kernel.process.Gateway;
+import com.c2kernel.property.Property;
+import com.c2kernel.utils.Logger;
+import com.c2kernel.utils.SoftCache;
+import com.c2kernel.utils.server.SimpleTCPIPServer;
+
+
+public class EntityProxyManager
+{
+ SoftCache proxyPool = new SoftCache(50);
+ HashMap treeSubscribers = new HashMap();
+ HashMap connections = new HashMap();
+
+ // server objects
+ static ArrayList proxyClients = new ArrayList();
+ static SimpleTCPIPServer proxyServer = null;
+ static String serverName = null;
+
+ /**
+ * Create an entity proxy manager to listen for proxy events and reap unused proxies
+ */
+ public EntityProxyManager()
+ {
+ Logger.msg(5, "EntityProxyManager - Starting.....");
+
+ Enumeration servers = Gateway.getLDAPLookup().searchEntities(new DomainPath("/servers"));
+ while(servers.hasMoreElements()) {
+ Path thisServerPath = (Path)servers.nextElement();
+ try {
+ int syskey = thisServerPath.getSysKey();
+ String remoteServer = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/Name", null)).getValue();
+ String portStr = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/ProxyPort", null)).getValue();
+ int remotePort = Integer.parseInt(portStr);
+ connectToProxyServer(remoteServer, remotePort);
+
+ } catch (Exception ex) {
+ Logger.error("Exception retrieving proxy server connection data for "+thisServerPath);
+ Logger.error(ex);
+ }
+ }
+ }
+
+ public void connectToProxyServer(String name, int port) {
+ ProxyServerConnection oldConn = (ProxyServerConnection)connections.get(name);
+ if (oldConn != null)
+ oldConn.shutdown();
+ connections.put(name, new ProxyServerConnection(name, port, this));
+ }
+
+
+ protected void resubscribe(ProxyServerConnection conn) {
+ synchronized (proxyPool) {
+ for (Iterator iter = proxyPool.keySet().iterator(); iter.hasNext();) {
+ Integer key = (Integer)iter.next();
+ ProxyMessage sub = new ProxyMessage(key.intValue(), ProxyMessage.ADDPATH, false);
+ Logger.msg(5, "Subscribing to entity "+key);
+ conn.sendMessage(sub);
+ }
+ }
+ }
+
+ /**
+ * @param sub
+ */
+ private void sendMessage(ProxyMessage sub) {
+ for (Iterator iter = connections.values().iterator(); iter.hasNext();) {
+ ProxyServerConnection element = (ProxyServerConnection) iter.next();
+ element.sendMessage(sub);
+ }
+
+ }
+
+ public void shutdown() {
+ Logger.msg("EntityProxyManager.shutdown() - flagging shutdown of server connections");
+ for (Iterator iter = connections.values().iterator(); iter.hasNext();) {
+ ProxyServerConnection element = (ProxyServerConnection) iter.next();
+ element.shutdown();
+ }
+ }
+
+ protected void processMessage(ProxyMessage thisMessage) throws InvalidDataException {
+ if (Logger.doLog(9)) Logger.msg(9, thisMessage.toString());
+
+ if (thisMessage.getPath().equals(ProxyMessage.PINGPATH)) // ping response
+ return;
+
+ if (thisMessage.getSysKey() == ProxyMessage.NA) // must be domain path info
+ informTreeSubscribers(thisMessage.getState(), thisMessage.getPath());
+ else {
+ // proper proxy message
+ Logger.msg(5, "Received proxy message: "+thisMessage.toString());
+ Integer key = new Integer(thisMessage.getSysKey());
+ EntityProxy relevant = (EntityProxy)proxyPool.get(key);
+ if (relevant == null)
+ Logger.warning("Received proxy message for sysKey "+thisMessage.getSysKey()+" which we don't have a proxy for.");
+ else
+ try {
+ relevant.notify(thisMessage);
+ } catch (Throwable ex) {
+ Logger.error("Error caught notifying proxy listener "+relevant.toString()+" of "+thisMessage.toString());
+ Logger.error(ex);
+ }
+ }
+ }
+
+ private void informTreeSubscribers(boolean state, String path) {
+ DomainPath last = new DomainPath(path);
+ DomainPath parent; boolean first = true;
+ synchronized(treeSubscribers) {
+ while((parent = last.getParent()) != null) {
+
+ for (Iterator iter = treeSubscribers.keySet().iterator(); iter.hasNext();) {
+ DomainPathSubscriber sub = (DomainPathSubscriber)iter.next();
+ DomainPath interest = (DomainPath)treeSubscribers.get(sub);
+ if (interest.equals(parent)) {
+ if (state == ProxyMessage.ADDED)
+ sub.pathAdded(last);
+ else if (first)
+ sub.pathRemoved(last);
+ }
+ }
+ last = parent;
+ first = false;
+ }
+ }
+ }
+
+ public void subscribeTree(DomainPathSubscriber sub, DomainPath interest) {
+ synchronized(treeSubscribers) {
+ treeSubscribers.put(sub, interest);
+ }
+ }
+
+ public void unsubscribeTree(DomainPathSubscriber sub) {
+ synchronized(treeSubscribers) {
+ treeSubscribers.remove(sub);
+ }
+ }
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ private EntityProxy createProxy( org.omg.CORBA.Object ior,
+ int systemKey,
+ boolean isItem )
+ throws ObjectNotFoundException
+ {
+
+ EntityProxy newProxy = null;
+
+ Logger.msg(5, "EntityProxyFactory::creating proxy on entity " + systemKey);
+
+ if( isItem )
+ {
+ newProxy = new ItemProxy(ior, systemKey);
+ }
+ else
+ {
+ newProxy = new AgentProxy(ior, systemKey);
+ }
+
+ // subscribe to changes from server
+ ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.ADDPATH, false);
+ sendMessage(sub);
+ reportCurrentProxies(9);
+ return ( newProxy );
+ }
+
+ protected void removeProxy( int systemKey )
+ {
+ ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.DELPATH, true);
+ Logger.msg(5,"EntityProxyManager.removeProxy() - Unsubscribing to proxy informer for "+systemKey);
+ sendMessage(sub);
+ }
+
+
+ /**************************************************************************
+ * EntityProxy getProxy( ManageableEntity, SystemKey)
+ *
+ * Called by the other GetProxy methods. Fills in either the ior or the
+ * SystemKey
+ **************************************************************************/
+ private EntityProxy getProxy( org.omg.CORBA.Object ior,
+ int systemKey,
+ boolean isItem )
+ throws ObjectNotFoundException
+ {
+ Integer key = new Integer(systemKey);
+
+ synchronized(proxyPool) {
+ EntityProxy newProxy;
+ // return it if it exists
+ newProxy = (EntityProxy)proxyPool.get(key);
+ if (newProxy == null) {
+ // create a new one
+ newProxy = createProxy(ior, systemKey, isItem );
+ proxyPool.put(key, newProxy);
+ }
+ return newProxy;
+
+ }
+ }
+
+ /**************************************************************************
+ * EntityProxy getProxy( String )
+ *
+ * Proxy from Alias
+ **************************************************************************/
+ public EntityProxy getProxy( Path path )
+ throws ObjectNotFoundException
+ {
+
+ //convert namePath to dn format
+ Logger.msg(8,"EntityProxyFactory::getProxy(" + path.toString() + ")");
+ boolean isItem = !(path.getEntity() instanceof AgentPath);
+ return getProxy( Gateway.getLDAPLookup().getIOR(path),
+ path.getSysKey(),
+ isItem );
+
+ }
+
+ /**************************************************************************
+ * void reportCurrentProxies()
+ *
+ * A utility to Dump the current proxies loaded
+ **************************************************************************/
+ public void reportCurrentProxies(int logLevel)
+ {
+ if (!Logger.doLog(logLevel)) return;
+ Logger.msg(logLevel, "Current proxies: ");
+ try {
+ synchronized(proxyPool) {
+ Iterator i = proxyPool.keySet().iterator();
+
+ for( int count=0; i.hasNext(); count++ )
+ {
+ Integer nextProxy = (Integer)i.next();
+ EntityProxy thisProxy = (EntityProxy)proxyPool.get(nextProxy);
+ if (thisProxy != null)
+ Logger.msg(logLevel,
+ "" + count + ": "
+ + proxyPool.get(nextProxy).getClass().getName()
+ + ": " + nextProxy);
+ }
+ }
+ } catch (ConcurrentModificationException ex) {
+ Logger.msg(logLevel, "Proxy cache modified. Aborting.");
+ }
+ }
+
+
+ /**************************************************************************
+ * Static Proxy Server methods
+ **************************************************************************/
+
+ /**
+ * Initialises the Proxy event UDP server listening on 'Host.Proxy.port' from c2kprops
+ * @param c2kProps
+ */
+ public static void initServer()
+ {
+ Logger.msg(5, "EntityProxyFactory::initServer - Starting.....");
+ String port = Gateway.getProperty("ItemServer.Proxy.port");
+ serverName = Gateway.getProperty("ItemServer.name");
+ if (port == null) {
+ Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of entity changes.");
+ return;
+ }
+
+ // set up the proxy server
+ try {
+ int portNo = Integer.parseInt(port);
+ Logger.msg(5, "EntityProxyFactory::initServer - Initialising proxy informer on port "+port);
+ proxyServer = new SimpleTCPIPServer(portNo, ProxyClientConnection.class, 200);
+ proxyServer.startListening();
+ } catch (Exception ex) {
+ Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of entity changes.");
+ Logger.error(ex);
+ }
+ }
+
+ public static void sendProxyEvent(ProxyMessage message) {
+ if (proxyServer != null && message.getPath() != null)
+ synchronized(proxyClients) {
+ for (Iterator iter = proxyClients.iterator(); iter.hasNext();) {
+ ProxyClientConnection client = (ProxyClientConnection)iter.next();
+ client.sendMessage(message);
+ }
+ }
+ }
+
+ public static void reportConnections(int logLevel) {
+ synchronized(proxyClients) {
+ Logger.msg(logLevel, "Currently connected proxy clients:");
+ for (Iterator iter = proxyClients.iterator(); iter.hasNext();) {
+ ProxyClientConnection client = (ProxyClientConnection)iter.next();
+ Logger.msg(logLevel, " "+client);
+ }
+ }
+ }
+
+ public static void shutdownServer() {
+ if (proxyServer != null) {
+ Logger.msg(1, "EntityProxyManager: Closing Server.");
+ proxyServer.stopListening();
+ }
+ }
+
+ public static void registerProxyClient(ProxyClientConnection client) {
+ synchronized(proxyClients) {
+ proxyClients.add(client);
+ }
+ }
+
+ public static void unRegisterProxyClient(ProxyClientConnection client) {
+ synchronized(proxyClients) {
+ proxyClients.remove(client);
+ }
+ }
+}
+