diff options
Diffstat (limited to 'src/main/java/com/c2kernel/entity/proxy')
| -rw-r--r-- | src/main/java/com/c2kernel/entity/proxy/AgentProxy.java | 63 | ||||
| -rw-r--r-- | src/main/java/com/c2kernel/entity/proxy/EntityProxy.java | 247 | ||||
| -rw-r--r-- | src/main/java/com/c2kernel/entity/proxy/ItemProxy.java | 285 | ||||
| -rw-r--r-- | src/main/java/com/c2kernel/entity/proxy/MemberSubscription.java | 18 | ||||
| -rw-r--r-- | src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java | 2 | ||||
| -rw-r--r-- | src/main/java/com/c2kernel/entity/proxy/ProxyManager.java (renamed from src/main/java/com/c2kernel/entity/proxy/EntityProxyManager.java) | 50 | ||||
| -rw-r--r-- | src/main/java/com/c2kernel/entity/proxy/ProxyObserver.java (renamed from src/main/java/com/c2kernel/entity/proxy/EntityProxyObserver.java) | 2 | ||||
| -rw-r--r-- | src/main/java/com/c2kernel/entity/proxy/ProxyServerConnection.java | 4 |
8 files changed, 294 insertions, 377 deletions
diff --git a/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java b/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java index f76af10..29550d4 100644 --- a/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java +++ b/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java @@ -22,13 +22,12 @@ import com.c2kernel.common.PersistencyException; import com.c2kernel.entity.Agent;
import com.c2kernel.entity.AgentHelper;
import com.c2kernel.entity.C2KLocalObject;
-import com.c2kernel.entity.ManageableEntity;
import com.c2kernel.entity.agent.Job;
import com.c2kernel.lifecycle.instance.predefined.PredefinedStep;
import com.c2kernel.lookup.AgentPath;
import com.c2kernel.lookup.DomainPath;
-import com.c2kernel.lookup.EntityPath;
-import com.c2kernel.lookup.InvalidEntityPathException;
+import com.c2kernel.lookup.InvalidItemPathException;
+import com.c2kernel.lookup.ItemPath;
import com.c2kernel.lookup.Path;
import com.c2kernel.persistency.outcome.OutcomeValidator;
import com.c2kernel.persistency.outcome.Schema;
@@ -47,51 +46,34 @@ import com.c2kernel.utils.Logger; * @version $Revision: 1.37 $ $Date: 2005/10/05 07:39:36 $
* @author $Author: abranson $
******************************************************************************/
-public class AgentProxy extends EntityProxy
+public class AgentProxy extends ItemProxy
{
- AgentPath path;
+ AgentPath agentPath;
/**************************************************************************
* Creates an AgentProxy without cache and change notification
**************************************************************************/
- public AgentProxy( org.omg.CORBA.Object ior,
+ protected AgentProxy( org.omg.CORBA.Object ior,
int systemKey)
throws ObjectNotFoundException
{
- super(ior, systemKey);
- try {
- path = new AgentPath(systemKey);
- } catch (InvalidEntityPathException e) {
+ super(ior, systemKey);
+ try {
+ agentPath = new AgentPath(systemKey);
+ mPath = agentPath;
+ } catch (InvalidItemPathException e) {
throw new ObjectNotFoundException();
}
}
@Override
- public ManageableEntity narrow() throws ObjectNotFoundException
+ public Agent narrow() throws ObjectNotFoundException
{
try {
return AgentHelper.narrow(mIOR);
} catch (org.omg.CORBA.BAD_PARAM ex) { }
throw new ObjectNotFoundException("CORBA Object was not an Agent, or the server is down.");
}
- /**************************************************************************
- *
- *
- **************************************************************************/
- public void initialise( String agentProps, String collector )
- throws AccessRightsException,
- InvalidDataException,
- PersistencyException,
- ObjectNotFoundException
- {
- Logger.msg(7, "AgentProxy::initialise - started");
-
- ((Agent)getEntity()).initialise( agentProps );
- }
-
- public AgentPath getPath() {
- return path;
- }
/**
* Executes a job on the given item using this agent.
@@ -111,7 +93,7 @@ public class AgentProxy extends EntityProxy {
OutcomeValidator validator = null;
Date startTime = new Date();
- Logger.msg(3, "AgentProxy - executing "+job.getStepPath()+" for "+path.getAgentName());
+ Logger.msg(3, "AgentProxy - executing "+job.getStepPath()+" for "+agentPath.getAgentName());
// get the outcome validator if present
if (job.hasOutcome())
{
@@ -206,9 +188,9 @@ public class AgentProxy extends EntityProxy ScriptErrorException
{
try {
- ItemProxy targetItem = (ItemProxy)Gateway.getProxyManager().getProxy(new EntityPath(job.getItemSysKey()));
+ ItemProxy targetItem = Gateway.getProxyManager().getProxy(new ItemPath(job.getItemSysKey()));
execute(targetItem, job);
- } catch (InvalidEntityPathException e) {
+ } catch (InvalidItemPathException e) {
throw new ObjectNotFoundException("Job contained invalid item sysKey: "+job.getItemSysKey(), "");
}
}
@@ -239,7 +221,7 @@ public class AgentProxy extends EntityProxy PersistencyException,
ObjectAlreadyExistsException
{
- item.requestAction(getSystemKey(), "workflow/predefined/"+predefStep, PredefinedStep.DONE, PredefinedStep.bundleData(params));
+ item.getItem().requestAction(getSystemKey(), "workflow/predefined/"+predefStep, PredefinedStep.DONE, PredefinedStep.bundleData(params));
}
/** Wrappers for scripts */
@@ -266,18 +248,23 @@ public class AgentProxy extends EntityProxy returnPath = nextMatch;
}
- return (ItemProxy)Gateway.getProxyManager().getProxy(returnPath);
+ return Gateway.getProxyManager().getProxy(returnPath);
}
public ItemProxy getItem(String itemPath) throws ObjectNotFoundException {
return (getItem(new DomainPath(itemPath)));
}
- public ItemProxy getItem(Path itemPath) throws ObjectNotFoundException {
- return (ItemProxy)Gateway.getProxyManager().getProxy(itemPath);
+ @Override
+ public AgentPath getPath() {
+ return agentPath;
+ }
+
+ public ItemProxy getItem(Path itemPath) throws ObjectNotFoundException {
+ return Gateway.getProxyManager().getProxy(itemPath);
}
- public ItemProxy getItemBySysKey(int sysKey) throws ObjectNotFoundException, InvalidEntityPathException {
- return (ItemProxy)Gateway.getProxyManager().getProxy(new EntityPath(sysKey));
+ public ItemProxy getItemBySysKey(int sysKey) throws ObjectNotFoundException, InvalidItemPathException {
+ return Gateway.getProxyManager().getProxy(new ItemPath(sysKey));
}
}
diff --git a/src/main/java/com/c2kernel/entity/proxy/EntityProxy.java b/src/main/java/com/c2kernel/entity/proxy/EntityProxy.java deleted file mode 100644 index cb76a19..0000000 --- a/src/main/java/com/c2kernel/entity/proxy/EntityProxy.java +++ /dev/null @@ -1,247 +0,0 @@ -/**************************************************************************
- * EntityProxy.java
- *
- * $Revision: 1.35 $
- * $Date: 2005/05/10 11:40:09 $
- *
- * Copyright (C) 2001 CERN - European Organization for Nuclear Research
- * All rights reserved.
- **************************************************************************/
-
-package com.c2kernel.entity.proxy;
-
-import java.util.HashMap;
-import java.util.Iterator;
-
-import com.c2kernel.common.ObjectNotFoundException;
-import com.c2kernel.entity.C2KLocalObject;
-import com.c2kernel.entity.ManageableEntity;
-import com.c2kernel.persistency.ClusterStorageException;
-import com.c2kernel.process.Gateway;
-import com.c2kernel.property.Property;
-import com.c2kernel.utils.Logger;
-
-
-/******************************************************************************
-* It is a wrapper for the connection and communication with Entities.
-* It can cache data loaded from the Entity to reduce communication with it.
-* This cache is syncronised with corresponding Entity through an event mechanism.
-*
-* @version $Revision: 1.35 $ $Date: 2005/05/10 11:40:09 $
-* @author $Author: abranson $
-******************************************************************************/
-
-abstract public class EntityProxy implements ManageableEntity
-{
-
- protected ManageableEntity mEntity = null;
- protected org.omg.CORBA.Object mIOR;
- protected int mSystemKey;
- private HashMap<MemberSubscription<?>, EntityProxyObserver<?>> mSubscriptions;
-
- /**************************************************************************
- *
- **************************************************************************/
- protected EntityProxy( org.omg.CORBA.Object ior,
- int systemKey)
- throws ObjectNotFoundException
- {
- Logger.msg(8,"EntityProxy::EntityProxy() - Initialising '" +systemKey+ "' entity");
-
- initialise( ior, systemKey);
- }
-
- /**************************************************************************
- *
- **************************************************************************/
- private void initialise( org.omg.CORBA.Object ior,
- int systemKey)
- throws ObjectNotFoundException
- {
- Logger.msg(8, "EntityProxy::initialise() - Initialising '" +systemKey+ "' entity");
-
- mIOR = ior;
- mSystemKey = systemKey;
- mSubscriptions = new HashMap<MemberSubscription<?>, EntityProxyObserver<?>>();
- }
-
-
- /**************************************************************************
- *
- **************************************************************************/
- public ManageableEntity getEntity() throws ObjectNotFoundException
- {
- if (mEntity == null) {
- mEntity = narrow();
- }
- return mEntity;
- }
-
- abstract public ManageableEntity narrow() throws ObjectNotFoundException;
-
- /**************************************************************************
- *
- **************************************************************************/
- //check who is using.. and if toString() is sufficient
- @Override
- public int getSystemKey()
- {
- return mSystemKey;
- }
-
-
- /**************************************************************************
- *
- **************************************************************************/
- @Override
- public String queryData( String path )
- throws ObjectNotFoundException
- {
-
- try {
- Logger.msg(7, "EntityProxy.queryData() - "+mSystemKey+"/"+path);
- if (path.endsWith("all")) {
- Logger.msg(7, "EntityProxy.queryData() - listing contents");
- String[] result = Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length()-3));
- StringBuffer retString = new StringBuffer();
- for (int i = 0; i < result.length; i++) {
- retString.append(result[i]);
- if (i<result.length-1) retString.append(",");
- }
- Logger.msg(7, "EntityProxy.queryData() - "+retString.toString());
- return retString.toString();
- }
- C2KLocalObject target = Gateway.getStorage().get(mSystemKey, path, null);
- return Gateway.getMarshaller().marshall(target);
- } catch (ObjectNotFoundException e) {
- throw e;
- } catch (Exception e) {
- Logger.error(e);
- return "<ERROR>"+e.getMessage()+"</ERROR>";
- }
- }
-
- public String[] getContents( String path ) throws ObjectNotFoundException {
- try {
- return Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length()));
- } catch (ClusterStorageException e) {
- throw new ObjectNotFoundException(e.toString());
- }
- }
-
-
- /**************************************************************************
- *
- **************************************************************************/
- public C2KLocalObject getObject( String xpath )
- throws ObjectNotFoundException
- {
- // load from storage, falling back to proxy loader if not found in others
- try
- {
- return Gateway.getStorage().get( mSystemKey, xpath , null);
- }
- catch( ClusterStorageException ex )
- {
- Logger.msg(4, "Exception loading object :"+mSystemKey+"/"+xpath);
- throw new ObjectNotFoundException( ex.toString() );
- }
- }
-
-
-
- public String getProperty( String name )
- throws ObjectNotFoundException
- {
- Logger.msg(5, "Get property "+name+" from syskey/"+mSystemKey);
- Property prop = (Property)getObject("Property/"+name);
- try
- {
- return prop.getValue();
- }
- catch (NullPointerException ex)
- {
- throw new ObjectNotFoundException();
- }
- }
-
- public String getName()
- {
- try {
- return getProperty("Name");
- } catch (ObjectNotFoundException ex) {
- return null;
- }
- }
-
-
- /**************************************************************************
- * Subscription methods
- **************************************************************************/
-
- public void subscribe (MemberSubscription<?> newSub) {
-
- newSub.setSubject(this);
- synchronized (this){
- mSubscriptions.put( newSub, newSub.getObserver() );
- }
- new Thread(newSub).start();
- Logger.msg(7, "Subscribed "+newSub.getObserver().getClass().getName()+" for "+newSub.interest);
- }
-
- public void unsubscribe(EntityProxyObserver<?> observer)
- {
- synchronized (this){
- for (Iterator<MemberSubscription<?>> e = mSubscriptions.keySet().iterator(); e.hasNext();) {
- MemberSubscription<?> thisSub = e.next();
- if (mSubscriptions.get( thisSub ) == observer) {
- e.remove();
- Logger.msg(7, "Unsubscribed "+observer.getClass().getName());
- }
- }
- }
- }
-
- public void dumpSubscriptions(int logLevel) {
- if (mSubscriptions.size() == 0) return;
- Logger.msg(logLevel, "Subscriptions to proxy "+mSystemKey+":");
- synchronized(this) {
- for (MemberSubscription<?> element : mSubscriptions.keySet()) {
- EntityProxyObserver<?> obs = element.getObserver();
- if (obs != null)
- Logger.msg(logLevel, " "+element.getObserver().getClass().getName()+" subscribed to "+element.interest);
- else
- Logger.msg(logLevel, " Phantom subscription to "+element.interest);
- }
- }
- }
-
- public void notify(ProxyMessage message) {
- Logger.msg(4, "EntityProxy.notify() - Received change notification for "+message.getPath()+" on "+mSystemKey);
- synchronized (this){
- if (!message.getServer().equals(EntityProxyManager.serverName))
- Gateway.getStorage().clearCache(mSystemKey, message.getPath());
- for (Iterator<MemberSubscription<?>> e = mSubscriptions.keySet().iterator(); e.hasNext();) {
- MemberSubscription<?> newSub = e.next();
- if (newSub.getObserver() == null) { // phantom
- Logger.msg(4, "Removing phantom subscription to "+newSub.interest);
- e.remove();
- }
- else
- newSub.update(message.getPath(), message.getState());
- }
- }
- }
-
- /**
- * If this is reaped, clear out the cache for it too.
- */
- @Override
- protected void finalize() throws Throwable {
- Logger.msg(7, "Proxy "+mSystemKey+" reaped");
- Gateway.getStorage().clearCache(mSystemKey, null);
- Gateway.getProxyManager().removeProxy(mSystemKey);
- super.finalize();
- }
-
-}
diff --git a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java index 0e6859d..355acd8 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java +++ b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java @@ -10,25 +10,40 @@ package com.c2kernel.entity.proxy;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.exolab.castor.mapping.MappingException;
+import org.exolab.castor.xml.MarshalException;
+import org.exolab.castor.xml.ValidationException;
import com.c2kernel.collection.Collection;
-import com.c2kernel.collection.CollectionMember;
+import com.c2kernel.collection.CollectionArrayList;
import com.c2kernel.common.AccessRightsException;
import com.c2kernel.common.InvalidDataException;
import com.c2kernel.common.InvalidTransitionException;
import com.c2kernel.common.ObjectAlreadyExistsException;
import com.c2kernel.common.ObjectNotFoundException;
import com.c2kernel.common.PersistencyException;
+import com.c2kernel.entity.C2KLocalObject;
import com.c2kernel.entity.Item;
import com.c2kernel.entity.ItemHelper;
-import com.c2kernel.entity.ManageableEntity;
import com.c2kernel.entity.agent.Job;
import com.c2kernel.entity.agent.JobArrayList;
+import com.c2kernel.lifecycle.instance.CompositeActivity;
import com.c2kernel.lifecycle.instance.Workflow;
+import com.c2kernel.lookup.InvalidItemPathException;
+import com.c2kernel.lookup.ItemPath;
+import com.c2kernel.lookup.Path;
import com.c2kernel.persistency.ClusterStorage;
+import com.c2kernel.persistency.ClusterStorageException;
import com.c2kernel.persistency.outcome.Viewpoint;
import com.c2kernel.process.Gateway;
+import com.c2kernel.property.Property;
+import com.c2kernel.property.PropertyArrayList;
+import com.c2kernel.utils.CastorXMLUtility;
import com.c2kernel.utils.Logger;
/******************************************************************************
@@ -38,43 +53,86 @@ import com.c2kernel.utils.Logger; * @version $Revision: 1.25 $ $Date: 2005/05/10 11:40:09 $
* @author $Author: abranson $
******************************************************************************/
-public class ItemProxy extends EntityProxy
+public class ItemProxy
{
+ protected Item mItem = null;
+ protected org.omg.CORBA.Object mIOR;
+ protected int mSystemKey;
+ protected Path mPath;
+ private final HashMap<MemberSubscription<?>, ProxyObserver<?>>
+ mSubscriptions;
+
/**************************************************************************
- *
+ *
**************************************************************************/
protected ItemProxy( org.omg.CORBA.Object ior,
int systemKey)
throws ObjectNotFoundException
{
- super(ior, systemKey);
+ Logger.msg(8, "ItemProxy::initialise() - Initialising entity " +systemKey);
+
+ mIOR = ior;
+ mSystemKey = systemKey;
+ mSubscriptions = new HashMap<MemberSubscription<?>, ProxyObserver<?>>();
+ try {
+ mPath = new ItemPath(systemKey);
+ } catch (InvalidItemPathException e) {
+ throw new ObjectNotFoundException();
+ }
}
+
+ public int getSystemKey()
+ {
+ return mSystemKey;
+ }
+
+ public Path getPath() {
+ return mPath;
+ }
+
+ protected Item getItem() throws ObjectNotFoundException {
+ if (mItem == null)
+ mItem = narrow();
+ return mItem;
+ }
- @Override
- public ManageableEntity narrow() throws ObjectNotFoundException
+ public Item narrow() throws ObjectNotFoundException
{
try {
return ItemHelper.narrow(mIOR);
} catch (org.omg.CORBA.BAD_PARAM ex) { }
throw new ObjectNotFoundException("CORBA Object was not an Item, or the server is down.");
}
- /**************************************************************************
+ /**
+ * @throws MappingException
+ * @throws IOException
+ * @throws ValidationException
+ * @throws MarshalException ************************************************************************
*
*
**************************************************************************/
- public void initialise( int agentId,
- String itemProps,
- String workflow )
+ public void initialise( int agentId,
+ PropertyArrayList itemProps,
+ CompositeActivity workflow,
+ CollectionArrayList colls
+ )
throws AccessRightsException,
InvalidDataException,
PersistencyException,
- ObjectNotFoundException
+ ObjectNotFoundException, MarshalException, ValidationException, IOException, MappingException
{
Logger.msg(7, "ItemProxy::initialise - started");
-
- ((Item)getEntity()).initialise( agentId, itemProps, workflow );
+ CastorXMLUtility xml = Gateway.getMarshaller();
+ if (itemProps == null) throw new InvalidDataException("No initial properties supplied");
+ String propString = xml.marshall(itemProps);
+ String wfString = "";
+ if (workflow != null) wfString = xml.marshall(workflow);
+ String collString = "";
+ if (colls != null) collString = xml.marshall(colls);
+
+ getItem().initialise( agentId, propString, wfString, collString);
}
public void setProperty(AgentProxy agent, String name, String value)
@@ -100,7 +158,7 @@ public class ItemProxy extends EntityProxy /**************************************************************************
*
**************************************************************************/
- protected void requestAction( Job thisJob )
+ public void requestAction( Job thisJob )
throws AccessRightsException,
InvalidTransitionException,
ObjectNotFoundException,
@@ -120,44 +178,10 @@ public class ItemProxy extends EntityProxy throw new InvalidDataException("No Agent specified.", "");
Logger.msg(7, "ItemProxy - executing "+thisJob.getStepPath()+" for "+thisJob.getAgentName());
- requestAction (thisJob.getAgentId(), thisJob.getStepPath(),
+ getItem().requestAction (thisJob.getAgentId(), thisJob.getStepPath(),
thisJob.getTransition().getId(), outcome);
}
- //requestData is xmlString
- public void requestAction( int agentId,
- String stepPath,
- int transitionID,
- String requestData
- )
- throws AccessRightsException,
- InvalidTransitionException,
- ObjectNotFoundException,
- InvalidDataException,
- PersistencyException,
- ObjectAlreadyExistsException
- {
- ((Item)getEntity()).requestAction( agentId,
- stepPath,
- transitionID,
- requestData );
- }
-
- /**************************************************************************
- *
- **************************************************************************/
- public String queryLifeCycle( int agentId,
- boolean filter
- )
- throws AccessRightsException,
- ObjectNotFoundException,
- PersistencyException
- {
- return ((Item)getEntity()).queryLifeCycle( agentId,
- filter );
- }
-
-
/**************************************************************************
*
**************************************************************************/
@@ -168,7 +192,7 @@ public class ItemProxy extends EntityProxy {
JobArrayList thisJobList;
try {
- String jobs = queryLifeCycle(agentId, filter);
+ String jobs = getItem().queryLifeCycle(agentId, filter);
thisJobList = (JobArrayList)Gateway.getMarshaller().unmarshall(jobs);
}
catch (Exception e) {
@@ -208,8 +232,8 @@ public class ItemProxy extends EntityProxy }
- public Collection<? extends CollectionMember> getCollection(String collName) throws ObjectNotFoundException {
- return (Collection<? extends CollectionMember>)getObject(ClusterStorage.COLLECTION+"/"+collName);
+ public Collection<?> getCollection(String collName) throws ObjectNotFoundException {
+ return (Collection<?>)getObject(ClusterStorage.COLLECTION+"/"+collName);
}
public Workflow getWorkflow() throws ObjectNotFoundException {
@@ -226,4 +250,159 @@ public class ItemProxy extends EntityProxy PersistencyException {
return getJobByName(actName, agent.getSystemKey());
}
+
+ /**
+ * If this is reaped, clear out the cache for it too.
+ */
+ @Override
+ protected void finalize() throws Throwable {
+ Logger.msg(7, "Proxy "+mSystemKey+" reaped");
+ Gateway.getStorage().clearCache(mSystemKey, null);
+ Gateway.getProxyManager().removeProxy(mSystemKey);
+ super.finalize();
+ }
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ public String queryData( String path )
+ throws ObjectNotFoundException
+ {
+
+ try {
+ Logger.msg(7, "EntityProxy.queryData() - "+mSystemKey+"/"+path);
+ if (path.endsWith("all")) {
+ Logger.msg(7, "EntityProxy.queryData() - listing contents");
+ String[] result = Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length()-3));
+ StringBuffer retString = new StringBuffer();
+ for (int i = 0; i < result.length; i++) {
+ retString.append(result[i]);
+ if (i<result.length-1) retString.append(",");
+ }
+ Logger.msg(7, "EntityProxy.queryData() - "+retString.toString());
+ return retString.toString();
+ }
+ C2KLocalObject target = Gateway.getStorage().get(mSystemKey, path, null);
+ return Gateway.getMarshaller().marshall(target);
+ } catch (ObjectNotFoundException e) {
+ throw e;
+ } catch (Exception e) {
+ Logger.error(e);
+ return "<ERROR>"+e.getMessage()+"</ERROR>";
+ }
+ }
+
+ public String[] getContents( String path ) throws ObjectNotFoundException {
+ try {
+ return Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length()));
+ } catch (ClusterStorageException e) {
+ throw new ObjectNotFoundException(e.toString());
+ }
+ }
+
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ public C2KLocalObject getObject( String xpath )
+ throws ObjectNotFoundException
+ {
+ // load from storage, falling back to proxy loader if not found in others
+ try
+ {
+ return Gateway.getStorage().get( mSystemKey, xpath , null);
+ }
+ catch( ClusterStorageException ex )
+ {
+ Logger.msg(4, "Exception loading object :"+mSystemKey+"/"+xpath);
+ throw new ObjectNotFoundException( ex.toString() );
+ }
+ }
+
+
+
+ public String getProperty( String name )
+ throws ObjectNotFoundException
+ {
+ Logger.msg(5, "Get property "+name+" from syskey/"+mSystemKey);
+ Property prop = (Property)getObject("Property/"+name);
+ try
+ {
+ return prop.getValue();
+ }
+ catch (NullPointerException ex)
+ {
+ throw new ObjectNotFoundException();
+ }
+ }
+
+ public String getName()
+ {
+ try {
+ return getProperty("Name");
+ } catch (ObjectNotFoundException ex) {
+ return null;
+ }
+ }
+
+
+
+
+ /**************************************************************************
+ * Subscription methods
+ **************************************************************************/
+
+ public void subscribe(MemberSubscription<?> newSub) {
+
+ newSub.setSubject(this);
+ synchronized (this){
+ mSubscriptions.put( newSub, newSub.getObserver() );
+ }
+ new Thread(newSub).start();
+ Logger.msg(7, "Subscribed "+newSub.getObserver().getClass().getName()+" for "+newSub.interest);
+ }
+
+ public void unsubscribe(ProxyObserver<?> observer)
+ {
+ synchronized (this){
+ for (Iterator<MemberSubscription<?>> e = mSubscriptions.keySet().iterator(); e.hasNext();) {
+ MemberSubscription<?> thisSub = e.next();
+ if (mSubscriptions.get( thisSub ) == observer) {
+ e.remove();
+ Logger.msg(7, "Unsubscribed "+observer.getClass().getName());
+ }
+ }
+ }
+ }
+
+ public void dumpSubscriptions(int logLevel) {
+ if (mSubscriptions.size() == 0) return;
+ Logger.msg(logLevel, "Subscriptions to proxy "+mSystemKey+":");
+ synchronized(this) {
+ for (MemberSubscription<?> element : mSubscriptions.keySet()) {
+ ProxyObserver<?> obs = element.getObserver();
+ if (obs != null)
+ Logger.msg(logLevel, " "+element.getObserver().getClass().getName()+" subscribed to "+element.interest);
+ else
+ Logger.msg(logLevel, " Phantom subscription to "+element.interest);
+ }
+ }
+ }
+
+ public void notify(ProxyMessage message) {
+ Logger.msg(4, "EntityProxy.notify() - Received change notification for "+message.getPath()+" on "+mSystemKey);
+ synchronized (this){
+ if (!message.getServer().equals(ProxyManager.serverName))
+ Gateway.getStorage().clearCache(mSystemKey, message.getPath());
+ for (Iterator<MemberSubscription<?>> e = mSubscriptions.keySet().iterator(); e.hasNext();) {
+ MemberSubscription<?> newSub = e.next();
+ if (newSub.getObserver() == null) { // phantom
+ Logger.msg(4, "Removing phantom subscription to "+newSub.interest);
+ e.remove();
+ }
+ else
+ newSub.update(message.getPath(), message.getState());
+ }
+ }
+ }
}
diff --git a/src/main/java/com/c2kernel/entity/proxy/MemberSubscription.java b/src/main/java/com/c2kernel/entity/proxy/MemberSubscription.java index 1de18f8..01994e4 100644 --- a/src/main/java/com/c2kernel/entity/proxy/MemberSubscription.java +++ b/src/main/java/com/c2kernel/entity/proxy/MemberSubscription.java @@ -12,14 +12,14 @@ public class MemberSubscription<C extends C2KLocalObject> implements Runnable { public static final String ERROR = "Error";
public static final String END = "theEND";
- EntityProxy subject;
+ ItemProxy subject;
String interest;
// keep the subscriber by weak reference, so it is not kept from the garbage collector if no longer used
- WeakReference<EntityProxyObserver<C>> observerReference;
+ WeakReference<ProxyObserver<C>> observerReference;
ArrayList<String> contents = new ArrayList<String>();
boolean preLoad;
- public MemberSubscription(EntityProxyObserver<C> observer, String interest, boolean preLoad) {
+ public MemberSubscription(ProxyObserver<C> observer, String interest, boolean preLoad) {
setObserver(observer);
this.interest = interest;
this.preLoad = preLoad;
@@ -33,7 +33,7 @@ public class MemberSubscription<C extends C2KLocalObject> implements Runnable { private void loadChildren() {
C newMember;
- EntityProxyObserver<C> observer = getObserver();
+ ProxyObserver<C> observer = getObserver();
if (observer == null) return; //reaped
try {
// fetch contents of path
@@ -77,7 +77,7 @@ public class MemberSubscription<C extends C2KLocalObject> implements Runnable { }
public void update(String path, boolean deleted) {
- EntityProxyObserver<C> observer = getObserver();
+ ProxyObserver<C> observer = getObserver();
if (observer == null) return; //reaped
Logger.msg(7, "Processing proxy message path "+path +" for "+observer+". Interest: "+interest+" Was Deleted:"+deleted);
if (!path.startsWith(interest)) // doesn't concern us
@@ -106,15 +106,15 @@ public class MemberSubscription<C extends C2KLocalObject> implements Runnable { }
}
- public void setObserver(EntityProxyObserver<C> observer) {
- observerReference = new WeakReference<EntityProxyObserver<C>>(observer);
+ public void setObserver(ProxyObserver<C> observer) {
+ observerReference = new WeakReference<ProxyObserver<C>>(observer);
}
- public void setSubject(EntityProxy subject) {
+ public void setSubject(ItemProxy subject) {
this.subject = subject;
}
- public EntityProxyObserver<C> getObserver() {
+ public ProxyObserver<C> getObserver() {
return observerReference.get();
}
}
diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java index 9687f22..5abdb16 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java @@ -36,7 +36,7 @@ public class ProxyClientConnection implements SocketHandler { public ProxyClientConnection() {
super();
thisClientId = ++clientId;
- EntityProxyManager.registerProxyClient(this);
+ ProxyManager.registerProxyClient(this);
Logger.msg(1, "Proxy Client Connection Handler "+thisClientId+" ready.");
}
diff --git a/src/main/java/com/c2kernel/entity/proxy/EntityProxyManager.java b/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java index c49e7f5..d19e38f 100644 --- a/src/main/java/com/c2kernel/entity/proxy/EntityProxyManager.java +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyManager.java @@ -1,5 +1,5 @@ /**************************************************************************
- * EntityProxyFactory.java
+ * ProxyManager.java
*
* $Revision: 1.45 $
* $Date: 2005/05/10 11:40:09 $
@@ -29,9 +29,9 @@ import com.c2kernel.utils.SoftCache; import com.c2kernel.utils.server.SimpleTCPIPServer;
-public class EntityProxyManager
+public class ProxyManager
{
- SoftCache<Integer, EntityProxy> proxyPool = new SoftCache<Integer, EntityProxy>(50);
+ SoftCache<Integer, ItemProxy> proxyPool = new SoftCache<Integer, ItemProxy>(50);
HashMap<DomainPathSubscriber, DomainPath> treeSubscribers = new HashMap<DomainPathSubscriber, DomainPath>();
HashMap<String, ProxyServerConnection> connections = new HashMap<String, ProxyServerConnection>();
@@ -41,11 +41,11 @@ public class EntityProxyManager static String serverName = null;
/**
- * Create an entity proxy manager to listen for proxy events and reap unused proxies
+ * Create a proxy manager to listen for proxy events and reap unused proxies
*/
- public EntityProxyManager()
+ public ProxyManager()
{
- Logger.msg(5, "EntityProxyManager - Starting.....");
+ Logger.msg(5, "ProxyManager - Starting.....");
Enumeration<Path> servers = Gateway.getLDAPLookup().searchEntities(new DomainPath("/servers"));
while(servers.hasMoreElements()) {
@@ -76,7 +76,7 @@ public class EntityProxyManager synchronized (proxyPool) {
for (Integer key : proxyPool.keySet()) {
ProxyMessage sub = new ProxyMessage(key.intValue(), ProxyMessage.ADDPATH, false);
- Logger.msg(5, "Subscribing to entity "+key);
+ Logger.msg(5, "Subscribing to item "+key);
conn.sendMessage(sub);
}
}
@@ -93,7 +93,7 @@ public class EntityProxyManager }
public void shutdown() {
- Logger.msg("EntityProxyManager.shutdown() - flagging shutdown of server connections");
+ Logger.msg("ProxyManager.shutdown() - flagging shutdown of server connections");
for (ProxyServerConnection element : connections.values()) {
element.shutdown();
}
@@ -111,7 +111,7 @@ public class EntityProxyManager // proper proxy message
Logger.msg(5, "Received proxy message: "+thisMessage.toString());
Integer key = new Integer(thisMessage.getSysKey());
- EntityProxy relevant = proxyPool.get(key);
+ ItemProxy relevant = proxyPool.get(key);
if (relevant == null)
Logger.warning("Received proxy message for sysKey "+thisMessage.getSysKey()+" which we don't have a proxy for.");
else
@@ -161,15 +161,15 @@ public class EntityProxyManager /**************************************************************************
*
**************************************************************************/
- private EntityProxy createProxy( org.omg.CORBA.Object ior,
+ private ItemProxy createProxy( org.omg.CORBA.Object ior,
int systemKey,
boolean isItem )
throws ObjectNotFoundException
{
- EntityProxy newProxy = null;
+ ItemProxy newProxy = null;
- Logger.msg(5, "EntityProxyFactory::creating proxy on entity " + systemKey);
+ Logger.msg(5, "ProxyManager::creating proxy on Item " + systemKey);
if( isItem )
{
@@ -190,18 +190,16 @@ public class EntityProxyManager protected void removeProxy( int systemKey )
{
ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.DELPATH, true);
- Logger.msg(5,"EntityProxyManager.removeProxy() - Unsubscribing to proxy informer for "+systemKey);
+ Logger.msg(5,"ProxyManager.removeProxy() - Unsubscribing to proxy informer for "+systemKey);
sendMessage(sub);
}
/**************************************************************************
- * EntityProxy getProxy( ManageableEntity, SystemKey)
- *
* Called by the other GetProxy methods. Fills in either the ior or the
* SystemKey
**************************************************************************/
- private EntityProxy getProxy( org.omg.CORBA.Object ior,
+ private ItemProxy getProxy( org.omg.CORBA.Object ior,
int systemKey,
boolean isItem )
throws ObjectNotFoundException
@@ -209,7 +207,7 @@ public class EntityProxyManager Integer key = new Integer(systemKey);
synchronized(proxyPool) {
- EntityProxy newProxy;
+ ItemProxy newProxy;
// return it if it exists
newProxy = proxyPool.get(key);
if (newProxy == null) {
@@ -223,16 +221,16 @@ public class EntityProxyManager }
/**************************************************************************
- * EntityProxy getProxy( String )
+ * ItemProxy getProxy( String )
*
* Proxy from Alias
**************************************************************************/
- public EntityProxy getProxy( Path path )
+ public ItemProxy getProxy( Path path )
throws ObjectNotFoundException
{
//convert namePath to dn format
- Logger.msg(8,"EntityProxyFactory::getProxy(" + path.toString() + ")");
+ Logger.msg(8,"ProxyManager::getProxy(" + path.toString() + ")");
boolean isItem = !(path.getEntity() instanceof AgentPath);
return getProxy( Gateway.getLDAPLookup().getIOR(path),
path.getSysKey(),
@@ -256,7 +254,7 @@ public class EntityProxyManager for( int count=0; i.hasNext(); count++ )
{
Integer nextProxy = i.next();
- EntityProxy thisProxy = proxyPool.get(nextProxy);
+ ItemProxy thisProxy = proxyPool.get(nextProxy);
if (thisProxy != null)
Logger.msg(logLevel,
"" + count + ": "
@@ -280,21 +278,21 @@ public class EntityProxyManager */
public static void initServer()
{
- Logger.msg(5, "EntityProxyFactory::initServer - Starting.....");
+ Logger.msg(5, "ProxyManager::initServer - Starting.....");
int port = Gateway.getProperties().getInt("ItemServer.Proxy.port", 0);
serverName = Gateway.getProperties().getProperty("ItemServer.name");
if (port == 0) {
- Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of entity changes.");
+ Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of changes.");
return;
}
// set up the proxy server
try {
- Logger.msg(5, "EntityProxyFactory::initServer - Initialising proxy informer on port "+port);
+ Logger.msg(5, "ProxyManager::initServer - Initialising proxy informer on port "+port);
proxyServer = new SimpleTCPIPServer(port, ProxyClientConnection.class, 200);
proxyServer.startListening();
} catch (Exception ex) {
- Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of entity changes.");
+ Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of changes.");
Logger.error(ex);
}
}
@@ -319,7 +317,7 @@ public class EntityProxyManager public static void shutdownServer() {
if (proxyServer != null) {
- Logger.msg(1, "EntityProxyManager: Closing Server.");
+ Logger.msg(1, "ProxyManager: Closing Server.");
proxyServer.stopListening();
}
}
diff --git a/src/main/java/com/c2kernel/entity/proxy/EntityProxyObserver.java b/src/main/java/com/c2kernel/entity/proxy/ProxyObserver.java index 3ddb99c..b15a972 100644 --- a/src/main/java/com/c2kernel/entity/proxy/EntityProxyObserver.java +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyObserver.java @@ -4,7 +4,7 @@ import com.c2kernel.entity.C2KLocalObject; -public interface EntityProxyObserver<V extends C2KLocalObject>
+public interface ProxyObserver<V extends C2KLocalObject>
{
/**************************************************************************
* Subscribed items are broken apart and fed one by one to these methods.
diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyServerConnection.java b/src/main/java/com/c2kernel/entity/proxy/ProxyServerConnection.java index 6807953..54ca787 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ProxyServerConnection.java +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyServerConnection.java @@ -29,7 +29,7 @@ public class ProxyServerConnection extends Thread String serverName;
int serverPort;
Socket serverConnection;
- EntityProxyManager manager;
+ ProxyManager manager;
// for talking to the proxy server
PrintWriter serverStream;
boolean listening = false;
@@ -38,7 +38,7 @@ public class ProxyServerConnection extends Thread /**
* Create an entity proxy manager to listen for proxy events and reap unused proxies
*/
- public ProxyServerConnection(String host, int port, EntityProxyManager manager)
+ public ProxyServerConnection(String host, int port, ProxyManager manager)
{
Logger.msg(5, "ProxyServerConnection - Initialising connection to "+host+":"+port);
serverName = host;
|
