From a1f0ecbb6a2bea6aa214322c412af2f3c5ce124b Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Wed, 7 May 2014 17:33:13 +0200 Subject: Agent now extends Item, so they can have workflows. All traces of the old 'Entity' superclasses should be removed, including proxies and paths. Very large change, breaks API compatibility with CRISTAL 2.x. Fixes #135 --- .../java/com/c2kernel/entity/proxy/AgentProxy.java | 63 ++-- .../com/c2kernel/entity/proxy/EntityProxy.java | 247 --------------- .../c2kernel/entity/proxy/EntityProxyManager.java | 339 --------------------- .../c2kernel/entity/proxy/EntityProxyObserver.java | 27 -- .../java/com/c2kernel/entity/proxy/ItemProxy.java | 285 +++++++++++++---- .../c2kernel/entity/proxy/MemberSubscription.java | 18 +- .../entity/proxy/ProxyClientConnection.java | 2 +- .../com/c2kernel/entity/proxy/ProxyManager.java | 337 ++++++++++++++++++++ .../com/c2kernel/entity/proxy/ProxyObserver.java | 27 ++ .../entity/proxy/ProxyServerConnection.java | 4 +- 10 files changed, 633 insertions(+), 716 deletions(-) delete mode 100644 src/main/java/com/c2kernel/entity/proxy/EntityProxy.java delete mode 100644 src/main/java/com/c2kernel/entity/proxy/EntityProxyManager.java delete mode 100644 src/main/java/com/c2kernel/entity/proxy/EntityProxyObserver.java create mode 100644 src/main/java/com/c2kernel/entity/proxy/ProxyManager.java create mode 100644 src/main/java/com/c2kernel/entity/proxy/ProxyObserver.java (limited to 'src/main/java/com/c2kernel/entity/proxy') diff --git a/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java b/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java index f76af10..29550d4 100644 --- a/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java +++ b/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java @@ -22,13 +22,12 @@ import com.c2kernel.common.PersistencyException; import com.c2kernel.entity.Agent; import com.c2kernel.entity.AgentHelper; import com.c2kernel.entity.C2KLocalObject; -import com.c2kernel.entity.ManageableEntity; import com.c2kernel.entity.agent.Job; import com.c2kernel.lifecycle.instance.predefined.PredefinedStep; import com.c2kernel.lookup.AgentPath; import com.c2kernel.lookup.DomainPath; -import com.c2kernel.lookup.EntityPath; -import com.c2kernel.lookup.InvalidEntityPathException; +import com.c2kernel.lookup.InvalidItemPathException; +import com.c2kernel.lookup.ItemPath; import com.c2kernel.lookup.Path; import com.c2kernel.persistency.outcome.OutcomeValidator; import com.c2kernel.persistency.outcome.Schema; @@ -47,51 +46,34 @@ import com.c2kernel.utils.Logger; * @version $Revision: 1.37 $ $Date: 2005/10/05 07:39:36 $ * @author $Author: abranson $ ******************************************************************************/ -public class AgentProxy extends EntityProxy +public class AgentProxy extends ItemProxy { - AgentPath path; + AgentPath agentPath; /************************************************************************** * Creates an AgentProxy without cache and change notification **************************************************************************/ - public AgentProxy( org.omg.CORBA.Object ior, + protected AgentProxy( org.omg.CORBA.Object ior, int systemKey) throws ObjectNotFoundException { - super(ior, systemKey); - try { - path = new AgentPath(systemKey); - } catch (InvalidEntityPathException e) { + super(ior, systemKey); + try { + agentPath = new AgentPath(systemKey); + mPath = agentPath; + } catch (InvalidItemPathException e) { throw new ObjectNotFoundException(); } } @Override - public ManageableEntity narrow() throws ObjectNotFoundException + public Agent narrow() throws ObjectNotFoundException { try { return AgentHelper.narrow(mIOR); } catch (org.omg.CORBA.BAD_PARAM ex) { } throw new ObjectNotFoundException("CORBA Object was not an Agent, or the server is down."); } - /************************************************************************** - * - * - **************************************************************************/ - public void initialise( String agentProps, String collector ) - throws AccessRightsException, - InvalidDataException, - PersistencyException, - ObjectNotFoundException - { - Logger.msg(7, "AgentProxy::initialise - started"); - - ((Agent)getEntity()).initialise( agentProps ); - } - - public AgentPath getPath() { - return path; - } /** * Executes a job on the given item using this agent. @@ -111,7 +93,7 @@ public class AgentProxy extends EntityProxy { OutcomeValidator validator = null; Date startTime = new Date(); - Logger.msg(3, "AgentProxy - executing "+job.getStepPath()+" for "+path.getAgentName()); + Logger.msg(3, "AgentProxy - executing "+job.getStepPath()+" for "+agentPath.getAgentName()); // get the outcome validator if present if (job.hasOutcome()) { @@ -206,9 +188,9 @@ public class AgentProxy extends EntityProxy ScriptErrorException { try { - ItemProxy targetItem = (ItemProxy)Gateway.getProxyManager().getProxy(new EntityPath(job.getItemSysKey())); + ItemProxy targetItem = Gateway.getProxyManager().getProxy(new ItemPath(job.getItemSysKey())); execute(targetItem, job); - } catch (InvalidEntityPathException e) { + } catch (InvalidItemPathException e) { throw new ObjectNotFoundException("Job contained invalid item sysKey: "+job.getItemSysKey(), ""); } } @@ -239,7 +221,7 @@ public class AgentProxy extends EntityProxy PersistencyException, ObjectAlreadyExistsException { - item.requestAction(getSystemKey(), "workflow/predefined/"+predefStep, PredefinedStep.DONE, PredefinedStep.bundleData(params)); + item.getItem().requestAction(getSystemKey(), "workflow/predefined/"+predefStep, PredefinedStep.DONE, PredefinedStep.bundleData(params)); } /** Wrappers for scripts */ @@ -266,18 +248,23 @@ public class AgentProxy extends EntityProxy returnPath = nextMatch; } - return (ItemProxy)Gateway.getProxyManager().getProxy(returnPath); + return Gateway.getProxyManager().getProxy(returnPath); } public ItemProxy getItem(String itemPath) throws ObjectNotFoundException { return (getItem(new DomainPath(itemPath))); } - public ItemProxy getItem(Path itemPath) throws ObjectNotFoundException { - return (ItemProxy)Gateway.getProxyManager().getProxy(itemPath); + @Override + public AgentPath getPath() { + return agentPath; + } + + public ItemProxy getItem(Path itemPath) throws ObjectNotFoundException { + return Gateway.getProxyManager().getProxy(itemPath); } - public ItemProxy getItemBySysKey(int sysKey) throws ObjectNotFoundException, InvalidEntityPathException { - return (ItemProxy)Gateway.getProxyManager().getProxy(new EntityPath(sysKey)); + public ItemProxy getItemBySysKey(int sysKey) throws ObjectNotFoundException, InvalidItemPathException { + return Gateway.getProxyManager().getProxy(new ItemPath(sysKey)); } } diff --git a/src/main/java/com/c2kernel/entity/proxy/EntityProxy.java b/src/main/java/com/c2kernel/entity/proxy/EntityProxy.java deleted file mode 100644 index cb76a19..0000000 --- a/src/main/java/com/c2kernel/entity/proxy/EntityProxy.java +++ /dev/null @@ -1,247 +0,0 @@ -/************************************************************************** - * EntityProxy.java - * - * $Revision: 1.35 $ - * $Date: 2005/05/10 11:40:09 $ - * - * Copyright (C) 2001 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - -package com.c2kernel.entity.proxy; - -import java.util.HashMap; -import java.util.Iterator; - -import com.c2kernel.common.ObjectNotFoundException; -import com.c2kernel.entity.C2KLocalObject; -import com.c2kernel.entity.ManageableEntity; -import com.c2kernel.persistency.ClusterStorageException; -import com.c2kernel.process.Gateway; -import com.c2kernel.property.Property; -import com.c2kernel.utils.Logger; - - -/****************************************************************************** -* It is a wrapper for the connection and communication with Entities. -* It can cache data loaded from the Entity to reduce communication with it. -* This cache is syncronised with corresponding Entity through an event mechanism. -* -* @version $Revision: 1.35 $ $Date: 2005/05/10 11:40:09 $ -* @author $Author: abranson $ -******************************************************************************/ - -abstract public class EntityProxy implements ManageableEntity -{ - - protected ManageableEntity mEntity = null; - protected org.omg.CORBA.Object mIOR; - protected int mSystemKey; - private HashMap, EntityProxyObserver> mSubscriptions; - - /************************************************************************** - * - **************************************************************************/ - protected EntityProxy( org.omg.CORBA.Object ior, - int systemKey) - throws ObjectNotFoundException - { - Logger.msg(8,"EntityProxy::EntityProxy() - Initialising '" +systemKey+ "' entity"); - - initialise( ior, systemKey); - } - - /************************************************************************** - * - **************************************************************************/ - private void initialise( org.omg.CORBA.Object ior, - int systemKey) - throws ObjectNotFoundException - { - Logger.msg(8, "EntityProxy::initialise() - Initialising '" +systemKey+ "' entity"); - - mIOR = ior; - mSystemKey = systemKey; - mSubscriptions = new HashMap, EntityProxyObserver>(); - } - - - /************************************************************************** - * - **************************************************************************/ - public ManageableEntity getEntity() throws ObjectNotFoundException - { - if (mEntity == null) { - mEntity = narrow(); - } - return mEntity; - } - - abstract public ManageableEntity narrow() throws ObjectNotFoundException; - - /************************************************************************** - * - **************************************************************************/ - //check who is using.. and if toString() is sufficient - @Override - public int getSystemKey() - { - return mSystemKey; - } - - - /************************************************************************** - * - **************************************************************************/ - @Override - public String queryData( String path ) - throws ObjectNotFoundException - { - - try { - Logger.msg(7, "EntityProxy.queryData() - "+mSystemKey+"/"+path); - if (path.endsWith("all")) { - Logger.msg(7, "EntityProxy.queryData() - listing contents"); - String[] result = Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length()-3)); - StringBuffer retString = new StringBuffer(); - for (int i = 0; i < result.length; i++) { - retString.append(result[i]); - if (i"+e.getMessage()+""; - } - } - - public String[] getContents( String path ) throws ObjectNotFoundException { - try { - return Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length())); - } catch (ClusterStorageException e) { - throw new ObjectNotFoundException(e.toString()); - } - } - - - /************************************************************************** - * - **************************************************************************/ - public C2KLocalObject getObject( String xpath ) - throws ObjectNotFoundException - { - // load from storage, falling back to proxy loader if not found in others - try - { - return Gateway.getStorage().get( mSystemKey, xpath , null); - } - catch( ClusterStorageException ex ) - { - Logger.msg(4, "Exception loading object :"+mSystemKey+"/"+xpath); - throw new ObjectNotFoundException( ex.toString() ); - } - } - - - - public String getProperty( String name ) - throws ObjectNotFoundException - { - Logger.msg(5, "Get property "+name+" from syskey/"+mSystemKey); - Property prop = (Property)getObject("Property/"+name); - try - { - return prop.getValue(); - } - catch (NullPointerException ex) - { - throw new ObjectNotFoundException(); - } - } - - public String getName() - { - try { - return getProperty("Name"); - } catch (ObjectNotFoundException ex) { - return null; - } - } - - - /************************************************************************** - * Subscription methods - **************************************************************************/ - - public void subscribe (MemberSubscription newSub) { - - newSub.setSubject(this); - synchronized (this){ - mSubscriptions.put( newSub, newSub.getObserver() ); - } - new Thread(newSub).start(); - Logger.msg(7, "Subscribed "+newSub.getObserver().getClass().getName()+" for "+newSub.interest); - } - - public void unsubscribe(EntityProxyObserver observer) - { - synchronized (this){ - for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { - MemberSubscription thisSub = e.next(); - if (mSubscriptions.get( thisSub ) == observer) { - e.remove(); - Logger.msg(7, "Unsubscribed "+observer.getClass().getName()); - } - } - } - } - - public void dumpSubscriptions(int logLevel) { - if (mSubscriptions.size() == 0) return; - Logger.msg(logLevel, "Subscriptions to proxy "+mSystemKey+":"); - synchronized(this) { - for (MemberSubscription element : mSubscriptions.keySet()) { - EntityProxyObserver obs = element.getObserver(); - if (obs != null) - Logger.msg(logLevel, " "+element.getObserver().getClass().getName()+" subscribed to "+element.interest); - else - Logger.msg(logLevel, " Phantom subscription to "+element.interest); - } - } - } - - public void notify(ProxyMessage message) { - Logger.msg(4, "EntityProxy.notify() - Received change notification for "+message.getPath()+" on "+mSystemKey); - synchronized (this){ - if (!message.getServer().equals(EntityProxyManager.serverName)) - Gateway.getStorage().clearCache(mSystemKey, message.getPath()); - for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { - MemberSubscription newSub = e.next(); - if (newSub.getObserver() == null) { // phantom - Logger.msg(4, "Removing phantom subscription to "+newSub.interest); - e.remove(); - } - else - newSub.update(message.getPath(), message.getState()); - } - } - } - - /** - * If this is reaped, clear out the cache for it too. - */ - @Override - protected void finalize() throws Throwable { - Logger.msg(7, "Proxy "+mSystemKey+" reaped"); - Gateway.getStorage().clearCache(mSystemKey, null); - Gateway.getProxyManager().removeProxy(mSystemKey); - super.finalize(); - } - -} diff --git a/src/main/java/com/c2kernel/entity/proxy/EntityProxyManager.java b/src/main/java/com/c2kernel/entity/proxy/EntityProxyManager.java deleted file mode 100644 index c49e7f5..0000000 --- a/src/main/java/com/c2kernel/entity/proxy/EntityProxyManager.java +++ /dev/null @@ -1,339 +0,0 @@ -/************************************************************************** - * EntityProxyFactory.java - * - * $Revision: 1.45 $ - * $Date: 2005/05/10 11:40:09 $ - * - * Copyright (C) 2001 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - -package com.c2kernel.entity.proxy; - -import java.util.ArrayList; -import java.util.ConcurrentModificationException; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Iterator; - -import com.c2kernel.common.InvalidDataException; -import com.c2kernel.common.ObjectNotFoundException; -import com.c2kernel.lookup.AgentPath; -import com.c2kernel.lookup.DomainPath; -import com.c2kernel.lookup.Path; -import com.c2kernel.persistency.ClusterStorage; -import com.c2kernel.process.Gateway; -import com.c2kernel.property.Property; -import com.c2kernel.utils.Logger; -import com.c2kernel.utils.SoftCache; -import com.c2kernel.utils.server.SimpleTCPIPServer; - - -public class EntityProxyManager -{ - SoftCache proxyPool = new SoftCache(50); - HashMap treeSubscribers = new HashMap(); - HashMap connections = new HashMap(); - - // server objects - static ArrayList proxyClients = new ArrayList(); - static SimpleTCPIPServer proxyServer = null; - static String serverName = null; - - /** - * Create an entity proxy manager to listen for proxy events and reap unused proxies - */ - public EntityProxyManager() - { - Logger.msg(5, "EntityProxyManager - Starting....."); - - Enumeration servers = Gateway.getLDAPLookup().searchEntities(new DomainPath("/servers")); - while(servers.hasMoreElements()) { - Path thisServerPath = servers.nextElement(); - try { - int syskey = thisServerPath.getSysKey(); - String remoteServer = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/Name", null)).getValue(); - String portStr = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/ProxyPort", null)).getValue(); - int remotePort = Integer.parseInt(portStr); - connectToProxyServer(remoteServer, remotePort); - - } catch (Exception ex) { - Logger.error("Exception retrieving proxy server connection data for "+thisServerPath); - Logger.error(ex); - } - } - } - - public void connectToProxyServer(String name, int port) { - ProxyServerConnection oldConn = connections.get(name); - if (oldConn != null) - oldConn.shutdown(); - connections.put(name, new ProxyServerConnection(name, port, this)); - } - - - protected void resubscribe(ProxyServerConnection conn) { - synchronized (proxyPool) { - for (Integer key : proxyPool.keySet()) { - ProxyMessage sub = new ProxyMessage(key.intValue(), ProxyMessage.ADDPATH, false); - Logger.msg(5, "Subscribing to entity "+key); - conn.sendMessage(sub); - } - } - } - - /** - * @param sub - */ - private void sendMessage(ProxyMessage sub) { - for (ProxyServerConnection element : connections.values()) { - element.sendMessage(sub); - } - - } - - public void shutdown() { - Logger.msg("EntityProxyManager.shutdown() - flagging shutdown of server connections"); - for (ProxyServerConnection element : connections.values()) { - element.shutdown(); - } - } - - protected void processMessage(ProxyMessage thisMessage) throws InvalidDataException { - if (Logger.doLog(9)) Logger.msg(9, thisMessage.toString()); - - if (thisMessage.getPath().equals(ProxyMessage.PINGPATH)) // ping response - return; - - if (thisMessage.getSysKey() == ProxyMessage.NA) // must be domain path info - informTreeSubscribers(thisMessage.getState(), thisMessage.getPath()); - else { - // proper proxy message - Logger.msg(5, "Received proxy message: "+thisMessage.toString()); - Integer key = new Integer(thisMessage.getSysKey()); - EntityProxy relevant = proxyPool.get(key); - if (relevant == null) - Logger.warning("Received proxy message for sysKey "+thisMessage.getSysKey()+" which we don't have a proxy for."); - else - try { - relevant.notify(thisMessage); - } catch (Throwable ex) { - Logger.error("Error caught notifying proxy listener "+relevant.toString()+" of "+thisMessage.toString()); - Logger.error(ex); - } - } - } - - private void informTreeSubscribers(boolean state, String path) { - DomainPath last = new DomainPath(path); - DomainPath parent; boolean first = true; - synchronized(treeSubscribers) { - while((parent = last.getParent()) != null) { - ArrayList currentKeys = new ArrayList(); - currentKeys.addAll(treeSubscribers.keySet()); - for (DomainPathSubscriber sub : currentKeys) { - DomainPath interest = treeSubscribers.get(sub); - if (interest!= null && interest.equals(parent)) { - if (state == ProxyMessage.ADDED) - sub.pathAdded(last); - else if (first) - sub.pathRemoved(last); - } - } - last = parent; - first = false; - } - } - } - - public void subscribeTree(DomainPathSubscriber sub, DomainPath interest) { - synchronized(treeSubscribers) { - treeSubscribers.put(sub, interest); - } - } - - public void unsubscribeTree(DomainPathSubscriber sub) { - synchronized(treeSubscribers) { - treeSubscribers.remove(sub); - } - } - - /************************************************************************** - * - **************************************************************************/ - private EntityProxy createProxy( org.omg.CORBA.Object ior, - int systemKey, - boolean isItem ) - throws ObjectNotFoundException - { - - EntityProxy newProxy = null; - - Logger.msg(5, "EntityProxyFactory::creating proxy on entity " + systemKey); - - if( isItem ) - { - newProxy = new ItemProxy(ior, systemKey); - } - else - { - newProxy = new AgentProxy(ior, systemKey); - } - - // subscribe to changes from server - ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.ADDPATH, false); - sendMessage(sub); - reportCurrentProxies(9); - return ( newProxy ); - } - - protected void removeProxy( int systemKey ) - { - ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.DELPATH, true); - Logger.msg(5,"EntityProxyManager.removeProxy() - Unsubscribing to proxy informer for "+systemKey); - sendMessage(sub); - } - - - /************************************************************************** - * EntityProxy getProxy( ManageableEntity, SystemKey) - * - * Called by the other GetProxy methods. Fills in either the ior or the - * SystemKey - **************************************************************************/ - private EntityProxy getProxy( org.omg.CORBA.Object ior, - int systemKey, - boolean isItem ) - throws ObjectNotFoundException - { - Integer key = new Integer(systemKey); - - synchronized(proxyPool) { - EntityProxy newProxy; - // return it if it exists - newProxy = proxyPool.get(key); - if (newProxy == null) { - // create a new one - newProxy = createProxy(ior, systemKey, isItem ); - proxyPool.put(key, newProxy); - } - return newProxy; - - } - } - - /************************************************************************** - * EntityProxy getProxy( String ) - * - * Proxy from Alias - **************************************************************************/ - public EntityProxy getProxy( Path path ) - throws ObjectNotFoundException - { - - //convert namePath to dn format - Logger.msg(8,"EntityProxyFactory::getProxy(" + path.toString() + ")"); - boolean isItem = !(path.getEntity() instanceof AgentPath); - return getProxy( Gateway.getLDAPLookup().getIOR(path), - path.getSysKey(), - isItem ); - - } - - /************************************************************************** - * void reportCurrentProxies() - * - * A utility to Dump the current proxies loaded - **************************************************************************/ - public void reportCurrentProxies(int logLevel) - { - if (!Logger.doLog(logLevel)) return; - Logger.msg(logLevel, "Current proxies: "); - try { - synchronized(proxyPool) { - Iterator i = proxyPool.keySet().iterator(); - - for( int count=0; i.hasNext(); count++ ) - { - Integer nextProxy = i.next(); - EntityProxy thisProxy = proxyPool.get(nextProxy); - if (thisProxy != null) - Logger.msg(logLevel, - "" + count + ": " - + proxyPool.get(nextProxy).getClass().getName() - + ": " + nextProxy); - } - } - } catch (ConcurrentModificationException ex) { - Logger.msg(logLevel, "Proxy cache modified. Aborting."); - } - } - - - /************************************************************************** - * Static Proxy Server methods - **************************************************************************/ - - /** - * Initialises the Proxy event UDP server listening on 'Host.Proxy.port' from c2kprops - * @param c2kProps - */ - public static void initServer() - { - Logger.msg(5, "EntityProxyFactory::initServer - Starting....."); - int port = Gateway.getProperties().getInt("ItemServer.Proxy.port", 0); - serverName = Gateway.getProperties().getProperty("ItemServer.name"); - if (port == 0) { - Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of entity changes."); - return; - } - - // set up the proxy server - try { - Logger.msg(5, "EntityProxyFactory::initServer - Initialising proxy informer on port "+port); - proxyServer = new SimpleTCPIPServer(port, ProxyClientConnection.class, 200); - proxyServer.startListening(); - } catch (Exception ex) { - Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of entity changes."); - Logger.error(ex); - } - } - - public static void sendProxyEvent(ProxyMessage message) { - if (proxyServer != null && message.getPath() != null) - synchronized(proxyClients) { - for (ProxyClientConnection client : proxyClients) { - client.sendMessage(message); - } - } - } - - public static void reportConnections(int logLevel) { - synchronized(proxyClients) { - Logger.msg(logLevel, "Currently connected proxy clients:"); - for (ProxyClientConnection client : proxyClients) { - Logger.msg(logLevel, " "+client); - } - } - } - - public static void shutdownServer() { - if (proxyServer != null) { - Logger.msg(1, "EntityProxyManager: Closing Server."); - proxyServer.stopListening(); - } - } - - public static void registerProxyClient(ProxyClientConnection client) { - synchronized(proxyClients) { - proxyClients.add(client); - } - } - - public static void unRegisterProxyClient(ProxyClientConnection client) { - synchronized(proxyClients) { - proxyClients.remove(client); - } - } -} - diff --git a/src/main/java/com/c2kernel/entity/proxy/EntityProxyObserver.java b/src/main/java/com/c2kernel/entity/proxy/EntityProxyObserver.java deleted file mode 100644 index 3ddb99c..0000000 --- a/src/main/java/com/c2kernel/entity/proxy/EntityProxyObserver.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.c2kernel.entity.proxy; - -import com.c2kernel.entity.C2KLocalObject; - - - -public interface EntityProxyObserver -{ - /************************************************************************** - * Subscribed items are broken apart and fed one by one to these methods. - * Replacement after an event is done by feeding the new memberbase with the same id. - * ID could be an XPath? - **************************************************************************/ - public void add(V contents); - - /************************************************************************** - * the 'type' parameter should be an indication of the type of object - * supplied so that the subscriber can associate the call back with - * one of its subscriptions. If we go with an Xpath subscription form, - * then the id will probably be sufficient. - * Should be comparable (substring whatever) with the parameter given to - * the subscribe method of ItemProxy. - **************************************************************************/ - public void remove(String id); - - public void control(String control, String msg); -} diff --git a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java index 0e6859d..355acd8 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java +++ b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java @@ -10,25 +10,40 @@ package com.c2kernel.entity.proxy; +import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; + +import org.exolab.castor.mapping.MappingException; +import org.exolab.castor.xml.MarshalException; +import org.exolab.castor.xml.ValidationException; import com.c2kernel.collection.Collection; -import com.c2kernel.collection.CollectionMember; +import com.c2kernel.collection.CollectionArrayList; import com.c2kernel.common.AccessRightsException; import com.c2kernel.common.InvalidDataException; import com.c2kernel.common.InvalidTransitionException; import com.c2kernel.common.ObjectAlreadyExistsException; import com.c2kernel.common.ObjectNotFoundException; import com.c2kernel.common.PersistencyException; +import com.c2kernel.entity.C2KLocalObject; import com.c2kernel.entity.Item; import com.c2kernel.entity.ItemHelper; -import com.c2kernel.entity.ManageableEntity; import com.c2kernel.entity.agent.Job; import com.c2kernel.entity.agent.JobArrayList; +import com.c2kernel.lifecycle.instance.CompositeActivity; import com.c2kernel.lifecycle.instance.Workflow; +import com.c2kernel.lookup.InvalidItemPathException; +import com.c2kernel.lookup.ItemPath; +import com.c2kernel.lookup.Path; import com.c2kernel.persistency.ClusterStorage; +import com.c2kernel.persistency.ClusterStorageException; import com.c2kernel.persistency.outcome.Viewpoint; import com.c2kernel.process.Gateway; +import com.c2kernel.property.Property; +import com.c2kernel.property.PropertyArrayList; +import com.c2kernel.utils.CastorXMLUtility; import com.c2kernel.utils.Logger; /****************************************************************************** @@ -38,43 +53,86 @@ import com.c2kernel.utils.Logger; * @version $Revision: 1.25 $ $Date: 2005/05/10 11:40:09 $ * @author $Author: abranson $ ******************************************************************************/ -public class ItemProxy extends EntityProxy +public class ItemProxy { + protected Item mItem = null; + protected org.omg.CORBA.Object mIOR; + protected int mSystemKey; + protected Path mPath; + private final HashMap, ProxyObserver> + mSubscriptions; + /************************************************************************** - * + * **************************************************************************/ protected ItemProxy( org.omg.CORBA.Object ior, int systemKey) throws ObjectNotFoundException { - super(ior, systemKey); + Logger.msg(8, "ItemProxy::initialise() - Initialising entity " +systemKey); + + mIOR = ior; + mSystemKey = systemKey; + mSubscriptions = new HashMap, ProxyObserver>(); + try { + mPath = new ItemPath(systemKey); + } catch (InvalidItemPathException e) { + throw new ObjectNotFoundException(); + } } + + public int getSystemKey() + { + return mSystemKey; + } + + public Path getPath() { + return mPath; + } + + protected Item getItem() throws ObjectNotFoundException { + if (mItem == null) + mItem = narrow(); + return mItem; + } - @Override - public ManageableEntity narrow() throws ObjectNotFoundException + public Item narrow() throws ObjectNotFoundException { try { return ItemHelper.narrow(mIOR); } catch (org.omg.CORBA.BAD_PARAM ex) { } throw new ObjectNotFoundException("CORBA Object was not an Item, or the server is down."); } - /************************************************************************** + /** + * @throws MappingException + * @throws IOException + * @throws ValidationException + * @throws MarshalException ************************************************************************ * * **************************************************************************/ - public void initialise( int agentId, - String itemProps, - String workflow ) + public void initialise( int agentId, + PropertyArrayList itemProps, + CompositeActivity workflow, + CollectionArrayList colls + ) throws AccessRightsException, InvalidDataException, PersistencyException, - ObjectNotFoundException + ObjectNotFoundException, MarshalException, ValidationException, IOException, MappingException { Logger.msg(7, "ItemProxy::initialise - started"); - - ((Item)getEntity()).initialise( agentId, itemProps, workflow ); + CastorXMLUtility xml = Gateway.getMarshaller(); + if (itemProps == null) throw new InvalidDataException("No initial properties supplied"); + String propString = xml.marshall(itemProps); + String wfString = ""; + if (workflow != null) wfString = xml.marshall(workflow); + String collString = ""; + if (colls != null) collString = xml.marshall(colls); + + getItem().initialise( agentId, propString, wfString, collString); } public void setProperty(AgentProxy agent, String name, String value) @@ -100,7 +158,7 @@ public class ItemProxy extends EntityProxy /************************************************************************** * **************************************************************************/ - protected void requestAction( Job thisJob ) + public void requestAction( Job thisJob ) throws AccessRightsException, InvalidTransitionException, ObjectNotFoundException, @@ -120,44 +178,10 @@ public class ItemProxy extends EntityProxy throw new InvalidDataException("No Agent specified.", ""); Logger.msg(7, "ItemProxy - executing "+thisJob.getStepPath()+" for "+thisJob.getAgentName()); - requestAction (thisJob.getAgentId(), thisJob.getStepPath(), + getItem().requestAction (thisJob.getAgentId(), thisJob.getStepPath(), thisJob.getTransition().getId(), outcome); } - //requestData is xmlString - public void requestAction( int agentId, - String stepPath, - int transitionID, - String requestData - ) - throws AccessRightsException, - InvalidTransitionException, - ObjectNotFoundException, - InvalidDataException, - PersistencyException, - ObjectAlreadyExistsException - { - ((Item)getEntity()).requestAction( agentId, - stepPath, - transitionID, - requestData ); - } - - /************************************************************************** - * - **************************************************************************/ - public String queryLifeCycle( int agentId, - boolean filter - ) - throws AccessRightsException, - ObjectNotFoundException, - PersistencyException - { - return ((Item)getEntity()).queryLifeCycle( agentId, - filter ); - } - - /************************************************************************** * **************************************************************************/ @@ -168,7 +192,7 @@ public class ItemProxy extends EntityProxy { JobArrayList thisJobList; try { - String jobs = queryLifeCycle(agentId, filter); + String jobs = getItem().queryLifeCycle(agentId, filter); thisJobList = (JobArrayList)Gateway.getMarshaller().unmarshall(jobs); } catch (Exception e) { @@ -208,8 +232,8 @@ public class ItemProxy extends EntityProxy } - public Collection getCollection(String collName) throws ObjectNotFoundException { - return (Collection)getObject(ClusterStorage.COLLECTION+"/"+collName); + public Collection getCollection(String collName) throws ObjectNotFoundException { + return (Collection)getObject(ClusterStorage.COLLECTION+"/"+collName); } public Workflow getWorkflow() throws ObjectNotFoundException { @@ -226,4 +250,159 @@ public class ItemProxy extends EntityProxy PersistencyException { return getJobByName(actName, agent.getSystemKey()); } + + /** + * If this is reaped, clear out the cache for it too. + */ + @Override + protected void finalize() throws Throwable { + Logger.msg(7, "Proxy "+mSystemKey+" reaped"); + Gateway.getStorage().clearCache(mSystemKey, null); + Gateway.getProxyManager().removeProxy(mSystemKey); + super.finalize(); + } + + /************************************************************************** + * + **************************************************************************/ + public String queryData( String path ) + throws ObjectNotFoundException + { + + try { + Logger.msg(7, "EntityProxy.queryData() - "+mSystemKey+"/"+path); + if (path.endsWith("all")) { + Logger.msg(7, "EntityProxy.queryData() - listing contents"); + String[] result = Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length()-3)); + StringBuffer retString = new StringBuffer(); + for (int i = 0; i < result.length; i++) { + retString.append(result[i]); + if (i"+e.getMessage()+""; + } + } + + public String[] getContents( String path ) throws ObjectNotFoundException { + try { + return Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length())); + } catch (ClusterStorageException e) { + throw new ObjectNotFoundException(e.toString()); + } + } + + + /************************************************************************** + * + **************************************************************************/ + public C2KLocalObject getObject( String xpath ) + throws ObjectNotFoundException + { + // load from storage, falling back to proxy loader if not found in others + try + { + return Gateway.getStorage().get( mSystemKey, xpath , null); + } + catch( ClusterStorageException ex ) + { + Logger.msg(4, "Exception loading object :"+mSystemKey+"/"+xpath); + throw new ObjectNotFoundException( ex.toString() ); + } + } + + + + public String getProperty( String name ) + throws ObjectNotFoundException + { + Logger.msg(5, "Get property "+name+" from syskey/"+mSystemKey); + Property prop = (Property)getObject("Property/"+name); + try + { + return prop.getValue(); + } + catch (NullPointerException ex) + { + throw new ObjectNotFoundException(); + } + } + + public String getName() + { + try { + return getProperty("Name"); + } catch (ObjectNotFoundException ex) { + return null; + } + } + + + + + /************************************************************************** + * Subscription methods + **************************************************************************/ + + public void subscribe(MemberSubscription newSub) { + + newSub.setSubject(this); + synchronized (this){ + mSubscriptions.put( newSub, newSub.getObserver() ); + } + new Thread(newSub).start(); + Logger.msg(7, "Subscribed "+newSub.getObserver().getClass().getName()+" for "+newSub.interest); + } + + public void unsubscribe(ProxyObserver observer) + { + synchronized (this){ + for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { + MemberSubscription thisSub = e.next(); + if (mSubscriptions.get( thisSub ) == observer) { + e.remove(); + Logger.msg(7, "Unsubscribed "+observer.getClass().getName()); + } + } + } + } + + public void dumpSubscriptions(int logLevel) { + if (mSubscriptions.size() == 0) return; + Logger.msg(logLevel, "Subscriptions to proxy "+mSystemKey+":"); + synchronized(this) { + for (MemberSubscription element : mSubscriptions.keySet()) { + ProxyObserver obs = element.getObserver(); + if (obs != null) + Logger.msg(logLevel, " "+element.getObserver().getClass().getName()+" subscribed to "+element.interest); + else + Logger.msg(logLevel, " Phantom subscription to "+element.interest); + } + } + } + + public void notify(ProxyMessage message) { + Logger.msg(4, "EntityProxy.notify() - Received change notification for "+message.getPath()+" on "+mSystemKey); + synchronized (this){ + if (!message.getServer().equals(ProxyManager.serverName)) + Gateway.getStorage().clearCache(mSystemKey, message.getPath()); + for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { + MemberSubscription newSub = e.next(); + if (newSub.getObserver() == null) { // phantom + Logger.msg(4, "Removing phantom subscription to "+newSub.interest); + e.remove(); + } + else + newSub.update(message.getPath(), message.getState()); + } + } + } } diff --git a/src/main/java/com/c2kernel/entity/proxy/MemberSubscription.java b/src/main/java/com/c2kernel/entity/proxy/MemberSubscription.java index 1de18f8..01994e4 100644 --- a/src/main/java/com/c2kernel/entity/proxy/MemberSubscription.java +++ b/src/main/java/com/c2kernel/entity/proxy/MemberSubscription.java @@ -12,14 +12,14 @@ public class MemberSubscription implements Runnable { public static final String ERROR = "Error"; public static final String END = "theEND"; - EntityProxy subject; + ItemProxy subject; String interest; // keep the subscriber by weak reference, so it is not kept from the garbage collector if no longer used - WeakReference> observerReference; + WeakReference> observerReference; ArrayList contents = new ArrayList(); boolean preLoad; - public MemberSubscription(EntityProxyObserver observer, String interest, boolean preLoad) { + public MemberSubscription(ProxyObserver observer, String interest, boolean preLoad) { setObserver(observer); this.interest = interest; this.preLoad = preLoad; @@ -33,7 +33,7 @@ public class MemberSubscription implements Runnable { private void loadChildren() { C newMember; - EntityProxyObserver observer = getObserver(); + ProxyObserver observer = getObserver(); if (observer == null) return; //reaped try { // fetch contents of path @@ -77,7 +77,7 @@ public class MemberSubscription implements Runnable { } public void update(String path, boolean deleted) { - EntityProxyObserver observer = getObserver(); + ProxyObserver observer = getObserver(); if (observer == null) return; //reaped Logger.msg(7, "Processing proxy message path "+path +" for "+observer+". Interest: "+interest+" Was Deleted:"+deleted); if (!path.startsWith(interest)) // doesn't concern us @@ -106,15 +106,15 @@ public class MemberSubscription implements Runnable { } } - public void setObserver(EntityProxyObserver observer) { - observerReference = new WeakReference>(observer); + public void setObserver(ProxyObserver observer) { + observerReference = new WeakReference>(observer); } - public void setSubject(EntityProxy subject) { + public void setSubject(ItemProxy subject) { this.subject = subject; } - public EntityProxyObserver getObserver() { + public ProxyObserver getObserver() { return observerReference.get(); } } diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java index 9687f22..5abdb16 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java @@ -36,7 +36,7 @@ public class ProxyClientConnection implements SocketHandler { public ProxyClientConnection() { super(); thisClientId = ++clientId; - EntityProxyManager.registerProxyClient(this); + ProxyManager.registerProxyClient(this); Logger.msg(1, "Proxy Client Connection Handler "+thisClientId+" ready."); } diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java b/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java new file mode 100644 index 0000000..d19e38f --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java @@ -0,0 +1,337 @@ +/************************************************************************** + * ProxyManager.java + * + * $Revision: 1.45 $ + * $Date: 2005/05/10 11:40:09 $ + * + * Copyright (C) 2001 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +package com.c2kernel.entity.proxy; + +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Iterator; + +import com.c2kernel.common.InvalidDataException; +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.lookup.AgentPath; +import com.c2kernel.lookup.DomainPath; +import com.c2kernel.lookup.Path; +import com.c2kernel.persistency.ClusterStorage; +import com.c2kernel.process.Gateway; +import com.c2kernel.property.Property; +import com.c2kernel.utils.Logger; +import com.c2kernel.utils.SoftCache; +import com.c2kernel.utils.server.SimpleTCPIPServer; + + +public class ProxyManager +{ + SoftCache proxyPool = new SoftCache(50); + HashMap treeSubscribers = new HashMap(); + HashMap connections = new HashMap(); + + // server objects + static ArrayList proxyClients = new ArrayList(); + static SimpleTCPIPServer proxyServer = null; + static String serverName = null; + + /** + * Create a proxy manager to listen for proxy events and reap unused proxies + */ + public ProxyManager() + { + Logger.msg(5, "ProxyManager - Starting....."); + + Enumeration servers = Gateway.getLDAPLookup().searchEntities(new DomainPath("/servers")); + while(servers.hasMoreElements()) { + Path thisServerPath = servers.nextElement(); + try { + int syskey = thisServerPath.getSysKey(); + String remoteServer = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/Name", null)).getValue(); + String portStr = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/ProxyPort", null)).getValue(); + int remotePort = Integer.parseInt(portStr); + connectToProxyServer(remoteServer, remotePort); + + } catch (Exception ex) { + Logger.error("Exception retrieving proxy server connection data for "+thisServerPath); + Logger.error(ex); + } + } + } + + public void connectToProxyServer(String name, int port) { + ProxyServerConnection oldConn = connections.get(name); + if (oldConn != null) + oldConn.shutdown(); + connections.put(name, new ProxyServerConnection(name, port, this)); + } + + + protected void resubscribe(ProxyServerConnection conn) { + synchronized (proxyPool) { + for (Integer key : proxyPool.keySet()) { + ProxyMessage sub = new ProxyMessage(key.intValue(), ProxyMessage.ADDPATH, false); + Logger.msg(5, "Subscribing to item "+key); + conn.sendMessage(sub); + } + } + } + + /** + * @param sub + */ + private void sendMessage(ProxyMessage sub) { + for (ProxyServerConnection element : connections.values()) { + element.sendMessage(sub); + } + + } + + public void shutdown() { + Logger.msg("ProxyManager.shutdown() - flagging shutdown of server connections"); + for (ProxyServerConnection element : connections.values()) { + element.shutdown(); + } + } + + protected void processMessage(ProxyMessage thisMessage) throws InvalidDataException { + if (Logger.doLog(9)) Logger.msg(9, thisMessage.toString()); + + if (thisMessage.getPath().equals(ProxyMessage.PINGPATH)) // ping response + return; + + if (thisMessage.getSysKey() == ProxyMessage.NA) // must be domain path info + informTreeSubscribers(thisMessage.getState(), thisMessage.getPath()); + else { + // proper proxy message + Logger.msg(5, "Received proxy message: "+thisMessage.toString()); + Integer key = new Integer(thisMessage.getSysKey()); + ItemProxy relevant = proxyPool.get(key); + if (relevant == null) + Logger.warning("Received proxy message for sysKey "+thisMessage.getSysKey()+" which we don't have a proxy for."); + else + try { + relevant.notify(thisMessage); + } catch (Throwable ex) { + Logger.error("Error caught notifying proxy listener "+relevant.toString()+" of "+thisMessage.toString()); + Logger.error(ex); + } + } + } + + private void informTreeSubscribers(boolean state, String path) { + DomainPath last = new DomainPath(path); + DomainPath parent; boolean first = true; + synchronized(treeSubscribers) { + while((parent = last.getParent()) != null) { + ArrayList currentKeys = new ArrayList(); + currentKeys.addAll(treeSubscribers.keySet()); + for (DomainPathSubscriber sub : currentKeys) { + DomainPath interest = treeSubscribers.get(sub); + if (interest!= null && interest.equals(parent)) { + if (state == ProxyMessage.ADDED) + sub.pathAdded(last); + else if (first) + sub.pathRemoved(last); + } + } + last = parent; + first = false; + } + } + } + + public void subscribeTree(DomainPathSubscriber sub, DomainPath interest) { + synchronized(treeSubscribers) { + treeSubscribers.put(sub, interest); + } + } + + public void unsubscribeTree(DomainPathSubscriber sub) { + synchronized(treeSubscribers) { + treeSubscribers.remove(sub); + } + } + + /************************************************************************** + * + **************************************************************************/ + private ItemProxy createProxy( org.omg.CORBA.Object ior, + int systemKey, + boolean isItem ) + throws ObjectNotFoundException + { + + ItemProxy newProxy = null; + + Logger.msg(5, "ProxyManager::creating proxy on Item " + systemKey); + + if( isItem ) + { + newProxy = new ItemProxy(ior, systemKey); + } + else + { + newProxy = new AgentProxy(ior, systemKey); + } + + // subscribe to changes from server + ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.ADDPATH, false); + sendMessage(sub); + reportCurrentProxies(9); + return ( newProxy ); + } + + protected void removeProxy( int systemKey ) + { + ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.DELPATH, true); + Logger.msg(5,"ProxyManager.removeProxy() - Unsubscribing to proxy informer for "+systemKey); + sendMessage(sub); + } + + + /************************************************************************** + * Called by the other GetProxy methods. Fills in either the ior or the + * SystemKey + **************************************************************************/ + private ItemProxy getProxy( org.omg.CORBA.Object ior, + int systemKey, + boolean isItem ) + throws ObjectNotFoundException + { + Integer key = new Integer(systemKey); + + synchronized(proxyPool) { + ItemProxy newProxy; + // return it if it exists + newProxy = proxyPool.get(key); + if (newProxy == null) { + // create a new one + newProxy = createProxy(ior, systemKey, isItem ); + proxyPool.put(key, newProxy); + } + return newProxy; + + } + } + + /************************************************************************** + * ItemProxy getProxy( String ) + * + * Proxy from Alias + **************************************************************************/ + public ItemProxy getProxy( Path path ) + throws ObjectNotFoundException + { + + //convert namePath to dn format + Logger.msg(8,"ProxyManager::getProxy(" + path.toString() + ")"); + boolean isItem = !(path.getEntity() instanceof AgentPath); + return getProxy( Gateway.getLDAPLookup().getIOR(path), + path.getSysKey(), + isItem ); + + } + + /************************************************************************** + * void reportCurrentProxies() + * + * A utility to Dump the current proxies loaded + **************************************************************************/ + public void reportCurrentProxies(int logLevel) + { + if (!Logger.doLog(logLevel)) return; + Logger.msg(logLevel, "Current proxies: "); + try { + synchronized(proxyPool) { + Iterator i = proxyPool.keySet().iterator(); + + for( int count=0; i.hasNext(); count++ ) + { + Integer nextProxy = i.next(); + ItemProxy thisProxy = proxyPool.get(nextProxy); + if (thisProxy != null) + Logger.msg(logLevel, + "" + count + ": " + + proxyPool.get(nextProxy).getClass().getName() + + ": " + nextProxy); + } + } + } catch (ConcurrentModificationException ex) { + Logger.msg(logLevel, "Proxy cache modified. Aborting."); + } + } + + + /************************************************************************** + * Static Proxy Server methods + **************************************************************************/ + + /** + * Initialises the Proxy event UDP server listening on 'Host.Proxy.port' from c2kprops + * @param c2kProps + */ + public static void initServer() + { + Logger.msg(5, "ProxyManager::initServer - Starting....."); + int port = Gateway.getProperties().getInt("ItemServer.Proxy.port", 0); + serverName = Gateway.getProperties().getProperty("ItemServer.name"); + if (port == 0) { + Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of changes."); + return; + } + + // set up the proxy server + try { + Logger.msg(5, "ProxyManager::initServer - Initialising proxy informer on port "+port); + proxyServer = new SimpleTCPIPServer(port, ProxyClientConnection.class, 200); + proxyServer.startListening(); + } catch (Exception ex) { + Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of changes."); + Logger.error(ex); + } + } + + public static void sendProxyEvent(ProxyMessage message) { + if (proxyServer != null && message.getPath() != null) + synchronized(proxyClients) { + for (ProxyClientConnection client : proxyClients) { + client.sendMessage(message); + } + } + } + + public static void reportConnections(int logLevel) { + synchronized(proxyClients) { + Logger.msg(logLevel, "Currently connected proxy clients:"); + for (ProxyClientConnection client : proxyClients) { + Logger.msg(logLevel, " "+client); + } + } + } + + public static void shutdownServer() { + if (proxyServer != null) { + Logger.msg(1, "ProxyManager: Closing Server."); + proxyServer.stopListening(); + } + } + + public static void registerProxyClient(ProxyClientConnection client) { + synchronized(proxyClients) { + proxyClients.add(client); + } + } + + public static void unRegisterProxyClient(ProxyClientConnection client) { + synchronized(proxyClients) { + proxyClients.remove(client); + } + } +} + diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyObserver.java b/src/main/java/com/c2kernel/entity/proxy/ProxyObserver.java new file mode 100644 index 0000000..b15a972 --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyObserver.java @@ -0,0 +1,27 @@ +package com.c2kernel.entity.proxy; + +import com.c2kernel.entity.C2KLocalObject; + + + +public interface ProxyObserver +{ + /************************************************************************** + * Subscribed items are broken apart and fed one by one to these methods. + * Replacement after an event is done by feeding the new memberbase with the same id. + * ID could be an XPath? + **************************************************************************/ + public void add(V contents); + + /************************************************************************** + * the 'type' parameter should be an indication of the type of object + * supplied so that the subscriber can associate the call back with + * one of its subscriptions. If we go with an Xpath subscription form, + * then the id will probably be sufficient. + * Should be comparable (substring whatever) with the parameter given to + * the subscribe method of ItemProxy. + **************************************************************************/ + public void remove(String id); + + public void control(String control, String msg); +} diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyServerConnection.java b/src/main/java/com/c2kernel/entity/proxy/ProxyServerConnection.java index 6807953..54ca787 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ProxyServerConnection.java +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyServerConnection.java @@ -29,7 +29,7 @@ public class ProxyServerConnection extends Thread String serverName; int serverPort; Socket serverConnection; - EntityProxyManager manager; + ProxyManager manager; // for talking to the proxy server PrintWriter serverStream; boolean listening = false; @@ -38,7 +38,7 @@ public class ProxyServerConnection extends Thread /** * Create an entity proxy manager to listen for proxy events and reap unused proxies */ - public ProxyServerConnection(String host, int port, EntityProxyManager manager) + public ProxyServerConnection(String host, int port, ProxyManager manager) { Logger.msg(5, "ProxyServerConnection - Initialising connection to "+host+":"+port); serverName = host; -- cgit v1.2.3