From a1f0ecbb6a2bea6aa214322c412af2f3c5ce124b Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Wed, 7 May 2014 17:33:13 +0200 Subject: Agent now extends Item, so they can have workflows. All traces of the old 'Entity' superclasses should be removed, including proxies and paths. Very large change, breaks API compatibility with CRISTAL 2.x. Fixes #135 --- .../com/c2kernel/entity/ItemImplementation.java | 281 +++++++++++++++++++++ 1 file changed, 281 insertions(+) create mode 100644 src/main/java/com/c2kernel/entity/ItemImplementation.java (limited to 'src/main/java/com/c2kernel/entity/ItemImplementation.java') diff --git a/src/main/java/com/c2kernel/entity/ItemImplementation.java b/src/main/java/com/c2kernel/entity/ItemImplementation.java new file mode 100644 index 0000000..e0d107a --- /dev/null +++ b/src/main/java/com/c2kernel/entity/ItemImplementation.java @@ -0,0 +1,281 @@ +package com.c2kernel.entity; + +import com.c2kernel.collection.Collection; +import com.c2kernel.collection.CollectionArrayList; +import com.c2kernel.common.AccessRightsException; +import com.c2kernel.common.InvalidDataException; +import com.c2kernel.common.InvalidTransitionException; +import com.c2kernel.common.ObjectAlreadyExistsException; +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.common.PersistencyException; +import com.c2kernel.entity.agent.JobArrayList; +import com.c2kernel.lifecycle.instance.CompositeActivity; +import com.c2kernel.lifecycle.instance.Workflow; +import com.c2kernel.lookup.AgentPath; +import com.c2kernel.lookup.InvalidItemPathException; +import com.c2kernel.persistency.ClusterStorage; +import com.c2kernel.persistency.ClusterStorageException; +import com.c2kernel.persistency.TransactionManager; +import com.c2kernel.process.Gateway; +import com.c2kernel.property.Property; +import com.c2kernel.property.PropertyArrayList; +import com.c2kernel.utils.Logger; + +public class ItemImplementation implements ItemOperations { + + protected final TransactionManager mStorage; + protected final int mSystemKey; + + protected ItemImplementation(int systemKey) { + this.mStorage = Gateway.getStorage(); + this.mSystemKey = systemKey; + } + + @Override + public int getSystemKey() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void initialise(int agentId, String propString, String initWfString, + String initCollsString) throws AccessRightsException, + InvalidDataException, PersistencyException + { + Logger.msg(5, "Item::initialise("+mSystemKey+") - agent:"+agentId); + Object locker = new Object(); + + AgentPath agentPath; + try { + agentPath = new AgentPath(agentId); + } catch (InvalidItemPathException e) { + throw new AccessRightsException("Invalid Agent Id:" + agentId); + } + + // must supply properties + if (propString == null || propString.length() == 0) { + throw new InvalidDataException("No properties supplied", ""); + } + + // store properties + try { + PropertyArrayList props = (PropertyArrayList) Gateway + .getMarshaller().unmarshall(propString); + for (Property thisProp : props.list) + mStorage.put(mSystemKey, thisProp, locker); + } catch (Throwable ex) { + Logger.msg(8, "TraceableEntity::initialise(" + mSystemKey + + ") - Properties were invalid: " + propString); + Logger.error(ex); + mStorage.abort(locker); + throw new InvalidDataException("Properties were invalid", ""); + } + + // create wf + try { + Workflow lc = null; + if (initWfString == null || initWfString.length() == 0) + lc = new Workflow(new CompositeActivity()); + else + lc = new Workflow((CompositeActivity) Gateway + .getMarshaller().unmarshall(initWfString)); + lc.initialise(mSystemKey, agentPath); + mStorage.put(mSystemKey, lc, locker); + } catch (Throwable ex) { + Logger.msg(8, "TraceableEntity::initialise(" + mSystemKey + + ") - Workflow was invalid: " + initWfString); + Logger.error(ex); + mStorage.abort(locker); + throw new InvalidDataException("Workflow was invalid", ""); + } + + // init collections + if (initCollsString != null && initCollsString.length() > 0) { + try { + CollectionArrayList colls = (CollectionArrayList) Gateway + .getMarshaller().unmarshall(initCollsString); + for (Collection thisColl : colls.list) { + mStorage.put(mSystemKey, thisColl, locker); + } + } catch (Throwable ex) { + Logger.msg(8, "TraceableEntity::initialise(" + mSystemKey + + ") - Collections were invalid: " + + initCollsString); + Logger.error(ex); + mStorage.abort(locker); + throw new InvalidDataException("Collections were invalid"); + } + } + mStorage.commit(locker); + Logger.msg(3, "Initialisation of item " + mSystemKey + + " was successful"); + } + + + @Override + public void requestAction(int agentId, String stepPath, int transitionID, + String requestData) throws AccessRightsException, + InvalidTransitionException, ObjectNotFoundException, + InvalidDataException, PersistencyException, + ObjectAlreadyExistsException { + + try { + + Logger.msg(1, "TraceableEntity::request(" + mSystemKey + ") - " + + transitionID + " " + stepPath + " by " + agentId); + + AgentPath agent = new AgentPath(agentId); + Workflow lifeCycle = (Workflow) mStorage.get(mSystemKey, + ClusterStorage.LIFECYCLE + "/workflow", null); + + lifeCycle.requestAction(agent, stepPath, mSystemKey, + transitionID, requestData); + + // store the workflow if we've changed the state of the domain + // wf + if (!(stepPath.startsWith("workflow/predefined"))) + mStorage.put(mSystemKey, lifeCycle, null); + + // Normal operation exceptions + } catch (AccessRightsException ex) { + Logger.msg("Propagating AccessRightsException back to the calling agent"); + throw ex; + } catch (InvalidTransitionException ex) { + Logger.msg("Propagating InvalidTransitionException back to the calling agent"); + throw ex; + } catch (ObjectNotFoundException ex) { + Logger.msg("Propagating ObjectNotFoundException back to the calling agent"); + throw ex; + // errors + } catch (ClusterStorageException ex) { + Logger.error(ex); + throw new PersistencyException("Error on storage: " + + ex.getMessage(), ""); + } catch (InvalidItemPathException ex) { + Logger.error(ex); + throw new AccessRightsException("Invalid Agent Id: " + agentId, + ""); + } catch (InvalidDataException ex) { + Logger.error(ex); + Logger.msg("Propagating InvalidDataException back to the calling agent"); + throw ex; + } catch (ObjectAlreadyExistsException ex) { + Logger.error(ex); + Logger.msg("Propagating ObjectAlreadyExistsException back to the calling agent"); + throw ex; + // non-CORBA exception hasn't been caught! + } catch (Throwable ex) { + Logger.error("Unknown Error: requestAction on " + mSystemKey + + " by " + agentId + " executing " + stepPath); + Logger.error(ex); + throw new InvalidDataException( + "Extraordinary Exception during execution:" + + ex.getClass().getName() + " - " + + ex.getMessage(), ""); + } + } + + @Override + public String queryLifeCycle(int agentId, boolean filter) + throws AccessRightsException, ObjectNotFoundException, + PersistencyException { + Logger.msg(1, "TraceableEntity::queryLifeCycle(" + mSystemKey + + ") - agent: " + agentId); + try { + AgentPath agent; + try { + agent = new AgentPath(agentId); + } catch (InvalidItemPathException e) { + throw new AccessRightsException("Agent " + agentId + + " doesn't exist"); + } + Workflow wf; + try { + wf = (Workflow) mStorage.get(mSystemKey, + ClusterStorage.LIFECYCLE + "/workflow", null); + } catch (ClusterStorageException e) { + Logger.error("TraceableEntity::queryLifeCycle(" + + mSystemKey + ") - Error loading workflow"); + Logger.error(e); + throw new PersistencyException("Error loading workflow"); + } + JobArrayList jobBag = new JobArrayList(); + CompositeActivity domainWf = (CompositeActivity) wf + .search("workflow/domain"); + jobBag.list = filter ? domainWf.calculateJobs(agent, + mSystemKey, true) : domainWf.calculateAllJobs(agent, + mSystemKey, true); + Logger.msg(1, "TraceableEntity::queryLifeCycle(" + mSystemKey + + ") - Returning " + jobBag.list.size() + " jobs."); + try { + return Gateway.getMarshaller().marshall(jobBag); + } catch (Exception e) { + Logger.error(e); + throw new PersistencyException("Error marshalling job bag"); + } + } catch (Throwable ex) { + Logger.error("TraceableEntity::queryLifeCycle(" + mSystemKey + + ") - Unknown error"); + Logger.error(ex); + throw new PersistencyException( + "Unknown error querying jobs. Please see server log."); + } + } + + @Override + public String queryData(String path) throws AccessRightsException, + ObjectNotFoundException, PersistencyException { + + String result = ""; + + Logger.msg(1, "TraceableEntity::queryData(" + mSystemKey + ") - " + + path); + + try { // check for cluster contents query + + if (path.endsWith("/all")) { + int allPos = path.lastIndexOf("all"); + String query = path.substring(0, allPos); + String[] ids = mStorage.getClusterContents(mSystemKey, + query); + + for (int i = 0; i < ids.length; i++) { + result += ids[i]; + + if (i != ids.length - 1) + result += ","; + } + } + // **************************************************************** + else { // retrieve the object instead + C2KLocalObject obj = mStorage.get(mSystemKey, path, null); + + // marshall it, or in the case of an outcome get the data. + result = Gateway.getMarshaller().marshall(obj); + } + } catch (ObjectNotFoundException ex) { + throw ex; + } catch (Throwable ex) { + Logger.warning("TraceableEntity::queryData(" + mSystemKey + + ") - " + path + " Failed: " + ex.getClass().getName()); + throw new PersistencyException("Server exception: " + + ex.getClass().getName(), ""); + } + + if (Logger.doLog(9)) + Logger.msg(9, "TraceableEntity::queryData(" + mSystemKey + + ") - result:" + result); + + return result; + } + + /** + * + */ + @Override + protected void finalize() throws Throwable { + Logger.msg(7, "Item "+mSystemKey+" reaped"); + Gateway.getStorage().clearCache(mSystemKey, null); + super.finalize(); + } +} -- cgit v1.2.3 From 62c7a46967e949304e6b242854526463aae7ee17 Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Fri, 16 May 2014 10:38:32 +0200 Subject: item.request returns the final outcome, which may be modified during the execution (e.g. in the case of Predefined Steps). Fixes #136 --- src/main/idl/Entity.idl | 2 +- .../java/com/c2kernel/entity/ItemImplementation.java | 5 +++-- src/main/java/com/c2kernel/entity/TraceableEntity.java | 4 ++-- .../java/com/c2kernel/entity/agent/ActiveEntity.java | 4 ++-- .../java/com/c2kernel/entity/proxy/AgentProxy.java | 18 ++++++++++-------- src/main/java/com/c2kernel/entity/proxy/ItemProxy.java | 4 ++-- .../java/com/c2kernel/lifecycle/instance/Activity.java | 4 +++- .../c2kernel/lifecycle/instance/CompositeActivity.java | 4 ++-- .../java/com/c2kernel/lifecycle/instance/Workflow.java | 4 ++-- 9 files changed, 27 insertions(+), 22 deletions(-) (limited to 'src/main/java/com/c2kernel/entity/ItemImplementation.java') diff --git a/src/main/idl/Entity.idl b/src/main/idl/Entity.idl index 2aa9188..51884a3 100644 --- a/src/main/idl/Entity.idl +++ b/src/main/idl/Entity.idl @@ -101,7 +101,7 @@ module entity * @throws PersistencyException There was a problem committing the changes to storage. * @throws ObjectAlreadyExistsException Not normally thrown, but reserved for PredefinedSteps to throw if they need to. **/ - void requestAction( in unsigned long agentID, + string requestAction( in unsigned long agentID, in string stepPath, in unsigned long transitionID, in string requestData diff --git a/src/main/java/com/c2kernel/entity/ItemImplementation.java b/src/main/java/com/c2kernel/entity/ItemImplementation.java index e0d107a..b12e105 100644 --- a/src/main/java/com/c2kernel/entity/ItemImplementation.java +++ b/src/main/java/com/c2kernel/entity/ItemImplementation.java @@ -113,7 +113,7 @@ public class ItemImplementation implements ItemOperations { @Override - public void requestAction(int agentId, String stepPath, int transitionID, + public String requestAction(int agentId, String stepPath, int transitionID, String requestData) throws AccessRightsException, InvalidTransitionException, ObjectNotFoundException, InvalidDataException, PersistencyException, @@ -128,7 +128,7 @@ public class ItemImplementation implements ItemOperations { Workflow lifeCycle = (Workflow) mStorage.get(mSystemKey, ClusterStorage.LIFECYCLE + "/workflow", null); - lifeCycle.requestAction(agent, stepPath, mSystemKey, + String finalOutcome = lifeCycle.requestAction(agent, stepPath, mSystemKey, transitionID, requestData); // store the workflow if we've changed the state of the domain @@ -136,6 +136,7 @@ public class ItemImplementation implements ItemOperations { if (!(stepPath.startsWith("workflow/predefined"))) mStorage.put(mSystemKey, lifeCycle, null); + return finalOutcome; // Normal operation exceptions } catch (AccessRightsException ex) { Logger.msg("Propagating AccessRightsException back to the calling agent"); diff --git a/src/main/java/com/c2kernel/entity/TraceableEntity.java b/src/main/java/com/c2kernel/entity/TraceableEntity.java index ffd5859..a0980ee 100644 --- a/src/main/java/com/c2kernel/entity/TraceableEntity.java +++ b/src/main/java/com/c2kernel/entity/TraceableEntity.java @@ -111,7 +111,7 @@ public class TraceableEntity extends ItemPOA **************************************************************************/ //requestdata is xmlstring @Override - public void requestAction( int agentId, + public String requestAction( int agentId, String stepPath, int transitionID, String requestData @@ -124,7 +124,7 @@ public class TraceableEntity extends ItemPOA ObjectAlreadyExistsException { synchronized (this) { - mItemImpl.requestAction(agentId, stepPath, transitionID, requestData); + return mItemImpl.requestAction(agentId, stepPath, transitionID, requestData); } } diff --git a/src/main/java/com/c2kernel/entity/agent/ActiveEntity.java b/src/main/java/com/c2kernel/entity/agent/ActiveEntity.java index c59b0fe..a799b62 100644 --- a/src/main/java/com/c2kernel/entity/agent/ActiveEntity.java +++ b/src/main/java/com/c2kernel/entity/agent/ActiveEntity.java @@ -120,14 +120,14 @@ public class ActiveEntity extends AgentPOA } @Override - public void requestAction(int agentID, String stepPath, int transitionID, + public String requestAction(int agentID, String stepPath, int transitionID, String requestData) throws AccessRightsException, InvalidTransitionException, ObjectNotFoundException, InvalidDataException, PersistencyException, ObjectAlreadyExistsException { synchronized (this) { - mAgentImpl.requestAction(agentID, stepPath, transitionID, requestData); + return mAgentImpl.requestAction(agentID, stepPath, transitionID, requestData); } } diff --git a/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java b/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java index 29550d4..b6566a8 100644 --- a/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java +++ b/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java @@ -82,7 +82,7 @@ public class AgentProxy extends ItemProxy * @param job - the job to execute * @throws ScriptErrorException */ - public void execute(ItemProxy item, Job job) + public String execute(ItemProxy item, Job job) throws AccessRightsException, InvalidTransitionException, ObjectNotFoundException, @@ -152,12 +152,14 @@ public class AgentProxy extends ItemProxy job.setAgentId(getSystemKey()); Logger.msg(3, "AgentProxy - submitting job to item proxy"); - item.requestAction(job); + String result = item.requestAction(job); if (Logger.doLog(3)) { Date timeNow = new Date(); long secsNow = (timeNow.getTime()-startTime.getTime())/1000; Logger.msg(3, "Execution took "+secsNow+" seconds"); } + + return result; } private Object callScript(ItemProxy item, Job job) throws ScriptingEngineException { @@ -178,7 +180,7 @@ public class AgentProxy extends ItemProxy * @throws ObjectAlreadyExistsException * @throws ScriptErrorException */ - public void execute(Job job) + public String execute(Job job) throws AccessRightsException, InvalidDataException, InvalidTransitionException, @@ -189,13 +191,13 @@ public class AgentProxy extends ItemProxy { try { ItemProxy targetItem = Gateway.getProxyManager().getProxy(new ItemPath(job.getItemSysKey())); - execute(targetItem, job); + return execute(targetItem, job); } catch (InvalidItemPathException e) { throw new ObjectNotFoundException("Job contained invalid item sysKey: "+job.getItemSysKey(), ""); } } - public void execute(ItemProxy item, String predefStep, C2KLocalObject obj) + public String execute(ItemProxy item, String predefStep, C2KLocalObject obj) throws AccessRightsException, InvalidDataException, InvalidTransitionException, @@ -210,10 +212,10 @@ public class AgentProxy extends ItemProxy Logger.error(ex); throw new InvalidDataException("Error on marshall", ""); } - execute(item, predefStep, param); + return execute(item, predefStep, param); } - public void execute(ItemProxy item, String predefStep, String... params) + public String execute(ItemProxy item, String predefStep, String... params) throws AccessRightsException, InvalidDataException, InvalidTransitionException, @@ -221,7 +223,7 @@ public class AgentProxy extends ItemProxy PersistencyException, ObjectAlreadyExistsException { - item.getItem().requestAction(getSystemKey(), "workflow/predefined/"+predefStep, PredefinedStep.DONE, PredefinedStep.bundleData(params)); + return item.getItem().requestAction(getSystemKey(), "workflow/predefined/"+predefStep, PredefinedStep.DONE, PredefinedStep.bundleData(params)); } /** Wrappers for scripts */ diff --git a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java index f3a2f44..454da6d 100644 --- a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java +++ b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java @@ -158,7 +158,7 @@ public class ItemProxy /************************************************************************** * **************************************************************************/ - public void requestAction( Job thisJob ) + public String requestAction( Job thisJob ) throws AccessRightsException, InvalidTransitionException, ObjectNotFoundException, @@ -178,7 +178,7 @@ public class ItemProxy throw new InvalidDataException("No Agent specified.", ""); Logger.msg(7, "ItemProxy - executing "+thisJob.getStepPath()+" for "+thisJob.getAgentName()); - getItem().requestAction (thisJob.getAgentId(), thisJob.getStepPath(), + return getItem().requestAction (thisJob.getAgentId(), thisJob.getStepPath(), thisJob.getTransition().getId(), outcome); } diff --git a/src/main/java/com/c2kernel/lifecycle/instance/Activity.java b/src/main/java/com/c2kernel/lifecycle/instance/Activity.java index 8e578c2..b86e200 100644 --- a/src/main/java/com/c2kernel/lifecycle/instance/Activity.java +++ b/src/main/java/com/c2kernel/lifecycle/instance/Activity.java @@ -129,7 +129,7 @@ public class Activity extends WfVertex /** cf Item request * @throws ObjectNotFoundException * @throws PersistencyException */ - public void request(AgentPath agent, int itemSysKey, int transitionID, String requestData) throws AccessRightsException, InvalidTransitionException, InvalidDataException, ObjectNotFoundException, PersistencyException + public String request(AgentPath agent, int itemSysKey, int transitionID, String requestData) throws AccessRightsException, InvalidTransitionException, InvalidDataException, ObjectNotFoundException, PersistencyException { // Find requested transition @@ -203,6 +203,8 @@ public class Activity extends WfVertex //refresh all the job lists pushJobsToAgents(itemSysKey); + + return outcome; } protected String runActivityLogic(AgentPath agent, int itemSysKey, diff --git a/src/main/java/com/c2kernel/lifecycle/instance/CompositeActivity.java b/src/main/java/com/c2kernel/lifecycle/instance/CompositeActivity.java index 016298f..e6d1bf9 100644 --- a/src/main/java/com/c2kernel/lifecycle/instance/CompositeActivity.java +++ b/src/main/java/com/c2kernel/lifecycle/instance/CompositeActivity.java @@ -413,12 +413,12 @@ public class CompositeActivity extends Activity } @Override - public void request(AgentPath agent, int itemSysKey, int transitionID, String requestData) throws AccessRightsException, InvalidTransitionException, InvalidDataException, ObjectNotFoundException, PersistencyException + public String request(AgentPath agent, int itemSysKey, int transitionID, String requestData) throws AccessRightsException, InvalidTransitionException, InvalidDataException, ObjectNotFoundException, PersistencyException { if (getChildrenGraphModel().getStartVertex() != null && !getStateMachine().getState(state).isFinished() && transitionID == CompositeActivity.START) ((WfVertex) getChildrenGraphModel().getStartVertex()).run(agent, itemSysKey); - super.request(agent, itemSysKey, transitionID, requestData); + return super.request(agent, itemSysKey, transitionID, requestData); } public void refreshJobs(int itemSysKey) diff --git a/src/main/java/com/c2kernel/lifecycle/instance/Workflow.java b/src/main/java/com/c2kernel/lifecycle/instance/Workflow.java index fa5e66b..8ff2fe2 100644 --- a/src/main/java/com/c2kernel/lifecycle/instance/Workflow.java +++ b/src/main/java/com/c2kernel/lifecycle/instance/Workflow.java @@ -99,12 +99,12 @@ public class Workflow extends CompositeActivity implements C2KLocalObject * @throws PersistencyException */ //requestData is xmlstring - public void requestAction(AgentPath agent, String stepPath, int itemSysKey, int transitionID, String requestData) + public String requestAction(AgentPath agent, String stepPath, int itemSysKey, int transitionID, String requestData) throws ObjectNotFoundException, AccessRightsException, InvalidTransitionException, InvalidDataException, ObjectAlreadyExistsException, PersistencyException { Logger.msg(3, "Action: " + transitionID + " " + stepPath + " by " + agent.getAgentName()); if (search(stepPath) != null) - ((Activity) search(stepPath)).request(agent, itemSysKey, transitionID, requestData); + return ((Activity) search(stepPath)).request(agent, itemSysKey, transitionID, requestData); else throw new ObjectNotFoundException(stepPath + " not found", ""); } -- cgit v1.2.3