/************************************************************************** * EntityProxyFactory.java * * $Revision: 1.45 $ * $Date: 2005/05/10 11:40:09 $ * * Copyright (C) 2001 CERN - European Organization for Nuclear Research * All rights reserved. **************************************************************************/ package com.c2kernel.entity.proxy; import java.util.ArrayList; import java.util.ConcurrentModificationException; import java.util.Enumeration; import java.util.HashMap; import java.util.Iterator; import com.c2kernel.common.InvalidDataException; import com.c2kernel.common.ObjectNotFoundException; import com.c2kernel.lookup.AgentPath; import com.c2kernel.lookup.DomainPath; import com.c2kernel.lookup.Path; import com.c2kernel.persistency.ClusterStorage; import com.c2kernel.process.Gateway; import com.c2kernel.property.Property; import com.c2kernel.utils.Logger; import com.c2kernel.utils.SoftCache; import com.c2kernel.utils.server.SimpleTCPIPServer; public class EntityProxyManager { SoftCache proxyPool = new SoftCache(50); HashMap treeSubscribers = new HashMap(); HashMap connections = new HashMap(); // server objects static ArrayList proxyClients = new ArrayList(); static SimpleTCPIPServer proxyServer = null; static String serverName = null; /** * Create an entity proxy manager to listen for proxy events and reap unused proxies */ public EntityProxyManager() { Logger.msg(5, "EntityProxyManager - Starting....."); Enumeration servers = Gateway.getLDAPLookup().searchEntities(new DomainPath("/servers")); while(servers.hasMoreElements()) { Path thisServerPath = servers.nextElement(); try { int syskey = thisServerPath.getSysKey(); String remoteServer = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/Name", null)).getValue(); String portStr = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/ProxyPort", null)).getValue(); int remotePort = Integer.parseInt(portStr); connectToProxyServer(remoteServer, remotePort); } catch (Exception ex) { Logger.error("Exception retrieving proxy server connection data for "+thisServerPath); Logger.error(ex); } } } public void connectToProxyServer(String name, int port) { ProxyServerConnection oldConn = connections.get(name); if (oldConn != null) oldConn.shutdown(); connections.put(name, new ProxyServerConnection(name, port, this)); } protected void resubscribe(ProxyServerConnection conn) { synchronized (proxyPool) { for (Integer key : proxyPool.keySet()) { ProxyMessage sub = new ProxyMessage(key.intValue(), ProxyMessage.ADDPATH, false); Logger.msg(5, "Subscribing to entity "+key); conn.sendMessage(sub); } } } /** * @param sub */ private void sendMessage(ProxyMessage sub) { for (ProxyServerConnection element : connections.values()) { element.sendMessage(sub); } } public void shutdown() { Logger.msg("EntityProxyManager.shutdown() - flagging shutdown of server connections"); for (ProxyServerConnection element : connections.values()) { element.shutdown(); } } protected void processMessage(ProxyMessage thisMessage) throws InvalidDataException { if (Logger.doLog(9)) Logger.msg(9, thisMessage.toString()); if (thisMessage.getPath().equals(ProxyMessage.PINGPATH)) // ping response return; if (thisMessage.getSysKey() == ProxyMessage.NA) // must be domain path info informTreeSubscribers(thisMessage.getState(), thisMessage.getPath()); else { // proper proxy message Logger.msg(5, "Received proxy message: "+thisMessage.toString()); Integer key = new Integer(thisMessage.getSysKey()); EntityProxy relevant = proxyPool.get(key); if (relevant == null) Logger.warning("Received proxy message for sysKey "+thisMessage.getSysKey()+" which we don't have a proxy for."); else try { relevant.notify(thisMessage); } catch (Throwable ex) { Logger.error("Error caught notifying proxy listener "+relevant.toString()+" of "+thisMessage.toString()); Logger.error(ex); } } } private void informTreeSubscribers(boolean state, String path) { DomainPath last = new DomainPath(path); DomainPath parent; boolean first = true; synchronized(treeSubscribers) { while((parent = last.getParent()) != null) { ArrayList currentKeys = new ArrayList(); currentKeys.addAll(treeSubscribers.keySet()); for (DomainPathSubscriber sub : currentKeys) { DomainPath interest = treeSubscribers.get(sub); if (interest!= null && interest.equals(parent)) { if (state == ProxyMessage.ADDED) sub.pathAdded(last); else if (first) sub.pathRemoved(last); } } last = parent; first = false; } } } public void subscribeTree(DomainPathSubscriber sub, DomainPath interest) { synchronized(treeSubscribers) { treeSubscribers.put(sub, interest); } } public void unsubscribeTree(DomainPathSubscriber sub) { synchronized(treeSubscribers) { treeSubscribers.remove(sub); } } /************************************************************************** * **************************************************************************/ private EntityProxy createProxy( org.omg.CORBA.Object ior, int systemKey, boolean isItem ) throws ObjectNotFoundException { EntityProxy newProxy = null; Logger.msg(5, "EntityProxyFactory::creating proxy on entity " + systemKey); if( isItem ) { newProxy = new ItemProxy(ior, systemKey); } else { newProxy = new AgentProxy(ior, systemKey); } // subscribe to changes from server ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.ADDPATH, false); sendMessage(sub); reportCurrentProxies(9); return ( newProxy ); } protected void removeProxy( int systemKey ) { ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.DELPATH, true); Logger.msg(5,"EntityProxyManager.removeProxy() - Unsubscribing to proxy informer for "+systemKey); sendMessage(sub); } /************************************************************************** * EntityProxy getProxy( ManageableEntity, SystemKey) * * Called by the other GetProxy methods. Fills in either the ior or the * SystemKey **************************************************************************/ private EntityProxy getProxy( org.omg.CORBA.Object ior, int systemKey, boolean isItem ) throws ObjectNotFoundException { Integer key = new Integer(systemKey); synchronized(proxyPool) { EntityProxy newProxy; // return it if it exists newProxy = proxyPool.get(key); if (newProxy == null) { // create a new one newProxy = createProxy(ior, systemKey, isItem ); proxyPool.put(key, newProxy); } return newProxy; } } /************************************************************************** * EntityProxy getProxy( String ) * * Proxy from Alias **************************************************************************/ public EntityProxy getProxy( Path path ) throws ObjectNotFoundException { //convert namePath to dn format Logger.msg(8,"EntityProxyFactory::getProxy(" + path.toString() + ")"); boolean isItem = !(path.getEntity() instanceof AgentPath); return getProxy( Gateway.getLDAPLookup().getIOR(path), path.getSysKey(), isItem ); } /************************************************************************** * void reportCurrentProxies() * * A utility to Dump the current proxies loaded **************************************************************************/ public void reportCurrentProxies(int logLevel) { if (!Logger.doLog(logLevel)) return; Logger.msg(logLevel, "Current proxies: "); try { synchronized(proxyPool) { Iterator i = proxyPool.keySet().iterator(); for( int count=0; i.hasNext(); count++ ) { Integer nextProxy = i.next(); EntityProxy thisProxy = proxyPool.get(nextProxy); if (thisProxy != null) Logger.msg(logLevel, "" + count + ": " + proxyPool.get(nextProxy).getClass().getName() + ": " + nextProxy); } } } catch (ConcurrentModificationException ex) { Logger.msg(logLevel, "Proxy cache modified. Aborting."); } } /************************************************************************** * Static Proxy Server methods **************************************************************************/ /** * Initialises the Proxy event UDP server listening on 'Host.Proxy.port' from c2kprops * @param c2kProps */ public static void initServer() { Logger.msg(5, "EntityProxyFactory::initServer - Starting....."); String port = Gateway.getProperty("ItemServer.Proxy.port"); serverName = Gateway.getProperty("ItemServer.name"); if (port == null) { Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of entity changes."); return; } // set up the proxy server try { int portNo = Integer.parseInt(port); Logger.msg(5, "EntityProxyFactory::initServer - Initialising proxy informer on port "+port); proxyServer = new SimpleTCPIPServer(portNo, ProxyClientConnection.class, 200); proxyServer.startListening(); } catch (Exception ex) { Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of entity changes."); Logger.error(ex); } } public static void sendProxyEvent(ProxyMessage message) { if (proxyServer != null && message.getPath() != null) synchronized(proxyClients) { for (ProxyClientConnection client : proxyClients) { client.sendMessage(message); } } } public static void reportConnections(int logLevel) { synchronized(proxyClients) { Logger.msg(logLevel, "Currently connected proxy clients:"); for (ProxyClientConnection client : proxyClients) { Logger.msg(logLevel, " "+client); } } } public static void shutdownServer() { if (proxyServer != null) { Logger.msg(1, "EntityProxyManager: Closing Server."); proxyServer.stopListening(); } } public static void registerProxyClient(ProxyClientConnection client) { synchronized(proxyClients) { proxyClients.add(client); } } public static void unRegisterProxyClient(ProxyClientConnection client) { synchronized(proxyClients) { proxyClients.remove(client); } } }