/************************************************************************** * ProxyManager.java * * $Revision: 1.45 $ * $Date: 2005/05/10 11:40:09 $ * * Copyright (C) 2001 CERN - European Organization for Nuclear Research * All rights reserved. **************************************************************************/ package com.c2kernel.entity.proxy; import java.util.ArrayList; import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.Iterator; import com.c2kernel.common.InvalidDataException; import com.c2kernel.common.ObjectNotFoundException; import com.c2kernel.lookup.AgentPath; import com.c2kernel.lookup.DomainPath; import com.c2kernel.lookup.Path; import com.c2kernel.persistency.ClusterStorage; import com.c2kernel.process.Gateway; import com.c2kernel.property.Property; import com.c2kernel.utils.Logger; import com.c2kernel.utils.SoftCache; public class ProxyManager { SoftCache proxyPool = new SoftCache(50); HashMap treeSubscribers = new HashMap(); HashMap connections = new HashMap(); /** * Create a proxy manager to listen for proxy events and reap unused proxies */ public ProxyManager() { Logger.msg(5, "ProxyManager - Starting....."); Iterator servers = Gateway.getLookup().search(new DomainPath("/servers"), new Property("Type", "Server", false)); while(servers.hasNext()) { Path thisServerPath = servers.next(); try { Logger.msg(thisServerPath.dump()); int syskey = thisServerPath.getSysKey(); String remoteServer = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/Name", null)).getValue(); String portStr = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/ProxyPort", null)).getValue(); int remotePort = Integer.parseInt(portStr); connectToProxyServer(remoteServer, remotePort); } catch (Exception ex) { Logger.error("Exception retrieving proxy server connection data for "+thisServerPath); Logger.error(ex); } } } public void connectToProxyServer(String name, int port) { ProxyServerConnection oldConn = connections.get(name); if (oldConn != null) oldConn.shutdown(); connections.put(name, new ProxyServerConnection(name, port, this)); } protected void resubscribe(ProxyServerConnection conn) { synchronized (proxyPool) { for (Integer key : proxyPool.keySet()) { ProxyMessage sub = new ProxyMessage(key.intValue(), ProxyMessage.ADDPATH, false); Logger.msg(5, "Subscribing to item "+key); conn.sendMessage(sub); } } } /** * @param sub */ private void sendMessage(ProxyMessage sub) { for (ProxyServerConnection element : connections.values()) { element.sendMessage(sub); } } public void shutdown() { Logger.msg("ProxyManager.shutdown() - flagging shutdown of server connections"); for (ProxyServerConnection element : connections.values()) { element.shutdown(); } } protected void processMessage(ProxyMessage thisMessage) throws InvalidDataException { if (Logger.doLog(9)) Logger.msg(9, thisMessage.toString()); if (thisMessage.getPath().equals(ProxyMessage.PINGPATH)) // ping response return; if (thisMessage.getSysKey() == ProxyMessage.NA) // must be domain path info informTreeSubscribers(thisMessage.getState(), thisMessage.getPath()); else { // proper proxy message Logger.msg(5, "Received proxy message: "+thisMessage.toString()); Integer key = new Integer(thisMessage.getSysKey()); ItemProxy relevant = proxyPool.get(key); if (relevant == null) Logger.warning("Received proxy message for sysKey "+thisMessage.getSysKey()+" which we don't have a proxy for."); else try { relevant.notify(thisMessage); } catch (Throwable ex) { Logger.error("Error caught notifying proxy listener "+relevant.toString()+" of "+thisMessage.toString()); Logger.error(ex); } } } private void informTreeSubscribers(boolean state, String path) { DomainPath last = new DomainPath(path); DomainPath parent; boolean first = true; synchronized(treeSubscribers) { while((parent = last.getParent()) != null) { ArrayList currentKeys = new ArrayList(); currentKeys.addAll(treeSubscribers.keySet()); for (DomainPathSubscriber sub : currentKeys) { DomainPath interest = treeSubscribers.get(sub); if (interest!= null && interest.equals(parent)) { if (state == ProxyMessage.ADDED) sub.pathAdded(last); else if (first) sub.pathRemoved(last); } } last = parent; first = false; } } } public void subscribeTree(DomainPathSubscriber sub, DomainPath interest) { synchronized(treeSubscribers) { treeSubscribers.put(sub, interest); } } public void unsubscribeTree(DomainPathSubscriber sub) { synchronized(treeSubscribers) { treeSubscribers.remove(sub); } } /************************************************************************** * **************************************************************************/ private ItemProxy createProxy( org.omg.CORBA.Object ior, int systemKey, boolean isAgent ) throws ObjectNotFoundException { ItemProxy newProxy = null; Logger.msg(5, "ProxyManager::creating proxy on Item " + systemKey); if( isAgent ) { newProxy = new AgentProxy(ior, systemKey); } else { newProxy = new ItemProxy(ior, systemKey); } // subscribe to changes from server ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.ADDPATH, false); sendMessage(sub); reportCurrentProxies(9); return ( newProxy ); } protected void removeProxy( int systemKey ) { ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.DELPATH, true); Logger.msg(5,"ProxyManager.removeProxy() - Unsubscribing to proxy informer for "+systemKey); sendMessage(sub); } /************************************************************************** * Called by the other GetProxy methods. Fills in either the ior or the * SystemKey **************************************************************************/ private ItemProxy getProxy( org.omg.CORBA.Object ior, int systemKey, boolean isAgent ) throws ObjectNotFoundException { Integer key = new Integer(systemKey); synchronized(proxyPool) { ItemProxy newProxy; // return it if it exists newProxy = proxyPool.get(key); if (newProxy == null) { // create a new one newProxy = createProxy(ior, systemKey, isAgent ); proxyPool.put(key, newProxy); } return newProxy; } } /************************************************************************** * ItemProxy getProxy( String ) * * Proxy from Alias **************************************************************************/ public ItemProxy getProxy( Path path ) throws ObjectNotFoundException { //convert namePath to dn format Logger.msg(8,"ProxyManager::getProxy(" + path.toString() + ")"); boolean isAgent = (path.getEntity() instanceof AgentPath); return getProxy( Gateway.getLookup().resolve(path), path.getSysKey(), isAgent ); } public AgentProxy getAgentProxy( AgentPath path ) throws ObjectNotFoundException { return (AgentProxy) getProxy(path); } /************************************************************************** * void reportCurrentProxies() * * A utility to Dump the current proxies loaded **************************************************************************/ public void reportCurrentProxies(int logLevel) { if (!Logger.doLog(logLevel)) return; Logger.msg(logLevel, "Current proxies: "); try { synchronized(proxyPool) { Iterator i = proxyPool.keySet().iterator(); for( int count=0; i.hasNext(); count++ ) { Integer nextProxy = i.next(); ItemProxy thisProxy = proxyPool.get(nextProxy); if (thisProxy != null) Logger.msg(logLevel, "" + count + ": " + proxyPool.get(nextProxy).getClass().getName() + ": " + nextProxy); } } } catch (ConcurrentModificationException ex) { Logger.msg(logLevel, "Proxy cache modified. Aborting."); } } }