diff options
| author | ogattaz <olivier@gattaz.com> | 2014-06-05 16:51:07 +0200 |
|---|---|---|
| committer | ogattaz <olivier@gattaz.com> | 2014-06-05 16:51:07 +0200 |
| commit | 2fd193d7936084de91eae46e8c2763914d87ab71 (patch) | |
| tree | b136ed97e535f11d4b3433d16c26570c89430ce4 /src/main/java/com/c2kernel/entity/proxy/ItemProxy.java | |
| parent | 1225792532f77e6e8f4a9addfc0c0a6cf56e89b8 (diff) | |
| parent | e73468fd08cc27aa31f76a27c916e45d5987c628 (diff) | |
Merge branch 'master' of ssh://dev.cccs.uwe.ac.uk:22/var/git/cristal-kernel
Diffstat (limited to 'src/main/java/com/c2kernel/entity/proxy/ItemProxy.java')
| -rw-r--r-- | src/main/java/com/c2kernel/entity/proxy/ItemProxy.java | 285 |
1 files changed, 232 insertions, 53 deletions
diff --git a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java index 0e6859d..454da6d 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java +++ b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java @@ -10,25 +10,40 @@ package com.c2kernel.entity.proxy;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.exolab.castor.mapping.MappingException;
+import org.exolab.castor.xml.MarshalException;
+import org.exolab.castor.xml.ValidationException;
import com.c2kernel.collection.Collection;
-import com.c2kernel.collection.CollectionMember;
+import com.c2kernel.collection.CollectionArrayList;
import com.c2kernel.common.AccessRightsException;
import com.c2kernel.common.InvalidDataException;
import com.c2kernel.common.InvalidTransitionException;
import com.c2kernel.common.ObjectAlreadyExistsException;
import com.c2kernel.common.ObjectNotFoundException;
import com.c2kernel.common.PersistencyException;
+import com.c2kernel.entity.C2KLocalObject;
import com.c2kernel.entity.Item;
import com.c2kernel.entity.ItemHelper;
-import com.c2kernel.entity.ManageableEntity;
import com.c2kernel.entity.agent.Job;
import com.c2kernel.entity.agent.JobArrayList;
+import com.c2kernel.lifecycle.instance.CompositeActivity;
import com.c2kernel.lifecycle.instance.Workflow;
+import com.c2kernel.lookup.InvalidItemPathException;
+import com.c2kernel.lookup.ItemPath;
+import com.c2kernel.lookup.Path;
import com.c2kernel.persistency.ClusterStorage;
+import com.c2kernel.persistency.ClusterStorageException;
import com.c2kernel.persistency.outcome.Viewpoint;
import com.c2kernel.process.Gateway;
+import com.c2kernel.property.Property;
+import com.c2kernel.property.PropertyArrayList;
+import com.c2kernel.utils.CastorXMLUtility;
import com.c2kernel.utils.Logger;
/******************************************************************************
@@ -38,43 +53,86 @@ import com.c2kernel.utils.Logger; * @version $Revision: 1.25 $ $Date: 2005/05/10 11:40:09 $
* @author $Author: abranson $
******************************************************************************/
-public class ItemProxy extends EntityProxy
+public class ItemProxy
{
+ protected Item mItem = null;
+ protected org.omg.CORBA.Object mIOR;
+ protected int mSystemKey;
+ protected Path mPath;
+ private final HashMap<MemberSubscription<?>, ProxyObserver<?>>
+ mSubscriptions;
+
/**************************************************************************
- *
+ *
**************************************************************************/
protected ItemProxy( org.omg.CORBA.Object ior,
int systemKey)
throws ObjectNotFoundException
{
- super(ior, systemKey);
+ Logger.msg(8, "ItemProxy::initialise() - Initialising entity " +systemKey);
+
+ mIOR = ior;
+ mSystemKey = systemKey;
+ mSubscriptions = new HashMap<MemberSubscription<?>, ProxyObserver<?>>();
+ try {
+ mPath = new ItemPath(systemKey);
+ } catch (InvalidItemPathException e) {
+ throw new ObjectNotFoundException();
+ }
}
+
+ public int getSystemKey()
+ {
+ return mSystemKey;
+ }
+
+ public Path getPath() {
+ return mPath;
+ }
+
+ protected Item getItem() throws ObjectNotFoundException {
+ if (mItem == null)
+ mItem = narrow();
+ return mItem;
+ }
- @Override
- public ManageableEntity narrow() throws ObjectNotFoundException
+ public Item narrow() throws ObjectNotFoundException
{
try {
return ItemHelper.narrow(mIOR);
} catch (org.omg.CORBA.BAD_PARAM ex) { }
throw new ObjectNotFoundException("CORBA Object was not an Item, or the server is down.");
}
- /**************************************************************************
+ /**
+ * @throws MappingException
+ * @throws IOException
+ * @throws ValidationException
+ * @throws MarshalException ************************************************************************
*
*
**************************************************************************/
- public void initialise( int agentId,
- String itemProps,
- String workflow )
+ public void initialise( int agentId,
+ PropertyArrayList itemProps,
+ CompositeActivity workflow,
+ CollectionArrayList colls
+ )
throws AccessRightsException,
InvalidDataException,
PersistencyException,
- ObjectNotFoundException
+ ObjectNotFoundException, MarshalException, ValidationException, IOException, MappingException
{
Logger.msg(7, "ItemProxy::initialise - started");
-
- ((Item)getEntity()).initialise( agentId, itemProps, workflow );
+ CastorXMLUtility xml = Gateway.getMarshaller();
+ if (itemProps == null) throw new InvalidDataException("No initial properties supplied");
+ String propString = xml.marshall(itemProps);
+ String wfString = "";
+ if (workflow != null) wfString = xml.marshall(workflow);
+ String collString = "";
+ if (colls != null) collString = xml.marshall(colls);
+
+ getItem().initialise( agentId, propString, wfString, collString);
}
public void setProperty(AgentProxy agent, String name, String value)
@@ -100,7 +158,7 @@ public class ItemProxy extends EntityProxy /**************************************************************************
*
**************************************************************************/
- protected void requestAction( Job thisJob )
+ public String requestAction( Job thisJob )
throws AccessRightsException,
InvalidTransitionException,
ObjectNotFoundException,
@@ -120,44 +178,10 @@ public class ItemProxy extends EntityProxy throw new InvalidDataException("No Agent specified.", "");
Logger.msg(7, "ItemProxy - executing "+thisJob.getStepPath()+" for "+thisJob.getAgentName());
- requestAction (thisJob.getAgentId(), thisJob.getStepPath(),
+ return getItem().requestAction (thisJob.getAgentId(), thisJob.getStepPath(),
thisJob.getTransition().getId(), outcome);
}
- //requestData is xmlString
- public void requestAction( int agentId,
- String stepPath,
- int transitionID,
- String requestData
- )
- throws AccessRightsException,
- InvalidTransitionException,
- ObjectNotFoundException,
- InvalidDataException,
- PersistencyException,
- ObjectAlreadyExistsException
- {
- ((Item)getEntity()).requestAction( agentId,
- stepPath,
- transitionID,
- requestData );
- }
-
- /**************************************************************************
- *
- **************************************************************************/
- public String queryLifeCycle( int agentId,
- boolean filter
- )
- throws AccessRightsException,
- ObjectNotFoundException,
- PersistencyException
- {
- return ((Item)getEntity()).queryLifeCycle( agentId,
- filter );
- }
-
-
/**************************************************************************
*
**************************************************************************/
@@ -168,7 +192,7 @@ public class ItemProxy extends EntityProxy {
JobArrayList thisJobList;
try {
- String jobs = queryLifeCycle(agentId, filter);
+ String jobs = getItem().queryLifeCycle(agentId, filter);
thisJobList = (JobArrayList)Gateway.getMarshaller().unmarshall(jobs);
}
catch (Exception e) {
@@ -208,8 +232,8 @@ public class ItemProxy extends EntityProxy }
- public Collection<? extends CollectionMember> getCollection(String collName) throws ObjectNotFoundException {
- return (Collection<? extends CollectionMember>)getObject(ClusterStorage.COLLECTION+"/"+collName);
+ public Collection<?> getCollection(String collName) throws ObjectNotFoundException {
+ return (Collection<?>)getObject(ClusterStorage.COLLECTION+"/"+collName);
}
public Workflow getWorkflow() throws ObjectNotFoundException {
@@ -226,4 +250,159 @@ public class ItemProxy extends EntityProxy PersistencyException {
return getJobByName(actName, agent.getSystemKey());
}
+
+ /**
+ * If this is reaped, clear out the cache for it too.
+ */
+ @Override
+ protected void finalize() throws Throwable {
+ Logger.msg(7, "Proxy "+mSystemKey+" reaped");
+ Gateway.getStorage().clearCache(mSystemKey, null);
+ Gateway.getProxyManager().removeProxy(mSystemKey);
+ super.finalize();
+ }
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ public String queryData( String path )
+ throws ObjectNotFoundException
+ {
+
+ try {
+ Logger.msg(7, "EntityProxy.queryData() - "+mSystemKey+"/"+path);
+ if (path.endsWith("all")) {
+ Logger.msg(7, "EntityProxy.queryData() - listing contents");
+ String[] result = Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length()-3));
+ StringBuffer retString = new StringBuffer();
+ for (int i = 0; i < result.length; i++) {
+ retString.append(result[i]);
+ if (i<result.length-1) retString.append(",");
+ }
+ Logger.msg(7, "EntityProxy.queryData() - "+retString.toString());
+ return retString.toString();
+ }
+ C2KLocalObject target = Gateway.getStorage().get(mSystemKey, path, null);
+ return Gateway.getMarshaller().marshall(target);
+ } catch (ObjectNotFoundException e) {
+ throw e;
+ } catch (Exception e) {
+ Logger.error(e);
+ return "<ERROR>"+e.getMessage()+"</ERROR>";
+ }
+ }
+
+ public String[] getContents( String path ) throws ObjectNotFoundException {
+ try {
+ return Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length()));
+ } catch (ClusterStorageException e) {
+ throw new ObjectNotFoundException(e.toString());
+ }
+ }
+
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ public C2KLocalObject getObject( String xpath )
+ throws ObjectNotFoundException
+ {
+ // load from storage, falling back to proxy loader if not found in others
+ try
+ {
+ return Gateway.getStorage().get( mSystemKey, xpath , null);
+ }
+ catch( ClusterStorageException ex )
+ {
+ Logger.msg(4, "Exception loading object :"+mSystemKey+"/"+xpath);
+ throw new ObjectNotFoundException( ex.toString() );
+ }
+ }
+
+
+
+ public String getProperty( String name )
+ throws ObjectNotFoundException
+ {
+ Logger.msg(5, "Get property "+name+" from syskey/"+mSystemKey);
+ Property prop = (Property)getObject("Property/"+name);
+ try
+ {
+ return prop.getValue();
+ }
+ catch (NullPointerException ex)
+ {
+ throw new ObjectNotFoundException();
+ }
+ }
+
+ public String getName()
+ {
+ try {
+ return getProperty("Name");
+ } catch (ObjectNotFoundException ex) {
+ return null;
+ }
+ }
+
+
+
+
+ /**************************************************************************
+ * Subscription methods
+ **************************************************************************/
+
+ public void subscribe(MemberSubscription<?> newSub) {
+
+ newSub.setSubject(this);
+ synchronized (this){
+ mSubscriptions.put( newSub, newSub.getObserver() );
+ }
+ new Thread(newSub).start();
+ Logger.msg(7, "Subscribed "+newSub.getObserver().getClass().getName()+" for "+newSub.interest);
+ }
+
+ public void unsubscribe(ProxyObserver<?> observer)
+ {
+ synchronized (this){
+ for (Iterator<MemberSubscription<?>> e = mSubscriptions.keySet().iterator(); e.hasNext();) {
+ MemberSubscription<?> thisSub = e.next();
+ if (mSubscriptions.get( thisSub ) == observer) {
+ e.remove();
+ Logger.msg(7, "Unsubscribed "+observer.getClass().getName());
+ }
+ }
+ }
+ }
+
+ public void dumpSubscriptions(int logLevel) {
+ if (mSubscriptions.size() == 0) return;
+ Logger.msg(logLevel, "Subscriptions to proxy "+mSystemKey+":");
+ synchronized(this) {
+ for (MemberSubscription<?> element : mSubscriptions.keySet()) {
+ ProxyObserver<?> obs = element.getObserver();
+ if (obs != null)
+ Logger.msg(logLevel, " "+element.getObserver().getClass().getName()+" subscribed to "+element.interest);
+ else
+ Logger.msg(logLevel, " Phantom subscription to "+element.interest);
+ }
+ }
+ }
+
+ public void notify(ProxyMessage message) {
+ Logger.msg(4, "EntityProxy.notify() - Received change notification for "+message.getPath()+" on "+mSystemKey);
+ synchronized (this){
+ if (Gateway.getProxyServer()== null || !message.getServer().equals(Gateway.getProxyServer().getServerName()))
+ Gateway.getStorage().clearCache(mSystemKey, message.getPath());
+ for (Iterator<MemberSubscription<?>> e = mSubscriptions.keySet().iterator(); e.hasNext();) {
+ MemberSubscription<?> newSub = e.next();
+ if (newSub.getObserver() == null) { // phantom
+ Logger.msg(4, "Removing phantom subscription to "+newSub.interest);
+ e.remove();
+ }
+ else
+ newSub.update(message.getPath(), message.getState());
+ }
+ }
+ }
}
|
