diff options
| author | Andrew Branson <andrew.branson@cern.ch> | 2014-05-07 17:33:13 +0200 |
|---|---|---|
| committer | Andrew Branson <andrew.branson@cern.ch> | 2014-05-08 16:37:39 +0200 |
| commit | a1f0ecbb6a2bea6aa214322c412af2f3c5ce124b (patch) | |
| tree | 4d74229b6dd9cfd7ce054e06bf740b9a63a578d6 /src/main/java/com/c2kernel/entity/proxy/ProxyManager.java | |
| parent | 6dfa1bbe05a712174e937af89d5223e98d9d7d06 (diff) | |
Agent now extends Item, so they can have workflows. All traces of the
old 'Entity' superclasses should be removed, including proxies and
paths. Very large change, breaks API compatibility with CRISTAL 2.x.
Fixes #135
Diffstat (limited to 'src/main/java/com/c2kernel/entity/proxy/ProxyManager.java')
| -rw-r--r-- | src/main/java/com/c2kernel/entity/proxy/ProxyManager.java | 337 |
1 files changed, 337 insertions, 0 deletions
diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java b/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java new file mode 100644 index 0000000..d19e38f --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java @@ -0,0 +1,337 @@ +/**************************************************************************
+ * ProxyManager.java
+ *
+ * $Revision: 1.45 $
+ * $Date: 2005/05/10 11:40:09 $
+ *
+ * Copyright (C) 2001 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ **************************************************************************/
+
+package com.c2kernel.entity.proxy;
+
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import com.c2kernel.common.InvalidDataException;
+import com.c2kernel.common.ObjectNotFoundException;
+import com.c2kernel.lookup.AgentPath;
+import com.c2kernel.lookup.DomainPath;
+import com.c2kernel.lookup.Path;
+import com.c2kernel.persistency.ClusterStorage;
+import com.c2kernel.process.Gateway;
+import com.c2kernel.property.Property;
+import com.c2kernel.utils.Logger;
+import com.c2kernel.utils.SoftCache;
+import com.c2kernel.utils.server.SimpleTCPIPServer;
+
+
+public class ProxyManager
+{
+ SoftCache<Integer, ItemProxy> proxyPool = new SoftCache<Integer, ItemProxy>(50);
+ HashMap<DomainPathSubscriber, DomainPath> treeSubscribers = new HashMap<DomainPathSubscriber, DomainPath>();
+ HashMap<String, ProxyServerConnection> connections = new HashMap<String, ProxyServerConnection>();
+
+ // server objects
+ static ArrayList<ProxyClientConnection> proxyClients = new ArrayList<ProxyClientConnection>();
+ static SimpleTCPIPServer proxyServer = null;
+ static String serverName = null;
+
+ /**
+ * Create a proxy manager to listen for proxy events and reap unused proxies
+ */
+ public ProxyManager()
+ {
+ Logger.msg(5, "ProxyManager - Starting.....");
+
+ Enumeration<Path> servers = Gateway.getLDAPLookup().searchEntities(new DomainPath("/servers"));
+ while(servers.hasMoreElements()) {
+ Path thisServerPath = servers.nextElement();
+ try {
+ int syskey = thisServerPath.getSysKey();
+ String remoteServer = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/Name", null)).getValue();
+ String portStr = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/ProxyPort", null)).getValue();
+ int remotePort = Integer.parseInt(portStr);
+ connectToProxyServer(remoteServer, remotePort);
+
+ } catch (Exception ex) {
+ Logger.error("Exception retrieving proxy server connection data for "+thisServerPath);
+ Logger.error(ex);
+ }
+ }
+ }
+
+ public void connectToProxyServer(String name, int port) {
+ ProxyServerConnection oldConn = connections.get(name);
+ if (oldConn != null)
+ oldConn.shutdown();
+ connections.put(name, new ProxyServerConnection(name, port, this));
+ }
+
+
+ protected void resubscribe(ProxyServerConnection conn) {
+ synchronized (proxyPool) {
+ for (Integer key : proxyPool.keySet()) {
+ ProxyMessage sub = new ProxyMessage(key.intValue(), ProxyMessage.ADDPATH, false);
+ Logger.msg(5, "Subscribing to item "+key);
+ conn.sendMessage(sub);
+ }
+ }
+ }
+
+ /**
+ * @param sub
+ */
+ private void sendMessage(ProxyMessage sub) {
+ for (ProxyServerConnection element : connections.values()) {
+ element.sendMessage(sub);
+ }
+
+ }
+
+ public void shutdown() {
+ Logger.msg("ProxyManager.shutdown() - flagging shutdown of server connections");
+ for (ProxyServerConnection element : connections.values()) {
+ element.shutdown();
+ }
+ }
+
+ protected void processMessage(ProxyMessage thisMessage) throws InvalidDataException {
+ if (Logger.doLog(9)) Logger.msg(9, thisMessage.toString());
+
+ if (thisMessage.getPath().equals(ProxyMessage.PINGPATH)) // ping response
+ return;
+
+ if (thisMessage.getSysKey() == ProxyMessage.NA) // must be domain path info
+ informTreeSubscribers(thisMessage.getState(), thisMessage.getPath());
+ else {
+ // proper proxy message
+ Logger.msg(5, "Received proxy message: "+thisMessage.toString());
+ Integer key = new Integer(thisMessage.getSysKey());
+ ItemProxy relevant = proxyPool.get(key);
+ if (relevant == null)
+ Logger.warning("Received proxy message for sysKey "+thisMessage.getSysKey()+" which we don't have a proxy for.");
+ else
+ try {
+ relevant.notify(thisMessage);
+ } catch (Throwable ex) {
+ Logger.error("Error caught notifying proxy listener "+relevant.toString()+" of "+thisMessage.toString());
+ Logger.error(ex);
+ }
+ }
+ }
+
+ private void informTreeSubscribers(boolean state, String path) {
+ DomainPath last = new DomainPath(path);
+ DomainPath parent; boolean first = true;
+ synchronized(treeSubscribers) {
+ while((parent = last.getParent()) != null) {
+ ArrayList<DomainPathSubscriber> currentKeys = new ArrayList<DomainPathSubscriber>();
+ currentKeys.addAll(treeSubscribers.keySet());
+ for (DomainPathSubscriber sub : currentKeys) {
+ DomainPath interest = treeSubscribers.get(sub);
+ if (interest!= null && interest.equals(parent)) {
+ if (state == ProxyMessage.ADDED)
+ sub.pathAdded(last);
+ else if (first)
+ sub.pathRemoved(last);
+ }
+ }
+ last = parent;
+ first = false;
+ }
+ }
+ }
+
+ public void subscribeTree(DomainPathSubscriber sub, DomainPath interest) {
+ synchronized(treeSubscribers) {
+ treeSubscribers.put(sub, interest);
+ }
+ }
+
+ public void unsubscribeTree(DomainPathSubscriber sub) {
+ synchronized(treeSubscribers) {
+ treeSubscribers.remove(sub);
+ }
+ }
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ private ItemProxy createProxy( org.omg.CORBA.Object ior,
+ int systemKey,
+ boolean isItem )
+ throws ObjectNotFoundException
+ {
+
+ ItemProxy newProxy = null;
+
+ Logger.msg(5, "ProxyManager::creating proxy on Item " + systemKey);
+
+ if( isItem )
+ {
+ newProxy = new ItemProxy(ior, systemKey);
+ }
+ else
+ {
+ newProxy = new AgentProxy(ior, systemKey);
+ }
+
+ // subscribe to changes from server
+ ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.ADDPATH, false);
+ sendMessage(sub);
+ reportCurrentProxies(9);
+ return ( newProxy );
+ }
+
+ protected void removeProxy( int systemKey )
+ {
+ ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.DELPATH, true);
+ Logger.msg(5,"ProxyManager.removeProxy() - Unsubscribing to proxy informer for "+systemKey);
+ sendMessage(sub);
+ }
+
+
+ /**************************************************************************
+ * Called by the other GetProxy methods. Fills in either the ior or the
+ * SystemKey
+ **************************************************************************/
+ private ItemProxy getProxy( org.omg.CORBA.Object ior,
+ int systemKey,
+ boolean isItem )
+ throws ObjectNotFoundException
+ {
+ Integer key = new Integer(systemKey);
+
+ synchronized(proxyPool) {
+ ItemProxy newProxy;
+ // return it if it exists
+ newProxy = proxyPool.get(key);
+ if (newProxy == null) {
+ // create a new one
+ newProxy = createProxy(ior, systemKey, isItem );
+ proxyPool.put(key, newProxy);
+ }
+ return newProxy;
+
+ }
+ }
+
+ /**************************************************************************
+ * ItemProxy getProxy( String )
+ *
+ * Proxy from Alias
+ **************************************************************************/
+ public ItemProxy getProxy( Path path )
+ throws ObjectNotFoundException
+ {
+
+ //convert namePath to dn format
+ Logger.msg(8,"ProxyManager::getProxy(" + path.toString() + ")");
+ boolean isItem = !(path.getEntity() instanceof AgentPath);
+ return getProxy( Gateway.getLDAPLookup().getIOR(path),
+ path.getSysKey(),
+ isItem );
+
+ }
+
+ /**************************************************************************
+ * void reportCurrentProxies()
+ *
+ * A utility to Dump the current proxies loaded
+ **************************************************************************/
+ public void reportCurrentProxies(int logLevel)
+ {
+ if (!Logger.doLog(logLevel)) return;
+ Logger.msg(logLevel, "Current proxies: ");
+ try {
+ synchronized(proxyPool) {
+ Iterator<Integer> i = proxyPool.keySet().iterator();
+
+ for( int count=0; i.hasNext(); count++ )
+ {
+ Integer nextProxy = i.next();
+ ItemProxy thisProxy = proxyPool.get(nextProxy);
+ if (thisProxy != null)
+ Logger.msg(logLevel,
+ "" + count + ": "
+ + proxyPool.get(nextProxy).getClass().getName()
+ + ": " + nextProxy);
+ }
+ }
+ } catch (ConcurrentModificationException ex) {
+ Logger.msg(logLevel, "Proxy cache modified. Aborting.");
+ }
+ }
+
+
+ /**************************************************************************
+ * Static Proxy Server methods
+ **************************************************************************/
+
+ /**
+ * Initialises the Proxy event UDP server listening on 'Host.Proxy.port' from c2kprops
+ * @param c2kProps
+ */
+ public static void initServer()
+ {
+ Logger.msg(5, "ProxyManager::initServer - Starting.....");
+ int port = Gateway.getProperties().getInt("ItemServer.Proxy.port", 0);
+ serverName = Gateway.getProperties().getProperty("ItemServer.name");
+ if (port == 0) {
+ Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of changes.");
+ return;
+ }
+
+ // set up the proxy server
+ try {
+ Logger.msg(5, "ProxyManager::initServer - Initialising proxy informer on port "+port);
+ proxyServer = new SimpleTCPIPServer(port, ProxyClientConnection.class, 200);
+ proxyServer.startListening();
+ } catch (Exception ex) {
+ Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of changes.");
+ Logger.error(ex);
+ }
+ }
+
+ public static void sendProxyEvent(ProxyMessage message) {
+ if (proxyServer != null && message.getPath() != null)
+ synchronized(proxyClients) {
+ for (ProxyClientConnection client : proxyClients) {
+ client.sendMessage(message);
+ }
+ }
+ }
+
+ public static void reportConnections(int logLevel) {
+ synchronized(proxyClients) {
+ Logger.msg(logLevel, "Currently connected proxy clients:");
+ for (ProxyClientConnection client : proxyClients) {
+ Logger.msg(logLevel, " "+client);
+ }
+ }
+ }
+
+ public static void shutdownServer() {
+ if (proxyServer != null) {
+ Logger.msg(1, "ProxyManager: Closing Server.");
+ proxyServer.stopListening();
+ }
+ }
+
+ public static void registerProxyClient(ProxyClientConnection client) {
+ synchronized(proxyClients) {
+ proxyClients.add(client);
+ }
+ }
+
+ public static void unRegisterProxyClient(ProxyClientConnection client) {
+ synchronized(proxyClients) {
+ proxyClients.remove(client);
+ }
+ }
+}
+
|
