summaryrefslogtreecommitdiff
path: root/source/com/c2kernel/entity
diff options
context:
space:
mode:
Diffstat (limited to 'source/com/c2kernel/entity')
-rwxr-xr-xsource/com/c2kernel/entity/C2KLocalObject.java32
-rwxr-xr-xsource/com/c2kernel/entity/CorbaServer.java191
-rwxr-xr-xsource/com/c2kernel/entity/TraceableEntity.java335
-rwxr-xr-xsource/com/c2kernel/entity/TraceableLocator.java84
-rwxr-xr-xsource/com/c2kernel/entity/agent/ActiveEntity.java281
-rwxr-xr-xsource/com/c2kernel/entity/agent/ActiveLocator.java87
-rwxr-xr-xsource/com/c2kernel/entity/agent/Job.java343
-rwxr-xr-xsource/com/c2kernel/entity/agent/JobArrayList.java30
-rwxr-xr-xsource/com/c2kernel/entity/agent/JobList.java125
-rwxr-xr-xsource/com/c2kernel/entity/proxy/AgentProxy.java301
-rwxr-xr-xsource/com/c2kernel/entity/proxy/DomainPathSubscriber.java18
-rwxr-xr-xsource/com/c2kernel/entity/proxy/EntityProxy.java246
-rwxr-xr-xsource/com/c2kernel/entity/proxy/EntityProxyManager.java341
-rwxr-xr-xsource/com/c2kernel/entity/proxy/EntityProxyObserver.java25
-rwxr-xr-xsource/com/c2kernel/entity/proxy/ItemProxy.java212
-rwxr-xr-xsource/com/c2kernel/entity/proxy/MemberControl.java43
-rwxr-xr-xsource/com/c2kernel/entity/proxy/MemberSubscription.java121
-rwxr-xr-xsource/com/c2kernel/entity/proxy/ProxyClientConnection.java179
-rwxr-xr-xsource/com/c2kernel/entity/proxy/ProxyMessage.java110
-rwxr-xr-xsource/com/c2kernel/entity/proxy/ProxyServerConnection.java129
-rwxr-xr-xsource/com/c2kernel/entity/proxy/ProxySubscriber.java36
-rwxr-xr-xsource/com/c2kernel/entity/transfer/TransferItem.java131
-rwxr-xr-xsource/com/c2kernel/entity/transfer/TransferSet.java107
23 files changed, 3507 insertions, 0 deletions
diff --git a/source/com/c2kernel/entity/C2KLocalObject.java b/source/com/c2kernel/entity/C2KLocalObject.java
new file mode 100755
index 0000000..06bf867
--- /dev/null
+++ b/source/com/c2kernel/entity/C2KLocalObject.java
@@ -0,0 +1,32 @@
+package com.c2kernel.entity;
+
+import java.io.Serializable;
+
+/**
+ * Objects that are to be stored by Cristal Entities must implement this interface and be (un)marshallable by Castor
+ * i.e. have a map file properly registered in the kernel. Domain implementors should not create new C2KLocalObjects
+ * <p>Each object will be stored as the path /clustertype/name in most cases. Exceptions are:
+ * <ul>
+ * <li>Outcomes - /Outcome/SchemaType/SchemaVersion/EventId
+ * <li>Viewpoints - /ViewPoint/SchemaType/Name
+ * </ul>
+ *
+ * @see com.c2kernel.persistency.ClusterStorage
+ * @see com.c2kernel.persistency.ClusterStorageManager
+ *
+ * @author Andrew Branson
+ *
+ * $Revision: 1.5 $
+ * $Date: 2004/01/22 11:10:41 $
+ *
+ * Copyright (C) 2003 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ */
+
+public interface C2KLocalObject extends Serializable {
+
+ public void setName(String name);
+ public String getName();
+
+ public String getClusterType();
+}
diff --git a/source/com/c2kernel/entity/CorbaServer.java b/source/com/c2kernel/entity/CorbaServer.java
new file mode 100755
index 0000000..948b1a6
--- /dev/null
+++ b/source/com/c2kernel/entity/CorbaServer.java
@@ -0,0 +1,191 @@
+package com.c2kernel.entity;
+
+import java.util.Map;
+
+import org.omg.PortableServer.POA;
+import org.omg.PortableServer.POAManager;
+import org.omg.PortableServer.Servant;
+import org.omg.PortableServer.POAManagerPackage.AdapterInactive;
+
+import com.c2kernel.common.CannotManageException;
+import com.c2kernel.common.InvalidDataException;
+import com.c2kernel.common.ObjectAlreadyExistsException;
+import com.c2kernel.common.ObjectNotFoundException;
+import com.c2kernel.entity.agent.ActiveEntity;
+import com.c2kernel.entity.agent.ActiveLocator;
+import com.c2kernel.lookup.AgentPath;
+import com.c2kernel.lookup.EntityPath;
+import com.c2kernel.lookup.InvalidEntityPathException;
+import com.c2kernel.process.Gateway;
+import com.c2kernel.utils.Logger;
+import com.c2kernel.utils.SoftCache;
+
+/**************************************************************************
+ *
+ * $Revision: 1.8 $
+ * $Date: 2005/10/13 08:13:44 $
+ *
+ * Copyright (C) 2003 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ **************************************************************************/
+
+
+public class CorbaServer {
+ private Map mEntityCache;
+ private POA mRootPOA;
+ private POA mItemPOA;
+ private POA mAgentPOA;
+ private POAManager mPOAManager;
+
+ public CorbaServer() throws InvalidDataException {
+ mEntityCache = new SoftCache(50);
+
+ // init POA
+ try {
+ setupPOA();
+ mPOAManager.activate();
+ } catch (Exception ex) {
+ Logger.error(ex);
+ throw new InvalidDataException("Error initialising POA", "");
+ }
+
+ new Thread(new Runnable() {
+ public void run() {
+ Thread.currentThread().setName("ORB Invoker");
+ Gateway.getORB().run();
+ }
+ }).start();
+ }
+
+ public void close() {
+ try {
+ mPOAManager.deactivate(true, true);
+ } catch (AdapterInactive ex) {
+ Logger.error(ex);
+ }
+ }
+
+ /**************************************************************************
+ * Initialises the C2KRootPOA with policies which are suitable for Factory objects
+ **************************************************************************/
+ public void setupPOA() throws Exception {
+
+ //Initialise the RootPOA
+ mRootPOA = org.omg.PortableServer.POAHelper.narrow(
+ Gateway.getORB().resolve_initial_references("RootPOA"));
+
+ //Initilaise the default POAManager
+
+ mPOAManager = mRootPOA.the_POAManager();
+
+ org.omg.CORBA.Policy[] poaPolicies;
+
+ // Create POA for use by the entities
+ org.omg.CORBA.Policy[] policies = new org.omg.CORBA.Policy[6];
+
+ policies[0] = mRootPOA.create_id_assignment_policy(
+ org.omg.PortableServer.IdAssignmentPolicyValue.USER_ID);
+ policies[1] = mRootPOA.create_lifespan_policy(
+ org.omg.PortableServer.LifespanPolicyValue.PERSISTENT);
+ policies[2] = mRootPOA.create_servant_retention_policy(
+ org.omg.PortableServer.ServantRetentionPolicyValue.NON_RETAIN);
+ policies[3] = mRootPOA.create_id_uniqueness_policy(
+ org.omg.PortableServer.IdUniquenessPolicyValue.UNIQUE_ID);
+ policies[4] = mRootPOA.create_request_processing_policy(
+ org.omg.PortableServer.RequestProcessingPolicyValue.
+ USE_SERVANT_MANAGER);
+ policies[5] = mRootPOA.create_implicit_activation_policy(
+ org.omg.PortableServer.ImplicitActivationPolicyValue.
+ NO_IMPLICIT_ACTIVATION);
+
+ mItemPOA = mRootPOA.create_POA( "Item",
+ mRootPOA.the_POAManager(),
+ policies );
+ mAgentPOA = mRootPOA.create_POA( "Agent",
+ mRootPOA.the_POAManager(),
+ policies );
+
+ //Create the locators
+ TraceableLocator itemLocator = new TraceableLocator( mItemPOA );
+ mItemPOA.set_servant_manager( itemLocator._this( Gateway.getORB() ) );
+
+ ActiveLocator agentLocator = new ActiveLocator( mAgentPOA );
+ mAgentPOA.set_servant_manager( agentLocator._this( Gateway.getORB() ) );
+
+ }
+
+
+ /**************************************************************************
+ * Returns a CORBA servant for a pre-existing entity
+ **************************************************************************/
+ private Servant getEntity(int sysKey, org.omg.PortableServer.POA poa) throws ObjectNotFoundException {
+ try {
+ EntityPath entityPath = new EntityPath(sysKey);
+ Servant entity = null;
+ synchronized (mEntityCache) {
+ entity = (Servant)mEntityCache.get(entityPath);
+ if (entity == null) {
+ Logger.msg(7, "Creating new servant for "+sysKey);
+
+ Class entityClass = Gateway.getLDAPLookup().getEntityClass(entityPath);
+
+ if (entityClass == TraceableEntity.class) {
+ if (poa == null) poa = mItemPOA;
+ entity = new TraceableEntity(sysKey, poa);
+ }
+ else if (entityClass == ActiveEntity.class) {
+ if (poa == null) poa = mAgentPOA;
+ entity = new ActiveEntity(sysKey, poa);
+ }
+ mEntityCache.put(entityPath, entity);
+ }
+ }
+ return entity;
+
+ } catch (InvalidEntityPathException ex) {
+ throw new ObjectNotFoundException("Invalid Entity Key", "");
+ }
+ }
+
+ /**************************************************************************
+ * Wrapper for fetching Items
+ **************************************************************************/
+ public TraceableEntity getItem(int sysKey, org.omg.PortableServer.POA poa) throws ObjectNotFoundException {
+ return (TraceableEntity)getEntity(sysKey, poa);
+ }
+
+ /**************************************************************************
+ * Wrapper for fetching Agents
+ **************************************************************************/
+ public ActiveEntity getAgent(int sysKey, org.omg.PortableServer.POA poa) throws ObjectNotFoundException {
+ return (ActiveEntity)getEntity(sysKey, poa);
+ }
+
+ /**
+ * @param entityPath
+ * @return
+ */
+ public Servant createEntity(EntityPath entityPath) throws CannotManageException, ObjectAlreadyExistsException {
+ try {
+ if (entityPath == null)
+ entityPath = Gateway.getLDAPLookup().getNextKeyManager().generateNextEntityKey();
+ } catch (Exception ex) {
+ Logger.error(ex);
+ throw new CannotManageException("Cannot generate next entity key");
+ }
+ boolean isAgent = entityPath instanceof AgentPath;
+ POA myPOA = isAgent?mAgentPOA:mItemPOA;
+ org.omg.CORBA.Object obj = myPOA.create_reference_with_id(entityPath.getOID(), isAgent?AgentHelper.id():ItemHelper.id());
+ entityPath.setIOR(obj);
+ Servant entity;
+ if (isAgent)
+ entity = new ActiveEntity(entityPath.getSysKey(), myPOA);
+ else
+ entity = new TraceableEntity(entityPath.getSysKey(), myPOA);
+ synchronized (mEntityCache) {
+ mEntityCache.put(entityPath, entity);
+ }
+ return entity;
+
+ }
+}
diff --git a/source/com/c2kernel/entity/TraceableEntity.java b/source/com/c2kernel/entity/TraceableEntity.java
new file mode 100755
index 0000000..49bbe47
--- /dev/null
+++ b/source/com/c2kernel/entity/TraceableEntity.java
@@ -0,0 +1,335 @@
+/**************************************************************************
+ * TraceableEntity
+ *
+ * $Workfile$
+ * $Revision: 1.108 $
+ * $Date: 2005/10/06 14:46:22 $
+ *
+ * Copyright (C) 2001 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ **************************************************************************/
+
+package com.c2kernel.entity;
+
+import java.util.Iterator;
+
+import com.c2kernel.common.AccessRightsException;
+import com.c2kernel.common.InvalidDataException;
+import com.c2kernel.common.InvalidTransitionException;
+import com.c2kernel.common.ObjectAlreadyExistsException;
+import com.c2kernel.common.ObjectNotFoundException;
+import com.c2kernel.common.PersistencyException;
+import com.c2kernel.entity.agent.JobArrayList;
+import com.c2kernel.lifecycle.instance.CompositeActivity;
+import com.c2kernel.lifecycle.instance.Workflow;
+import com.c2kernel.lifecycle.instance.stateMachine.Transitions;
+import com.c2kernel.lookup.AgentPath;
+import com.c2kernel.lookup.InvalidEntityPathException;
+import com.c2kernel.persistency.ClusterStorage;
+import com.c2kernel.persistency.ClusterStorageException;
+import com.c2kernel.persistency.TransactionManager;
+import com.c2kernel.process.Gateway;
+import com.c2kernel.property.Property;
+import com.c2kernel.property.PropertyArrayList;
+import com.c2kernel.utils.CastorXMLUtility;
+import com.c2kernel.utils.Logger;
+
+/**************************************************************************
+*
+* @author $Author: abranson $ $Date: 2005/10/06 14:46:22 $
+* @version $Revision: 1.108 $
+* <pre>
+* ,. '\'\ ,---.
+* . | \\ l\\l_ // |
+* _ _ | \\/ `/ `.| |
+* /~\\ \ //~\ | Y | | || Y |
+* | \\ \ // | | \| | |\ / |
+* [ || || ] \ | o|o | > /
+* ] Y || || Y [ \___\_--_ /_/__/
+* | \_|l,------.l|_/ | /.-\(____) /--.\
+* | >' `< | `--(______)----'
+* \ (/~`--____--'~\) / u// u / \
+* `-_>-__________-<_-' / \ / /|
+* /(_#(__)#_)\ ( .) / / ]
+* \___/__\___/ `.`' / [
+* /__`--'__\ |`-' |
+* /\(__,>-~~ __) | |_
+* /\//\\( `--~~ ) _l |-:.
+* '\/ <^\ /^> | ` ( < \\
+* _\ >-__-< /_ ,-\ ,-~~->. \ `:._,/
+* (___\ /___) (____/ (____) `-'
+* Kovax and, paradoxically, Kovax
+* </pre>
+***************************************************************************/
+
+public class TraceableEntity extends ItemPOA
+{
+
+ private int mSystemKey;
+ private org.omg.PortableServer.POA mPoa;
+ private TransactionManager mStorage;
+
+
+ /**************************************************************************
+ * Constructor used by the Locator only
+ **************************************************************************/
+ public TraceableEntity( int key,
+ org.omg.PortableServer.POA poa )
+ {
+ Logger.msg(5,"TraceableEntity::constructor() - SystemKey:" + key );
+
+ mSystemKey = key;
+ mPoa = poa;
+ mStorage = Gateway.getStorage();
+ }
+
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ public org.omg.PortableServer.POA _default_POA()
+ {
+ if(mPoa != null)
+ return mPoa;
+ else
+ return super._default_POA();
+ }
+
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ public int getSystemKey()
+ {
+ Logger.msg(8, "TraceableEntity::getSystemKey() - " + mSystemKey);
+ return mSystemKey;
+ }
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ public void initialise( int agentId,
+ String propString,
+ String initWfString
+ )
+ throws AccessRightsException,
+ InvalidDataException,
+ PersistencyException
+ {
+ Logger.msg(1, "TraceableEntity::initialise("+mSystemKey+") - agent:"+agentId);
+ synchronized (this) {
+ Workflow lc = null;
+ PropertyArrayList props = null;
+
+ AgentPath agentPath;
+ try {
+ agentPath = new AgentPath(agentId);
+ } catch (InvalidEntityPathException e) {
+ throw new AccessRightsException("Invalid Agent Id:" + agentId);
+ }
+
+ //unmarshalling checks the validity of the received strings
+
+ // create properties
+ if (!propString.equals("")) {
+ try {
+ props = (PropertyArrayList)CastorXMLUtility.unmarshall(propString);
+ for (Iterator i = props.list.iterator(); i.hasNext();) {
+ Property thisProp = (Property)i.next();
+ mStorage.put(mSystemKey, thisProp, props);
+ }
+ } catch (Throwable ex) {
+ Logger.msg(8, "TraceableEntity::initialise("+mSystemKey+ ") - Properties were invalid: "+propString);
+ Logger.error(ex);
+ mStorage.abort(props);
+ }
+ mStorage.commit(props);
+ }
+
+ // create wf
+ try {
+ if (initWfString == null || initWfString.equals(""))
+ lc = new Workflow(new CompositeActivity());
+ else
+ lc = new Workflow((CompositeActivity)CastorXMLUtility.unmarshall(initWfString));
+ lc.initialise(mSystemKey, agentPath);
+ mStorage.put(mSystemKey, lc, null);
+ } catch (Throwable ex) {
+ Logger.msg(8, "TraceableEntity::initialise("+mSystemKey+") - Workflow was invalid: "+initWfString);
+ Logger.error(ex);
+ }
+ }
+ }
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ //requestdata is xmlstring
+ public void requestAction( int agentId,
+ String stepPath,
+ int transitionID,
+ String requestData
+ )
+ throws AccessRightsException,
+ InvalidTransitionException,
+ ObjectNotFoundException,
+ InvalidDataException,
+ PersistencyException,
+ ObjectAlreadyExistsException
+ {
+ synchronized (this) {
+ try {
+
+ Logger.msg(1, "TraceableEntity::request("+mSystemKey+") - " +
+ Transitions.getTransitionName(transitionID) + " "+stepPath + " by " +agentId );
+
+ AgentPath agent = new AgentPath(agentId);
+ Workflow lifeCycle = (Workflow)mStorage.get(mSystemKey, ClusterStorage.LIFECYCLE+"/workflow", null);
+
+ lifeCycle.requestAction( agent,
+ stepPath,
+ transitionID,
+ requestData );
+
+ // store the workflow if we've changed the state of the domain wf
+ if (!(stepPath.startsWith("workflow/predefined")))
+ mStorage.put(mSystemKey, lifeCycle, null);
+
+ // Normal operation exceptions
+ } catch (AccessRightsException ex) {
+ Logger.msg("Propagating AccessRightsException back to the calling agent");
+ throw ex;
+ } catch (InvalidTransitionException ex) {
+ Logger.msg("Propagating InvalidTransitionException back to the calling agent");
+ throw ex;
+ } catch (ObjectNotFoundException ex) {
+ Logger.msg("Propagating ObjectNotFoundException back to the calling agent");
+ throw ex;
+ // errors
+ } catch (ClusterStorageException ex) {
+ Logger.error(ex);
+ throw new PersistencyException("Error on storage: "+ex.getMessage(), "");
+ } catch (InvalidEntityPathException ex) {
+ Logger.error(ex);
+ throw new AccessRightsException("Invalid Agent Id: "+agentId, "");
+ } catch (InvalidDataException ex) {
+ Logger.error(ex);
+ Logger.msg("Propagating InvalidDataException back to the calling agent");
+ throw ex;
+ } catch (ObjectAlreadyExistsException ex) {
+ Logger.error(ex);
+ Logger.msg("Propagating ObjectAlreadyExistsException back to the calling agent");
+ throw ex;
+ // non-CORBA exception hasn't been caught!
+ } catch (Throwable ex) {
+ Logger.error("Unknown Error: requestAction on "+mSystemKey+" by "+agentId+" executing "+stepPath);
+ Logger.error(ex);
+ throw new InvalidDataException("Extraordinary Exception during execution:"+ex.getClass().getName()+" - "+ex.getMessage(), "");
+ }
+ }
+ }
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ public String queryLifeCycle( int agentId,
+ boolean filter
+ )
+ throws AccessRightsException,
+ ObjectNotFoundException,
+ PersistencyException
+ {
+ synchronized (this) {
+ Logger.msg(1, "TraceableEntity::queryLifeCycle("+mSystemKey+") - agent: " + agentId);
+
+ try
+ {
+ AgentPath agent = new AgentPath(agentId);
+ Workflow wf = (Workflow)mStorage.get(mSystemKey, ClusterStorage.LIFECYCLE+"/workflow", null);
+ JobArrayList jobBag = new JobArrayList();
+ CompositeActivity domainWf = (CompositeActivity)wf.search("workflow/domain");
+ jobBag.list = filter?domainWf.calculateJobs(agent, true):domainWf.calculateAllJobs(agent, true);
+ Logger.msg(1, "TraceableEntity::queryLifeCycle("+mSystemKey+") - Returning "+jobBag.list.size()+" jobs.");
+ return CastorXMLUtility.marshall( jobBag );
+ }
+ catch( Throwable ex )
+ {
+ Logger.error(ex);
+ return "<ERROR/>";
+ }
+ }
+ }
+
+ /**************************************************************************
+ * The description for operation getData.
+ *
+ * @param path - the path to the object required
+ * the suffix 'all' retrieves a listing of all keys on that level
+ *
+ * @return The result string in xml format
+ * except 'all' which returns a comma sep list
+ *
+ * @exception ObjectNotFoundException
+ * ************************************************************************/
+ public String queryData(String path)
+ throws AccessRightsException,
+ ObjectNotFoundException,
+ PersistencyException
+ {
+ synchronized (this) {
+ String result = "";
+
+ Logger.msg(1, "TraceableEntity::queryData("+mSystemKey+") - " + path );
+
+ try
+ { // check for cluster contents query
+
+ if (path.endsWith("/all"))
+ {
+ int allPos = path.lastIndexOf("all");
+ String query = path.substring(0,allPos);
+ String[] ids = mStorage.getClusterContents( mSystemKey, query );
+
+ for( int i=0; i<ids.length; i++ )
+ {
+ result += ids[i];
+
+ if( i != ids.length-1 )
+ result += ",";
+ }
+ }
+ //****************************************************************
+ else
+ { // retrieve the object instead
+ C2KLocalObject obj = mStorage.get( mSystemKey, path, null );
+
+ // marshall it, or in the case of an outcome get the data.
+ result = CastorXMLUtility.marshall(obj);
+ }
+ }
+ catch (ObjectNotFoundException ex) {
+ throw ex;
+ }
+ catch(Throwable ex)
+ {
+ Logger.warning("TraceableEntity::queryData("+mSystemKey+") - "+
+ path + " Failed: "+ex.getClass().getName());
+ throw new PersistencyException("Server exception: "+ex.getClass().getName(), "");
+ }
+
+ if( Logger.doLog(9) )
+ Logger.msg(9, "TraceableEntity::queryData("+mSystemKey+") - result:" + result );
+
+ return result;
+ }
+ }
+ /**
+ *
+ */
+ protected void finalize() throws Throwable {
+ Logger.msg(7, "Item "+mSystemKey+" reaped");
+ Gateway.getStorage().clearCache(mSystemKey, null);
+ super.finalize();
+ }
+
+}
diff --git a/source/com/c2kernel/entity/TraceableLocator.java b/source/com/c2kernel/entity/TraceableLocator.java
new file mode 100755
index 0000000..09a19e3
--- /dev/null
+++ b/source/com/c2kernel/entity/TraceableLocator.java
@@ -0,0 +1,84 @@
+/**************************************************************************
+ * TraceableLocator
+ *
+ * $Workfile$
+ * $Revision: 1.15 $
+ * $Date: 2005/10/05 07:39:37 $
+ *
+ * Copyright (C) 2001 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ **************************************************************************/
+
+package com.c2kernel.entity;
+
+
+import java.sql.Timestamp;
+
+import com.c2kernel.common.ObjectNotFoundException;
+import com.c2kernel.process.Gateway;
+import com.c2kernel.utils.Logger;
+
+
+/**************************************************************************
+ *
+ * @author $Author: abranson $ $Date: 2005/10/05 07:39:37 $
+ * @version $Revision: 1.15 $
+ **************************************************************************/
+public class TraceableLocator extends org.omg.PortableServer.ServantLocatorPOA
+{
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ private org.omg.PortableServer.POA mParentPoa;
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ public TraceableLocator( org.omg.PortableServer.POA poa )
+ {
+ mParentPoa = poa;
+ }
+
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ public org.omg.PortableServer.Servant preinvoke(
+ byte[] oid,
+ org.omg.PortableServer.POA poa,
+ String operation,
+ org.omg.PortableServer.ServantLocatorPackage.CookieHolder cookie )
+ {
+ try
+ {
+
+ int syskey = Integer.parseInt(new String(oid));
+ org.omg.PortableServer.Servant servant=null;
+
+ Logger.msg(1,"===========================================================");
+ Logger.msg(1,"Item called at "+new Timestamp( System.currentTimeMillis()) +": " + operation +
+ "(" + syskey + ")." );
+
+ return Gateway.getCorbaServer().getItem(syskey, mParentPoa);
+
+ }
+ catch (ObjectNotFoundException ex)
+ {
+ Logger.error("ObjectNotFoundException::TraceableLocator::preinvoke() " + ex.toString());
+ throw new org.omg.CORBA.OBJECT_NOT_EXIST();
+ }
+ }
+
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ public void postinvoke(
+ byte[] oid,
+ org.omg.PortableServer.POA poa,
+ String operation,
+ java.lang.Object the_cookie,
+ org.omg.PortableServer.Servant the_servant )
+ { }
+}
diff --git a/source/com/c2kernel/entity/agent/ActiveEntity.java b/source/com/c2kernel/entity/agent/ActiveEntity.java
new file mode 100755
index 0000000..cb77bbb
--- /dev/null
+++ b/source/com/c2kernel/entity/agent/ActiveEntity.java
@@ -0,0 +1,281 @@
+/**************************************************************************
+ * BasicAgent.java
+ *
+ * $Revision: 1.39 $
+ * $Date: 2005/04/26 06:48:12 $
+ *
+ * Copyright (C) 2001-2003 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ **************************************************************************/
+
+package com.c2kernel.entity.agent;
+
+import java.util.Iterator;
+
+import com.c2kernel.common.*;
+import com.c2kernel.entity.AgentPOA;
+import com.c2kernel.entity.C2KLocalObject;
+import com.c2kernel.lookup.AgentPath;
+import com.c2kernel.lookup.InvalidEntityPathException;
+import com.c2kernel.lookup.RolePath;
+import com.c2kernel.persistency.ClusterStorageException;
+import com.c2kernel.persistency.TransactionManager;
+import com.c2kernel.process.Gateway;
+import com.c2kernel.property.PropertyArrayList;
+import com.c2kernel.utils.CastorXMLUtility;
+import com.c2kernel.utils.Logger;
+
+/**************************************************************************
+ * ActiveEntity
+ *
+ * @author $Author: abranson $ $Date: 2005/04/26 06:48:12 $
+ * @version $Revision: 1.39 $
+ **************************************************************************/
+public class ActiveEntity extends AgentPOA
+{
+
+ /**************************************************************************
+ * The CORBA Portable Object Adapter which holds the Agent
+ **************************************************************************/
+ private org.omg.PortableServer.POA mPOA = null;
+
+ /**************************************************************************
+ * The C2Kernel system key of the Agent
+ **************************************************************************/
+ private int mSystemKey = -1;
+
+ /**************************************************************************
+ * Connection to the persistency backeng
+ **************************************************************************/
+ private TransactionManager mDatabase = null;
+
+ /**************************************************************************
+ * The agent's joblist
+ **************************************************************************/
+ private JobList currentJobs;
+ /**
+ *
+ * @param key
+ * @param poa
+ */
+ public ActiveEntity( int key,
+ org.omg.PortableServer.POA poa )
+ {
+ Logger.msg(5, "ActiveEntity::constructor() - SystemKey:" + key );
+
+ mSystemKey = key;
+ mPOA = poa;
+ mDatabase = Gateway.getStorage();
+
+ Logger.msg(5, "ActiveEntity::constructor - completed.");
+ }
+
+ /**
+ * initialise cristal2 properties & collector
+ */
+ public void initialise( String agentProps )
+ throws AccessRightsException,
+ InvalidDataException,
+ PersistencyException
+ {
+ PropertyArrayList props = null;
+ Logger.msg(1, "ActiveEntity::initialise("+mSystemKey+")");
+ //initialise cristal2 properties & collector
+ try
+ {
+ props = initProps( agentProps );
+ mDatabase.commit( props );
+ }
+ catch( ClusterStorageException ex )
+ {
+ Logger.error("ActiveEntity::init() - Failed to init props/collector, aborting!");
+ Logger.error(ex);
+
+ mDatabase.abort( props );
+ throw new PersistencyException("Failed to init props => transaction aborted!");
+ }
+
+ Logger.msg(5, "ActiveEntity::init() - completed.");
+ }
+
+ /**
+ *
+ * @param propsString
+ * @return Properties
+ * @throws InvalidDataException Properties cannot be unmarshalled
+ * @throws ClusterStorageException
+ */
+ private PropertyArrayList initProps( String propsString )
+ throws InvalidDataException,
+ ClusterStorageException
+ {
+ PropertyArrayList props = null;
+
+ // create properties
+ if( !propsString.equals("") && propsString != null )
+ {
+ try
+ {
+ props = (PropertyArrayList)CastorXMLUtility.unmarshall(propsString);
+ }
+ catch( Exception ex )
+ {
+ //any exception during unmarshall indicates that data was
+ //incorrect or the castor mapping was not set up
+ Logger.error(ex);
+ throw new InvalidDataException(ex.toString(), null);
+ }
+
+ Iterator iter = props.list.iterator();
+
+ while( iter.hasNext() )
+ mDatabase.put( mSystemKey, (C2KLocalObject)iter.next(), props );
+ }
+ else
+ {
+ Logger.warning("ActiveEntity::initProps() - NO Properties!");
+ }
+
+ return props;
+ }
+
+ /**************************************************************************
+ *
+ *
+ **************************************************************************/
+ public org.omg.PortableServer.POA _default_POA()
+ {
+ if(mPOA != null)
+ return mPOA;
+ else
+ return super._default_POA();
+ }
+
+
+ /**************************************************************************
+ *
+ *
+ **************************************************************************/
+ public int getSystemKey()
+ {
+ return mSystemKey;
+ }
+
+
+ /**************************************************************************
+ *
+ *
+ **************************************************************************/
+ public String queryData(String xpath)
+ throws AccessRightsException,
+ ObjectNotFoundException,
+ PersistencyException
+ {
+ String result = "";
+ int allPos = -1;
+
+ Logger.msg(1, "ActiveEntity::queryData("+mSystemKey+") - " + xpath );
+
+ try
+ {
+ if( (allPos=xpath.indexOf("all")) != -1 )
+ {
+ String query = xpath.substring(0,allPos);
+ String[] ids = mDatabase.getClusterContents( mSystemKey, query );
+
+ for( int i=0; i<ids.length; i++ )
+ {
+
+ result += ids[i];
+
+ if( i != ids.length-1 )
+ result += ",";
+ }
+ }
+ //****************************************************************
+ else
+ {
+ C2KLocalObject obj = mDatabase.get( mSystemKey, xpath, null );
+
+ result = CastorXMLUtility.marshall(obj);
+ }
+
+ }
+ catch (ObjectNotFoundException ex) {
+ throw ex;
+ }
+ catch(Throwable ex)
+ {
+ Logger.error("ActiveEntity::queryData("+mSystemKey+") - " +
+ xpath + " FAILED");
+ Logger.error(ex);
+ result = "<ERROR/>";
+ }
+
+ Logger.msg(7, "ActiveEntity::queryData("+mSystemKey+") - result:" + result );
+
+ return result;
+ }
+
+
+
+ /**
+ * Called by an activity when it reckons we need to update our joblist for it
+ */
+
+ public synchronized void refreshJobList(int sysKey, String stepPath, String newJobs) {
+ try {
+ JobArrayList newJobList = (JobArrayList)CastorXMLUtility.unmarshall(newJobs);
+
+ // get our joblist
+ if (currentJobs == null)
+ currentJobs = new JobList( mSystemKey, null);
+
+ // remove old jobs for this item
+ currentJobs.removeJobsForStep( sysKey, stepPath );
+
+ // merge new jobs in
+ for (Iterator iter = newJobList.list.iterator(); iter.hasNext();) {
+ Job newJob = (Job)iter.next();
+ Logger.msg(6, "Adding job for "+newJob.getItemSysKey()+"/"+newJob.getStepPath()+":"+newJob.getPossibleTransition());
+ currentJobs.addJob(newJob);
+ }
+
+ } catch (Throwable ex) {
+ Logger.error("Could not refresh job list.");
+ Logger.error(ex);
+ }
+
+ }
+
+ public void addRole(String roleName) throws CannotManageException, ObjectNotFoundException {
+ RolePath newRole = Gateway.getLDAPLookup().getRoleManager().getRolePath(roleName);
+ try {
+ newRole.addAgent(new AgentPath(mSystemKey));
+ } catch (InvalidEntityPathException ex) {
+ throw new CannotManageException("Invalid syskey for agent: "+mSystemKey, "");
+ } catch (ObjectCannotBeUpdated ex) {
+ throw new CannotManageException("Could not update role");
+ }
+ }
+
+ public void removeRole(String roleName) throws CannotManageException, ObjectNotFoundException {
+ RolePath rolePath = Gateway.getLDAPLookup().getRoleManager().getRolePath(roleName);
+ try {
+ rolePath.removeAgent(new AgentPath(mSystemKey));
+ } catch (InvalidEntityPathException e) {
+ throw new CannotManageException("Invalid syskey for agent: "+mSystemKey, "");
+ } catch (ObjectCannotBeUpdated ex) {
+ throw new CannotManageException("Could not update role");
+ }
+ }
+ /**
+ *
+ */
+ protected void finalize() throws Throwable {
+ Logger.msg(7, "Agent "+mSystemKey+" reaped");
+ Gateway.getStorage().clearCache(mSystemKey, null);
+ super.finalize();
+ }
+
+}
diff --git a/source/com/c2kernel/entity/agent/ActiveLocator.java b/source/com/c2kernel/entity/agent/ActiveLocator.java
new file mode 100755
index 0000000..25324ee
--- /dev/null
+++ b/source/com/c2kernel/entity/agent/ActiveLocator.java
@@ -0,0 +1,87 @@
+/**************************************************************************
+ * TraceableLocator
+ *
+ * $Workfile$
+ * $Revision: 1.9 $
+ * $Date: 2005/10/05 07:39:36 $
+ *
+ * Copyright (C) 2001 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ **************************************************************************/
+
+package com.c2kernel.entity.agent;
+
+
+import java.sql.Timestamp;
+
+import com.c2kernel.common.ObjectNotFoundException;
+import com.c2kernel.process.Gateway;
+import com.c2kernel.utils.Logger;
+
+
+/**************************************************************************
+ *
+ * @author $Author: abranson $ $Date: 2005/10/05 07:39:36 $
+ * @version $Revision: 1.9 $
+ **************************************************************************/
+public class ActiveLocator extends org.omg.PortableServer.ServantLocatorPOA
+{
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ private org.omg.PortableServer.POA mParentPoa;
+
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ public ActiveLocator( org.omg.PortableServer.POA poa )
+ {
+ mParentPoa = poa;
+ }
+
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ public org.omg.PortableServer.Servant preinvoke(
+ byte[] oid,
+ org.omg.PortableServer.POA poa,
+ String operation,
+ org.omg.PortableServer.ServantLocatorPackage.CookieHolder cookie )
+ {
+
+ try
+ {
+ int syskey = Integer.parseInt(new String(oid));
+
+ org.omg.PortableServer.Servant servant;
+
+ Logger.msg(1,"===========================================================");
+ Logger.msg(1,"Agent called at "+new Timestamp( System.currentTimeMillis()) +": " + operation +
+ "(" + syskey + ")." );
+
+ return Gateway.getCorbaServer().getAgent(syskey, mParentPoa);
+
+ }
+ catch (ObjectNotFoundException ex)
+ {
+ Logger.error("ObjectNotFoundException::ActiveLocator::preinvoke() "+ex.toString());
+ throw new org.omg.CORBA.OBJECT_NOT_EXIST();
+ }
+ }
+
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ public void postinvoke(
+ byte[] oid,
+ org.omg.PortableServer.POA poa,
+ String operation,
+ java.lang.Object the_cookie,
+ org.omg.PortableServer.Servant the_servant )
+ {
+ }
+}
diff --git a/source/com/c2kernel/entity/agent/Job.java b/source/com/c2kernel/entity/agent/Job.java
new file mode 100755
index 0000000..12423d6
--- /dev/null
+++ b/source/com/c2kernel/entity/agent/Job.java
@@ -0,0 +1,343 @@
+package com.c2kernel.entity.agent;
+
+import com.c2kernel.common.ObjectNotFoundException;
+import com.c2kernel.entity.C2KLocalObject;
+import com.c2kernel.entity.proxy.AgentProxy;
+import com.c2kernel.entity.proxy.ItemProxy;
+import com.c2kernel.lifecycle.instance.stateMachine.Transitions;
+import com.c2kernel.lookup.EntityPath;
+import com.c2kernel.lookup.InvalidEntityPathException;
+import com.c2kernel.persistency.ClusterStorage;
+import com.c2kernel.persistency.outcome.Outcome;
+import com.c2kernel.persistency.outcome.Viewpoint;
+import com.c2kernel.process.Gateway;
+import com.c2kernel.utils.CastorHashMap;
+import com.c2kernel.utils.KeyValuePair;
+import com.c2kernel.utils.Logger;
+
+/*******************************************************************************
+ * @author $Author: abranson $ $Date: 2005/05/20 13:07:49 $
+ * @version $Revision: 1.62 $
+ ******************************************************************************/
+public class Job implements C2KLocalObject
+{
+ private int mID;
+
+ private String mName;
+
+ private int mItemSysKey;
+
+ private String mStepPath;
+
+ private int mPossibleTransition;
+
+ private int mCurrentState;
+
+ private int mTargetState;
+
+ private String mStepName;
+
+ private int mAgentId = -1;
+
+ private String mAgentName;
+
+ private String mAgentRole;
+
+ private CastorHashMap mActProps = new CastorHashMap();
+
+ private String mOutcome;
+
+ private String mStepType;
+
+ private ItemProxy item = null;
+
+ private AgentProxy agent = null;
+
+ /***************************************************************************
+ * Empty constructor for Castor
+ **************************************************************************/
+ public Job()
+ {
+ }
+
+ /***************************************************************************
+ *
+ **************************************************************************/
+ public Job(int sysKey, String path, int transition, int currState, int targState, String stepName, CastorHashMap actProps, String stepType, String agentName)
+ {
+ setItemSysKey(sysKey);
+ setStepPath(path);
+ setPossibleTransition(transition);
+ setCurrentState(currState);
+ setTargetState(targState);
+ setStepName(stepName);
+ setActProps(actProps);
+ setStepType(stepType);
+ setAgentName(agentName);
+ }
+
+ public int getItemSysKey()
+ {
+ return mItemSysKey;
+ }
+
+ public ItemProxy getItemProxy() throws ObjectNotFoundException, InvalidEntityPathException
+ {
+ if (item == null)
+ item = (ItemProxy) Gateway.getProxyManager().getProxy(new EntityPath(mItemSysKey));
+ return item;
+ }
+
+ public AgentProxy getAgentProxy() throws ObjectNotFoundException, InvalidEntityPathException
+ {
+ if (agent == null)
+ agent = (AgentProxy) Gateway.getProxyManager().getProxy(new EntityPath(getAgentId()));
+ return agent;
+ }
+
+ public String getDescription()
+ {
+ String desc = (String) mActProps.get("Description");
+ if (desc == null)
+ desc = "No Description";
+ return desc;
+ }
+
+ public String getSchemaType()
+ {
+ return (String) mActProps.get("SchemaType");
+ }
+
+ public int getSchemaVersion()
+ {
+ try
+ {
+ return Integer.parseInt((String) mActProps.get("SchemaVersion"));
+ } catch (NumberFormatException ex)
+ {
+ return -1;
+ }
+ }
+
+ public void setOutcome(String outcome)
+ {
+ mOutcome = outcome;
+ }
+
+ public String getOutcomeString()
+ {
+ Logger.debug(8, "getOutcomeString() " + (mOutcome == null && isOutcomeUsed()));
+ if (mOutcome == null && isOutcomeUsed())
+ {
+ String viewName = (String) getActProp("Viewpoint");
+ mOutcome = null;
+ if (viewName.length() > 0)
+ try
+ {
+ Viewpoint view = (Viewpoint) Gateway.getStorage().get(getItemSysKey(), ClusterStorage.VIEWPOINT + "/" + getSchemaType() + "/" + viewName, null);
+ mOutcome = view.getOutcome().getData();
+ } catch (Exception ex)
+ { // not found, return null
+ }
+ }
+ return mOutcome;
+ }
+
+ public Outcome getOutcome()
+ {
+ Logger.msg(1, "Get outcome");
+ return new Outcome(-1, getOutcomeString(), getSchemaType(), getSchemaVersion());
+ }
+
+ public int getAgentId() throws ObjectNotFoundException
+ {
+ if (mAgentId == -1)
+ mAgentId = Gateway.getLDAPLookup().getRoleManager().getAgentPath(getAgentName()).getSysKey();
+ return mAgentId;
+ }
+
+ public void setAgentId(int id)
+ {
+ mAgentId = id;
+ agent = null;
+ }
+
+ public String getAgentName()
+ {
+ if (mAgentName == null)
+ mAgentName = (String) mActProps.get("AgentName");
+ return mAgentName;
+ }
+
+ public void setAgentName(String agentName)
+ {
+ mAgentName = agentName;
+ }
+
+ public String getAgentRole()
+ {
+ if (mAgentRole == null)
+ mAgentRole = (String) mActProps.get("Agent Role");
+ return mAgentRole;
+ }
+
+ public void setAgentRole(String role)
+ {
+ mAgentRole = role;
+ }
+
+ public boolean isOutcomeUsed()
+ {
+ String schemaType = getSchemaType();
+ return (Boolean.TRUE.equals(getActProp("AlwaysUseOutcome")) || mPossibleTransition == Transitions.DONE || mPossibleTransition == Transitions.COMPLETE) && !(schemaType == null || schemaType.equals(""));
+ }
+
+ public int getID()
+ {
+ return mID;
+ }
+
+ public String getName()
+ {
+ return mName;
+ }
+
+ public void setID(int id)
+ {
+ mID = id;
+ mName = String.valueOf(id);
+ }
+
+ public void setName(String name)
+ {
+ mName = name;
+ try
+ {
+ mID = Integer.parseInt(name);
+ } catch (NumberFormatException ex)
+ {
+ mID = -1;
+ }
+ }
+
+ public void setItemSysKey(int sysKey)
+ {
+ mItemSysKey = sysKey;
+ item = null;
+ }
+
+ public String getClusterType()
+ {
+ return ClusterStorage.JOB;
+ }
+
+ public boolean equals(Job job)
+ {
+ return (getItemSysKey() == job.getItemSysKey()) && this.mStepPath.equals(job.mStepPath) && this.mPossibleTransition == job.mPossibleTransition;
+ }
+
+ public Object getActProp(String name)
+ {
+ return mActProps.get(name);
+ }
+
+ public String getActPropString(String name)
+ {
+ try
+ {
+ return mActProps.get(name).toString();
+ } catch (NullPointerException ex)
+ {
+ return null;
+ }
+ }
+
+ public String getStepName()
+ {
+ return mStepName;
+ }
+
+ public void setStepName(String string)
+ {
+ mStepName = string;
+ }
+
+ public String getStepPath()
+ {
+ return mStepPath;
+ }
+
+ public void setStepPath(String string)
+ {
+ mStepPath = string;
+ }
+
+ public int getPossibleTransition()
+ {
+ return mPossibleTransition;
+ }
+
+ public void setPossibleTransition(int lint)
+ {
+ mPossibleTransition = lint;
+ }
+
+ public CastorHashMap getActProps()
+ {
+ return mActProps;
+ }
+
+ public void setActProps(CastorHashMap map)
+ {
+ mActProps = map;
+ }
+
+ public KeyValuePair[] getKeyValuePairs()
+ {
+ return mActProps.getKeyValuePairs();
+ }
+
+ public void setKeyValuePairs(KeyValuePair[] pairs)
+ {
+ mActProps.setKeyValuePairs(pairs);
+ }
+
+ public int getCurrentState()
+ {
+ return mCurrentState;
+ }
+
+ public void setCurrentState(int string)
+ {
+ mCurrentState = string;
+ }
+
+ public int getTargetState() {
+ return mTargetState;
+ }
+
+ public void setTargetState(int mTargetState) {
+ this.mTargetState = mTargetState;
+ }
+
+ /**
+ * Returns the actType.
+ *
+ * @return String
+ */
+ public String getStepType()
+ {
+ return mStepType;
+ }
+
+ /**
+ * Sets the actType.
+ *
+ * @param actType
+ * The actType to set
+ */
+ public void setStepType(String actType)
+ {
+ mStepType = actType;
+ }
+} \ No newline at end of file
diff --git a/source/com/c2kernel/entity/agent/JobArrayList.java b/source/com/c2kernel/entity/agent/JobArrayList.java
new file mode 100755
index 0000000..b6eb2c2
--- /dev/null
+++ b/source/com/c2kernel/entity/agent/JobArrayList.java
@@ -0,0 +1,30 @@
+/**************************************************************************
+ *
+ * $Revision: 1.2 $
+ * $Date: 2003/06/20 11:44:30 $
+ *
+ * Copyright (C) 2001 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ **************************************************************************/
+
+package com.c2kernel.entity.agent;
+
+import java.util.ArrayList;
+
+import com.c2kernel.utils.CastorArrayList;
+
+public class JobArrayList extends CastorArrayList
+{
+
+ public JobArrayList()
+ {
+ super();
+ }
+
+ public JobArrayList(ArrayList aList)
+ {
+ super(aList);
+ }
+
+
+}
diff --git a/source/com/c2kernel/entity/agent/JobList.java b/source/com/c2kernel/entity/agent/JobList.java
new file mode 100755
index 0000000..d0d05d5
--- /dev/null
+++ b/source/com/c2kernel/entity/agent/JobList.java
@@ -0,0 +1,125 @@
+package com.c2kernel.entity.agent;
+
+import java.util.Iterator;
+import java.util.Vector;
+
+import com.c2kernel.common.ObjectNotFoundException;
+import com.c2kernel.lookup.InvalidEntityPathException;
+import com.c2kernel.persistency.ClusterStorage;
+import com.c2kernel.persistency.RemoteMap;
+import com.c2kernel.utils.Logger;
+
+
+/**************************************************************************
+*
+* @author $Author: abranson $ $Date: 2006/03/03 13:52:21 $
+* @version $Revision: 1.15 $
+***************************************************************************/
+public class JobList extends RemoteMap
+{
+
+ /**************************************************************************
+ * Empty constructor for Castor
+ **************************************************************************/
+ public JobList(int sysKey, Object locker)
+ throws ObjectNotFoundException, InvalidEntityPathException
+ {
+ super(sysKey, ClusterStorage.JOB, locker);
+ }
+
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ public void addJob( Job job )
+ {
+ synchronized(this) {
+ int jobId = getLastId()+1;
+ job.setID(jobId);
+ put(String.valueOf(jobId), job);
+ }
+ }
+
+ /**
+ * Cannot be stored
+ */
+ public String getClusterType() {
+ return null;
+ }
+
+ public int containsJob( Job job )
+ {
+ Iterator actMembers = keySet().iterator();
+ Job j = null;
+
+ while( actMembers.hasNext() )
+ {
+ j = (Job)actMembers.next();
+
+ if( j.equals(job) )
+ return j.getID();
+ }
+
+ return -1;
+ }
+
+ public Job getJob(int id) {
+ return (Job)get(String.valueOf(id));
+ }
+
+
+ /**
+ * @param job
+ */
+ public void removeJobsWithSysKey( int sysKey )
+ {
+ Iterator currentMembers = values().iterator();
+ Job j = null;
+
+ while( currentMembers.hasNext() )
+ {
+ j = (Job)currentMembers.next();
+
+ if( j.getItemSysKey() == sysKey )
+ remove( String.valueOf(j.getID()) );
+ }
+
+ Logger.msg(5, "JobList::removeJobsWithSysKey() - " + sysKey + " DONE." );
+ }
+
+ public void removeJobsForStep( int sysKey, String stepPath )
+ {
+ Iterator currentMembers = values().iterator();
+ while( currentMembers.hasNext() )
+ {
+ Job j = (Job)currentMembers.next();
+ if( j.getItemSysKey() == sysKey && j.getStepPath().equals(stepPath))
+ remove( String.valueOf(j.getID()) );
+ }
+
+ Logger.msg(5, "JobList::removeJobsForStep() - " + sysKey + " DONE." );
+ }
+ /**
+ * @param itemKey
+ * @param string
+ * @return
+ */
+ public Vector getJobsOfSysKey(int sysKey)
+ {
+ Iterator currentMembers = values().iterator();
+ Job j = null;
+ Vector jobs = new Vector();
+
+ while( currentMembers.hasNext() )
+ {
+ j = (Job)currentMembers.next();
+
+ if( j.getItemSysKey() == sysKey )
+ jobs.add(j);
+ }
+
+ Logger.msg(5, "JobList::getJobsOfSysKey() - returning " + jobs.size() + " Jobs." );
+
+ return jobs;
+ }
+} \ No newline at end of file
diff --git a/source/com/c2kernel/entity/proxy/AgentProxy.java b/source/com/c2kernel/entity/proxy/AgentProxy.java
new file mode 100755
index 0000000..af77031
--- /dev/null
+++ b/source/com/c2kernel/entity/proxy/AgentProxy.java
@@ -0,0 +1,301 @@
+/**************************************************************************
+ * AgentProxy.java
+ *
+ * $Revision: 1.37 $
+ * $Date: 2005/10/05 07:39:36 $
+ *
+ * Copyright (C) 2001 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ **************************************************************************/
+
+package com.c2kernel.entity.proxy;
+
+import java.util.Date;
+import java.util.Enumeration;
+
+import com.c2kernel.common.AccessRightsException;
+import com.c2kernel.common.InvalidDataException;
+import com.c2kernel.common.InvalidTransitionException;
+import com.c2kernel.common.ObjectAlreadyExistsException;
+import com.c2kernel.common.ObjectNotFoundException;
+import com.c2kernel.common.PersistencyException;
+import com.c2kernel.entity.Agent;
+import com.c2kernel.entity.AgentHelper;
+import com.c2kernel.entity.C2KLocalObject;
+import com.c2kernel.entity.ManageableEntity;
+import com.c2kernel.entity.agent.Job;
+import com.c2kernel.lifecycle.instance.predefined.PredefinedStep;
+import com.c2kernel.lifecycle.instance.stateMachine.Transitions;
+import com.c2kernel.lookup.AgentPath;
+import com.c2kernel.lookup.DomainPath;
+import com.c2kernel.lookup.EntityPath;
+import com.c2kernel.lookup.InvalidEntityPathException;
+import com.c2kernel.lookup.Path;
+import com.c2kernel.persistency.outcome.OutcomeValidator;
+import com.c2kernel.persistency.outcome.Schema;
+import com.c2kernel.process.Gateway;
+import com.c2kernel.scripting.ErrorInfo;
+import com.c2kernel.scripting.Script;
+import com.c2kernel.scripting.ScriptingEngineException;
+import com.c2kernel.utils.CastorXMLUtility;
+import com.c2kernel.utils.LocalObjectLoader;
+import com.c2kernel.utils.Logger;
+
+/******************************************************************************
+ * It is a wrapper for the connection and communication with Agent
+ * It caches data loaded from the Agent to reduce communication
+ *
+ * @version $Revision: 1.37 $ $Date: 2005/10/05 07:39:36 $
+ * @author $Author: abranson $
+ ******************************************************************************/
+public class AgentProxy extends EntityProxy
+{
+ AgentPath path;
+
+ /**************************************************************************
+ * Creates an AgentProxy without cache and change notification
+ **************************************************************************/
+ public AgentProxy( org.omg.CORBA.Object ior,
+ int systemKey)
+ throws ObjectNotFoundException
+ {
+ super(ior, systemKey);
+ try {
+ path = new AgentPath(systemKey);
+ } catch (InvalidEntityPathException e) {
+ throw new ObjectNotFoundException();
+ }
+ }
+
+ public ManageableEntity narrow() throws ObjectNotFoundException
+ {
+ try {
+ return AgentHelper.narrow(mIOR);
+ } catch (org.omg.CORBA.BAD_PARAM ex) { }
+ throw new ObjectNotFoundException("CORBA Object was not an Agent, or the server is down.");
+ }
+ /**************************************************************************
+ *
+ *
+ **************************************************************************/
+ public void initialise( String agentProps, String collector )
+ throws AccessRightsException,
+ InvalidDataException,
+ PersistencyException,
+ ObjectNotFoundException
+ {
+ Logger.msg(7, "AgentProxy::initialise - started");
+
+ ((Agent)getEntity()).initialise( agentProps );
+ }
+
+ public AgentPath getPath() {
+ return path;
+ }
+
+ /**
+ * Executes a job on the given item using this agent.
+ *
+ * @param item - item holding this job
+ * @param job - the job to execute
+ */
+ public void execute(ItemProxy item, Job job)
+ throws AccessRightsException,
+ InvalidTransitionException,
+ ObjectNotFoundException,
+ InvalidDataException,
+ PersistencyException,
+ ObjectAlreadyExistsException
+ {
+ OutcomeValidator validator = null;
+ String scriptName = job.getActPropString("ScriptName");
+ Date startTime = new Date();
+ Logger.msg(3, "AgentProxy - executing "+job.getStepPath()+" for "+path.getAgentName());
+ // get the outcome validator if present
+ if (job.isOutcomeUsed())
+ {
+
+ // get schema info from act props
+ String schemaName = job.getActPropString("SchemaType");
+ int schemaVersion;
+ try {
+ schemaVersion = Integer.parseInt(job.getActPropString("SchemaVersion"));
+ } catch (Exception e) {
+ throw new InvalidDataException(e.getClass().getName()+" extracing schema version", "");
+ }
+ Logger.msg(5, "AgentProxy - fetching schema "+schemaName+"_"+schemaVersion+" for validation");
+ // retrieve schema
+ Schema schema = LocalObjectLoader.getSchema(schemaName, schemaVersion);
+
+ if (schema == null)
+ throw new InvalidDataException("Job references outcome type "+schemaName+" version "+schemaVersion+" that does not exist in this centre.", "");
+
+ try {
+ validator = OutcomeValidator.getValidator(schema);
+ } catch (Exception e) {
+ throw new InvalidDataException("Could not create validator: "+e.getMessage(), "");
+ }
+ }
+
+ if(scriptName != null && scriptName.length() > 0 &&
+ (job.getPossibleTransition() == Transitions.DONE || job.getPossibleTransition() == Transitions.COMPLETE)) {
+ Logger.msg(3, "AgentProxy - executing script "+scriptName);
+ try {
+
+ // pre-validate outcome from script if there is one
+ if (job.getOutcomeString()!= null && validator != null) {
+ Logger.msg(5, "AgentProxy - validating outcome before script execution");
+ String error = validator.validate(job.getOutcomeString());
+ if (error.length() > 0) {
+ Logger.error("Outcome not valid: \n " + error);
+ throw new InvalidDataException(error, "");
+ }
+ }
+
+ // load script
+ ErrorInfo scriptErrors = (ErrorInfo)callScript(item, job);
+
+ if (scriptErrors.getFatal()) {
+ Logger.msg(3, "AgentProxy - fatal script error");
+ Logger.error(scriptErrors.getErrors());
+ throw new InvalidDataException("Fatal Script Error: \n"+scriptErrors.getErrors(), "");
+ }
+ if (scriptErrors.getErrors().length() > 0)
+ Logger.warning("Script errors: "+scriptErrors.getErrors());
+ } catch (ScriptingEngineException ex) {
+ Logger.error(ex);
+ throw new InvalidDataException(ex.getMessage(), "");
+ }
+ }
+
+ if (job.isOutcomeUsed()) {
+ Logger.msg(3, "AgentProxy - validating outcome");
+ String error = validator.validate(job.getOutcomeString());
+ if (error.length() > 0)
+ throw new InvalidDataException(error, "");
+ }
+
+ job.setAgentId(getSystemKey());
+ Logger.msg(3, "AgentProxy - submitting job to item proxy");
+ item.requestAction(job);
+ if (Logger.doLog(3)) {
+ Date timeNow = new Date();
+ long secsNow = (timeNow.getTime()-startTime.getTime())/1000;
+ Logger.msg(3, "Execution took "+secsNow+" seconds");
+ }
+ }
+
+ public Object callScript(ItemProxy item, Job job) throws ScriptingEngineException {
+ Script script = new Script(item, this, job);
+ return script.execute();
+ }
+
+ /**
+ * Standard execution of jobs. Note that this method should always be the one used from clients - all execution
+ * parameters are taken from the job where they're probably going to be correct.
+ *
+ * @param job
+ * @throws AccessRightsException
+ * @throws InvalidDataException
+ * @throws InvalidTransitionException
+ * @throws ObjectNotFoundException
+ * @throws PersistencyException
+ * @throws ObjectAlreadyExistsException
+ */
+ public void execute(Job job)
+ throws AccessRightsException,
+ InvalidDataException,
+ InvalidTransitionException,
+ ObjectNotFoundException,
+ PersistencyException,
+ ObjectAlreadyExistsException
+ {
+ try {
+ ItemProxy targetItem = (ItemProxy)Gateway.getProxyManager().getProxy(new EntityPath(job.getItemSysKey()));
+ execute(targetItem, job);
+ } catch (InvalidEntityPathException e) {
+ throw new ObjectNotFoundException("Job contained invalid item sysKey: "+job.getItemSysKey(), "");
+ }
+ }
+
+ public void execute(ItemProxy item, String predefStep, C2KLocalObject obj)
+ throws AccessRightsException,
+ InvalidDataException,
+ InvalidTransitionException,
+ ObjectNotFoundException,
+ PersistencyException,
+ ObjectAlreadyExistsException
+ {
+ String param;
+ try {
+ param = marshall(obj);
+ } catch (Exception ex) {
+ Logger.error(ex);
+ throw new InvalidDataException("Error on marshall", "");
+ }
+ execute(item, predefStep, param);
+ }
+
+ public void execute(ItemProxy item, String predefStep, String param)
+ throws AccessRightsException,
+ InvalidDataException,
+ InvalidTransitionException,
+ ObjectNotFoundException,
+ PersistencyException,
+ ObjectAlreadyExistsException
+ {
+ String[] params = new String[1];
+ params[0] = param;
+ execute(item, predefStep, params);
+ }
+
+ public void execute(ItemProxy item, String predefStep, String[] params)
+ throws AccessRightsException,
+ InvalidDataException,
+ InvalidTransitionException,
+ ObjectNotFoundException,
+ PersistencyException,
+ ObjectAlreadyExistsException
+ {
+ item.requestAction(getSystemKey(), "workflow/predefined/"+predefStep, Transitions.DONE, PredefinedStep.bundleData(params));
+ }
+
+ /** Wrappers for scripts */
+ public String marshall(Object obj) throws Exception {
+ return CastorXMLUtility.marshall(obj);
+ }
+
+ public Object unmarshall(String obj) throws Exception {
+ return CastorXMLUtility.unmarshall(obj);
+ }
+
+ /** Let scripts resolve items */
+ public ItemProxy searchItem(String name) throws ObjectNotFoundException {
+ Enumeration results = Gateway.getLDAPLookup().search(new DomainPath(""),name);
+
+ Path returnPath = null;
+ if (!results.hasMoreElements())
+ throw new ObjectNotFoundException(name, "");
+
+ while(results.hasMoreElements()) {
+ Path nextMatch = (Path)results.nextElement();
+ if (returnPath != null && nextMatch.getSysKey() != -1 && returnPath.getSysKey() != nextMatch.getSysKey())
+ throw new ObjectNotFoundException("Too many items with that name");
+ returnPath = nextMatch;
+ }
+
+ return (ItemProxy)Gateway.getProxyManager().getProxy(returnPath);
+ }
+
+ public ItemProxy getItem(String path) throws ObjectNotFoundException {
+ return (getItem(new DomainPath(path)));
+ }
+
+ public ItemProxy getItem(DomainPath path) throws ObjectNotFoundException {
+ return (ItemProxy)Gateway.getProxyManager().getProxy(path);
+ }
+
+ public ItemProxy getItemBySysKey(int sysKey) throws ObjectNotFoundException, InvalidEntityPathException {
+ return (ItemProxy)Gateway.getProxyManager().getProxy(new EntityPath(sysKey));
+ }
+}
diff --git a/source/com/c2kernel/entity/proxy/DomainPathSubscriber.java b/source/com/c2kernel/entity/proxy/DomainPathSubscriber.java
new file mode 100755
index 0000000..e09178d
--- /dev/null
+++ b/source/com/c2kernel/entity/proxy/DomainPathSubscriber.java
@@ -0,0 +1,18 @@
+package com.c2kernel.entity.proxy;
+
+import com.c2kernel.lookup.DomainPath;
+
+/**************************************************************************
+ *
+ * $Revision: 1.1 $
+ * $Date: 2004/02/05 16:11:57 $
+ *
+ * Copyright (C) 2003 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ **************************************************************************/
+
+public interface DomainPathSubscriber {
+
+ public void pathAdded(DomainPath path);
+ public void pathRemoved(DomainPath path);
+}
diff --git a/source/com/c2kernel/entity/proxy/EntityProxy.java b/source/com/c2kernel/entity/proxy/EntityProxy.java
new file mode 100755
index 0000000..a5b6822
--- /dev/null
+++ b/source/com/c2kernel/entity/proxy/EntityProxy.java
@@ -0,0 +1,246 @@
+/**************************************************************************
+ * EntityProxy.java
+ *
+ * $Revision: 1.35 $
+ * $Date: 2005/05/10 11:40:09 $
+ *
+ * Copyright (C) 2001 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ **************************************************************************/
+
+package com.c2kernel.entity.proxy;
+
+import java.util.*;
+
+import com.c2kernel.common.ObjectNotFoundException;
+import com.c2kernel.entity.*;
+import com.c2kernel.persistency.*;
+import com.c2kernel.process.Gateway;
+import com.c2kernel.property.Property;
+import com.c2kernel.utils.*;
+
+
+/******************************************************************************
+* It is a wrapper for the connection and communication with Entities.
+* It can cache data loaded from the Entity to reduce communication with it.
+* This cache is syncronised with corresponding Entity through an event mechanism.
+*
+* @version $Revision: 1.35 $ $Date: 2005/05/10 11:40:09 $
+* @author $Author: abranson $
+******************************************************************************/
+
+abstract public class EntityProxy implements ManageableEntity
+{
+
+ protected ManageableEntity mEntity = null;
+ protected org.omg.CORBA.Object mIOR;
+ protected int mSystemKey;
+ private HashMap mSubscriptions;
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ protected EntityProxy( org.omg.CORBA.Object ior,
+ int systemKey)
+ throws ObjectNotFoundException
+ {
+ Logger.msg(8,"EntityProxy::EntityProxy() - Initialising '" +systemKey+ "' entity");
+
+ initialise( ior, systemKey);
+ }
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ private void initialise( org.omg.CORBA.Object ior,
+ int systemKey)
+ throws ObjectNotFoundException
+ {
+ Logger.msg(8, "EntityProxy::initialise() - Initialising '" +systemKey+ "' entity");
+
+ mIOR = ior;
+ mSystemKey = systemKey;
+ mSubscriptions = new HashMap();
+ }
+
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ public ManageableEntity getEntity() throws ObjectNotFoundException
+ {
+ if (mEntity == null) {
+ mEntity = narrow();
+ }
+ return mEntity;
+ }
+
+ abstract public ManageableEntity narrow() throws ObjectNotFoundException;
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ //check who is using.. and if toString() is sufficient
+ public int getSystemKey()
+ {
+ return mSystemKey;
+ }
+
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ public String queryData( String path )
+ throws ObjectNotFoundException
+ {
+
+ try {
+ Logger.msg(7, "EntityProxy.queryData() - "+mSystemKey+"/"+path);
+ if (path.endsWith("all")) {
+ Logger.msg(7, "EntityProxy.queryData() - listing contents");
+ String[] result = Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length()-3));
+ StringBuffer retString = new StringBuffer();
+ for (int i = 0; i < result.length; i++) {
+ retString.append(result[i]);
+ if (i<result.length-1) retString.append(",");
+ }
+ Logger.msg(7, "EntityProxy.queryData() - "+retString.toString());
+ return retString.toString();
+ }
+ C2KLocalObject target = Gateway.getStorage().get(mSystemKey, path, null);
+ return CastorXMLUtility.marshall(target);
+ } catch (ObjectNotFoundException e) {
+ throw e;
+ } catch (Exception e) {
+ Logger.error(e);
+ return "<ERROR>"+e.getMessage()+"</ERROR>";
+ }
+ }
+
+ public String[] getContents( String path ) throws ObjectNotFoundException {
+ try {
+ return Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length()));
+ } catch (ClusterStorageException e) {
+ throw new ObjectNotFoundException(e.toString());
+ }
+ }
+
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ public C2KLocalObject getObject( String xpath )
+ throws ObjectNotFoundException
+ {
+ // load from storage, falling back to proxy loader if not found in others
+ try
+ {
+ return Gateway.getStorage().get( mSystemKey, xpath , this);
+ }
+ catch( ClusterStorageException ex )
+ {
+ Logger.msg(4, "Exception loading object :"+mSystemKey+"/"+xpath);
+ throw new ObjectNotFoundException( ex.toString() );
+ }
+ }
+
+
+
+ public String getProperty( String name )
+ throws ObjectNotFoundException
+ {
+ Logger.msg(5, "Get property "+name+" from syskey/"+mSystemKey);
+ Property prop = (Property)getObject("Property/"+name);
+ try
+ {
+ return prop.getValue();
+ }
+ catch (NullPointerException ex)
+ {
+ throw new ObjectNotFoundException();
+ }
+ }
+
+ public String getName()
+ {
+ try {
+ return getProperty("Name");
+ } catch (ObjectNotFoundException ex) {
+ return null;
+ }
+ }
+
+
+ /**************************************************************************
+ * Subscription methods
+ **************************************************************************/
+
+ public void subscribe (EntityProxyObserver observer,
+ String interest,
+ boolean preload)
+ {
+ MemberSubscription newSub = new MemberSubscription(this, interest, observer, preload);
+ synchronized (this){
+ mSubscriptions.put( newSub, observer );
+ }
+ new Thread(newSub).start();
+ Logger.msg(7, "Subscribed "+observer.getClass().getName()+" for "+interest);
+ }
+
+
+ public void unsubscribe(EntityProxyObserver observer)
+ {
+ synchronized (this){
+ for (Iterator e = mSubscriptions.keySet().iterator(); e.hasNext();) {
+ MemberSubscription thisSub = (MemberSubscription)e.next();
+ if (mSubscriptions.get( thisSub ) == observer) {
+ e.remove();
+ Logger.msg(7, "Unsubscribed "+observer.getClass().getName());
+ }
+ }
+ }
+ }
+
+ public void dumpSubscriptions(int logLevel) {
+ if (mSubscriptions.size() == 0) return;
+ Logger.msg(logLevel, "Subscriptions to proxy "+mSystemKey+":");
+ synchronized(this) {
+ for (Iterator iter = mSubscriptions.keySet().iterator(); iter.hasNext();) {
+ MemberSubscription element = (MemberSubscription)iter.next();
+ EntityProxyObserver obs = element.getObserver();
+ if (obs != null)
+ Logger.msg(logLevel, " "+element.getObserver().getClass().getName()+" subscribed to "+element.interest);
+ else
+ Logger.msg(logLevel, " Phantom subscription to "+element.interest);
+ }
+ }
+ }
+
+ public void notify(ProxyMessage message) {
+ Logger.msg(4, "EntityProxy.notify() - Received change notification for "+message.getPath()+" on "+mSystemKey);
+ synchronized (this){
+ if (!message.getServer().equals(EntityProxyManager.serverName))
+ Gateway.getStorage().clearCache(mSystemKey, message.getPath());
+ for (Iterator e = mSubscriptions.keySet().iterator(); e.hasNext();) {
+ MemberSubscription newSub = (MemberSubscription)e.next();
+ if (newSub.getObserver() == null) { // phantom
+ Logger.msg(4, "Removing phantom subscription to "+newSub.interest);
+ e.remove();
+ }
+ else
+ newSub.update(message.getPath(), message.getState());
+ }
+ }
+ }
+
+ /**
+ * If this is reaped, clear out the cache for it too.
+ */
+ protected void finalize() throws Throwable {
+ Logger.msg(7, "Proxy "+mSystemKey+" reaped");
+ Gateway.getStorage().clearCache(mSystemKey, null);
+ Gateway.getProxyManager().removeProxy(mSystemKey);
+ super.finalize();
+ }
+
+}
diff --git a/source/com/c2kernel/entity/proxy/EntityProxyManager.java b/source/com/c2kernel/entity/proxy/EntityProxyManager.java
new file mode 100755
index 0000000..386bc2c
--- /dev/null
+++ b/source/com/c2kernel/entity/proxy/EntityProxyManager.java
@@ -0,0 +1,341 @@
+/**************************************************************************
+ * EntityProxyFactory.java
+ *
+ * $Revision: 1.45 $
+ * $Date: 2005/05/10 11:40:09 $
+ *
+ * Copyright (C) 2001 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ **************************************************************************/
+
+package com.c2kernel.entity.proxy;
+
+import java.util.*;
+
+import com.c2kernel.common.InvalidDataException;
+import com.c2kernel.common.ObjectNotFoundException;
+import com.c2kernel.lookup.AgentPath;
+import com.c2kernel.lookup.DomainPath;
+import com.c2kernel.lookup.Path;
+import com.c2kernel.persistency.ClusterStorage;
+import com.c2kernel.process.Gateway;
+import com.c2kernel.property.Property;
+import com.c2kernel.utils.Logger;
+import com.c2kernel.utils.SoftCache;
+import com.c2kernel.utils.server.SimpleTCPIPServer;
+
+
+public class EntityProxyManager
+{
+ SoftCache proxyPool = new SoftCache(50);
+ HashMap treeSubscribers = new HashMap();
+ HashMap connections = new HashMap();
+
+ // server objects
+ static ArrayList proxyClients = new ArrayList();
+ static SimpleTCPIPServer proxyServer = null;
+ static String serverName = null;
+
+ /**
+ * Create an entity proxy manager to listen for proxy events and reap unused proxies
+ */
+ public EntityProxyManager()
+ {
+ Logger.msg(5, "EntityProxyManager - Starting.....");
+
+ Enumeration servers = Gateway.getLDAPLookup().searchEntities(new DomainPath("/servers"));
+ while(servers.hasMoreElements()) {
+ Path thisServerPath = (Path)servers.nextElement();
+ try {
+ int syskey = thisServerPath.getSysKey();
+ String remoteServer = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/Name", null)).getValue();
+ String portStr = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/ProxyPort", null)).getValue();
+ int remotePort = Integer.parseInt(portStr);
+ connectToProxyServer(remoteServer, remotePort);
+
+ } catch (Exception ex) {
+ Logger.error("Exception retrieving proxy server connection data for "+thisServerPath);
+ Logger.error(ex);
+ }
+ }
+ }
+
+ public void connectToProxyServer(String name, int port) {
+ ProxyServerConnection oldConn = (ProxyServerConnection)connections.get(name);
+ if (oldConn != null)
+ oldConn.shutdown();
+ connections.put(name, new ProxyServerConnection(name, port, this));
+ }
+
+
+ protected void resubscribe(ProxyServerConnection conn) {
+ synchronized (proxyPool) {
+ for (Iterator iter = proxyPool.keySet().iterator(); iter.hasNext();) {
+ Integer key = (Integer)iter.next();
+ ProxyMessage sub = new ProxyMessage(key.intValue(), ProxyMessage.ADDPATH, false);
+ Logger.msg(5, "Subscribing to entity "+key);
+ conn.sendMessage(sub);
+ }
+ }
+ }
+
+ /**
+ * @param sub
+ */
+ private void sendMessage(ProxyMessage sub) {
+ for (Iterator iter = connections.values().iterator(); iter.hasNext();) {
+ ProxyServerConnection element = (ProxyServerConnection) iter.next();
+ element.sendMessage(sub);
+ }
+
+ }
+
+ public void shutdown() {
+ Logger.msg("EntityProxyManager.shutdown() - flagging shutdown of server connections");
+ for (Iterator iter = connections.values().iterator(); iter.hasNext();) {
+ ProxyServerConnection element = (ProxyServerConnection) iter.next();
+ element.shutdown();
+ }
+ }
+
+ protected void processMessage(ProxyMessage thisMessage) throws InvalidDataException {
+ if (Logger.doLog(9)) Logger.msg(9, thisMessage.toString());
+
+ if (thisMessage.getPath().equals(ProxyMessage.PINGPATH)) // ping response
+ return;
+
+ if (thisMessage.getSysKey() == ProxyMessage.NA) // must be domain path info
+ informTreeSubscribers(thisMessage.getState(), thisMessage.getPath());
+ else {
+ // proper proxy message
+ Logger.msg(5, "Received proxy message: "+thisMessage.toString());
+ Integer key = new Integer(thisMessage.getSysKey());
+ EntityProxy relevant = (EntityProxy)proxyPool.get(key);
+ if (relevant == null)
+ Logger.warning("Received proxy message for sysKey "+thisMessage.getSysKey()+" which we don't have a proxy for.");
+ else
+ try {
+ relevant.notify(thisMessage);
+ } catch (Throwable ex) {
+ Logger.error("Error caught notifying proxy listener "+relevant.toString()+" of "+thisMessage.toString());
+ Logger.error(ex);
+ }
+ }
+ }
+
+ private void informTreeSubscribers(boolean state, String path) {
+ DomainPath last = new DomainPath(path);
+ DomainPath parent; boolean first = true;
+ synchronized(treeSubscribers) {
+ while((parent = last.getParent()) != null) {
+
+ for (Iterator iter = treeSubscribers.keySet().iterator(); iter.hasNext();) {
+ DomainPathSubscriber sub = (DomainPathSubscriber)iter.next();
+ DomainPath interest = (DomainPath)treeSubscribers.get(sub);
+ if (interest.equals(parent)) {
+ if (state == ProxyMessage.ADDED)
+ sub.pathAdded(last);
+ else if (first)
+ sub.pathRemoved(last);
+ }
+ }
+ last = parent;
+ first = false;
+ }
+ }
+ }
+
+ public void subscribeTree(DomainPathSubscriber sub, DomainPath interest) {
+ synchronized(treeSubscribers) {
+ treeSubscribers.put(sub, interest);
+ }
+ }
+
+ public void unsubscribeTree(DomainPathSubscriber sub) {
+ synchronized(treeSubscribers) {
+ treeSubscribers.remove(sub);
+ }
+ }
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ private EntityProxy createProxy( org.omg.CORBA.Object ior,
+ int systemKey,
+ boolean isItem )
+ throws ObjectNotFoundException
+ {
+
+ EntityProxy newProxy = null;
+
+ Logger.msg(5, "EntityProxyFactory::creating proxy on entity " + systemKey);
+
+ if( isItem )
+ {
+ newProxy = new ItemProxy(ior, systemKey);
+ }
+ else
+ {
+ newProxy = new AgentProxy(ior, systemKey);
+ }
+
+ // subscribe to changes from server
+ ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.ADDPATH, false);
+ sendMessage(sub);
+ reportCurrentProxies(9);
+ return ( newProxy );
+ }
+
+ protected void removeProxy( int systemKey )
+ {
+ ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.DELPATH, true);
+ Logger.msg(5,"EntityProxyManager.removeProxy() - Unsubscribing to proxy informer for "+systemKey);
+ sendMessage(sub);
+ }
+
+
+ /**************************************************************************
+ * EntityProxy getProxy( ManageableEntity, SystemKey)
+ *
+ * Called by the other GetProxy methods. Fills in either the ior or the
+ * SystemKey
+ **************************************************************************/
+ private EntityProxy getProxy( org.omg.CORBA.Object ior,
+ int systemKey,
+ boolean isItem )
+ throws ObjectNotFoundException
+ {
+ Integer key = new Integer(systemKey);
+
+ synchronized(proxyPool) {
+ EntityProxy newProxy;
+ // return it if it exists
+ newProxy = (EntityProxy)proxyPool.get(key);
+ if (newProxy == null) {
+ // create a new one
+ newProxy = createProxy(ior, systemKey, isItem );
+ proxyPool.put(key, newProxy);
+ }
+ return newProxy;
+
+ }
+ }
+
+ /**************************************************************************
+ * EntityProxy getProxy( String )
+ *
+ * Proxy from Alias
+ **************************************************************************/
+ public EntityProxy getProxy( Path path )
+ throws ObjectNotFoundException
+ {
+
+ //convert namePath to dn format
+ Logger.msg(8,"EntityProxyFactory::getProxy(" + path.toString() + ")");
+ boolean isItem = !(path.getEntity() instanceof AgentPath);
+ return getProxy( Gateway.getLDAPLookup().getIOR(path),
+ path.getSysKey(),
+ isItem );
+
+ }
+
+ /**************************************************************************
+ * void reportCurrentProxies()
+ *
+ * A utility to Dump the current proxies loaded
+ **************************************************************************/
+ public void reportCurrentProxies(int logLevel)
+ {
+ if (!Logger.doLog(logLevel)) return;
+ Logger.msg(logLevel, "Current proxies: ");
+ try {
+ synchronized(proxyPool) {
+ Iterator i = proxyPool.keySet().iterator();
+
+ for( int count=0; i.hasNext(); count++ )
+ {
+ Integer nextProxy = (Integer)i.next();
+ EntityProxy thisProxy = (EntityProxy)proxyPool.get(nextProxy);
+ if (thisProxy != null)
+ Logger.msg(logLevel,
+ "" + count + ": "
+ + proxyPool.get(nextProxy).getClass().getName()
+ + ": " + nextProxy);
+ }
+ }
+ } catch (ConcurrentModificationException ex) {
+ Logger.msg(logLevel, "Proxy cache modified. Aborting.");
+ }
+ }
+
+
+ /**************************************************************************
+ * Static Proxy Server methods
+ **************************************************************************/
+
+ /**
+ * Initialises the Proxy event UDP server listening on 'Host.Proxy.port' from c2kprops
+ * @param c2kProps
+ */
+ public static void initServer()
+ {
+ Logger.msg(5, "EntityProxyFactory::initServer - Starting.....");
+ String port = Gateway.getProperty("ItemServer.Proxy.port");
+ serverName = Gateway.getProperty("ItemServer.name");
+ if (port == null) {
+ Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of entity changes.");
+ return;
+ }
+
+ // set up the proxy server
+ try {
+ int portNo = Integer.parseInt(port);
+ Logger.msg(5, "EntityProxyFactory::initServer - Initialising proxy informer on port "+port);
+ proxyServer = new SimpleTCPIPServer(portNo, ProxyClientConnection.class, 200);
+ proxyServer.startListening();
+ } catch (Exception ex) {
+ Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of entity changes.");
+ Logger.error(ex);
+ }
+ }
+
+ public static void sendProxyEvent(ProxyMessage message) {
+ if (proxyServer != null && message.getPath() != null)
+ synchronized(proxyClients) {
+ for (Iterator iter = proxyClients.iterator(); iter.hasNext();) {
+ ProxyClientConnection client = (ProxyClientConnection)iter.next();
+ client.sendMessage(message);
+ }
+ }
+ }
+
+ public static void reportConnections(int logLevel) {
+ synchronized(proxyClients) {
+ Logger.msg(logLevel, "Currently connected proxy clients:");
+ for (Iterator iter = proxyClients.iterator(); iter.hasNext();) {
+ ProxyClientConnection client = (ProxyClientConnection)iter.next();
+ Logger.msg(logLevel, " "+client);
+ }
+ }
+ }
+
+ public static void shutdownServer() {
+ if (proxyServer != null) {
+ Logger.msg(1, "EntityProxyManager: Closing Server.");
+ proxyServer.stopListening();
+ }
+ }
+
+ public static void registerProxyClient(ProxyClientConnection client) {
+ synchronized(proxyClients) {
+ proxyClients.add(client);
+ }
+ }
+
+ public static void unRegisterProxyClient(ProxyClientConnection client) {
+ synchronized(proxyClients) {
+ proxyClients.remove(client);
+ }
+ }
+}
+
diff --git a/source/com/c2kernel/entity/proxy/EntityProxyObserver.java b/source/com/c2kernel/entity/proxy/EntityProxyObserver.java
new file mode 100755
index 0000000..985143d
--- /dev/null
+++ b/source/com/c2kernel/entity/proxy/EntityProxyObserver.java
@@ -0,0 +1,25 @@
+package com.c2kernel.entity.proxy;
+
+import com.c2kernel.entity.C2KLocalObject;
+
+
+
+public interface EntityProxyObserver
+{
+ /**************************************************************************
+ * Subscribed items are broken apart and fed one by one to these methods.
+ * Replacement after an event is done by feeding the new memberbase with the same id.
+ * ID could be an XPath?
+ **************************************************************************/
+ public void add(C2KLocalObject contents);
+
+ /**************************************************************************
+ * the 'type' parameter should be an indication of the type of object
+ * supplied so that the subscriber can associate the call back with
+ * one of its subscriptions. If we go with an Xpath subscription form,
+ * then the id will probably be sufficient.
+ * Should be comparable (substring whatever) with the parameter given to
+ * the subscribe method of ItemProxy.
+ **************************************************************************/
+ public void remove(String id);
+}
diff --git a/source/com/c2kernel/entity/proxy/ItemProxy.java b/source/com/c2kernel/entity/proxy/ItemProxy.java
new file mode 100755
index 0000000..e816a53
--- /dev/null
+++ b/source/com/c2kernel/entity/proxy/ItemProxy.java
@@ -0,0 +1,212 @@
+/**************************************************************************
+ * ItemProxy.java
+ *
+ * $Revision: 1.25 $
+ * $Date: 2005/05/10 11:40:09 $
+ *
+ * Copyright (C) 2001 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ **************************************************************************/
+
+package com.c2kernel.entity.proxy;
+
+import java.util.ArrayList;
+
+import com.c2kernel.common.AccessRightsException;
+import com.c2kernel.common.InvalidDataException;
+import com.c2kernel.common.InvalidTransitionException;
+import com.c2kernel.common.ObjectAlreadyExistsException;
+import com.c2kernel.common.ObjectNotFoundException;
+import com.c2kernel.common.PersistencyException;
+import com.c2kernel.entity.*;
+import com.c2kernel.entity.agent.Job;
+import com.c2kernel.entity.agent.JobArrayList;
+import com.c2kernel.lifecycle.instance.stateMachine.Transitions;
+import com.c2kernel.utils.CastorXMLUtility;
+import com.c2kernel.utils.Logger;
+
+/******************************************************************************
+ * It is a wrapper for the connection and communication with Item
+ * It caches data loaded from the Item to reduce communication
+ *
+ * @version $Revision: 1.25 $ $Date: 2005/05/10 11:40:09 $
+ * @author $Author: abranson $
+ ******************************************************************************/
+public class ItemProxy extends EntityProxy
+{
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ protected ItemProxy( org.omg.CORBA.Object ior,
+ int systemKey)
+ throws ObjectNotFoundException
+ {
+ super(ior, systemKey);
+
+ }
+
+ public ManageableEntity narrow() throws ObjectNotFoundException
+ {
+ try {
+ return ItemHelper.narrow(mIOR);
+ } catch (org.omg.CORBA.BAD_PARAM ex) { }
+ throw new ObjectNotFoundException("CORBA Object was not an Item, or the server is down.");
+ }
+ /**************************************************************************
+ *
+ *
+ **************************************************************************/
+ public void initialise( int agentId,
+ String itemProps,
+ String workflow )
+ throws AccessRightsException,
+ InvalidDataException,
+ PersistencyException,
+ ObjectNotFoundException
+ {
+ Logger.msg(7, "ItemProxy::initialise - started");
+
+ ((Item)getEntity()).initialise( agentId, itemProps, workflow );
+ }
+
+ public void setProperty(AgentProxy agent, String name, String value)
+ throws AccessRightsException,
+ PersistencyException
+ {
+ String[] params = new String[2];
+ params[0] = name;
+ params[1] = value;
+ try {
+ agent.execute(this, "WriteProperty", params);
+ } catch (AccessRightsException e) {
+ throw (e);
+ } catch (PersistencyException e) {
+ throw (e);
+ } catch (Exception e) {
+ Logger.error(e);
+ throw new PersistencyException("Could not store property");
+ }
+ }
+ /**************************************************************************
+ *
+ **************************************************************************/
+ protected void requestAction( Job thisJob )
+ throws AccessRightsException,
+ InvalidTransitionException,
+ ObjectNotFoundException,
+ InvalidDataException,
+ PersistencyException,
+ ObjectAlreadyExistsException
+ {
+ String outcome = thisJob.getOutcomeString();
+ // check fields that should have been filled in
+ if (outcome==null)
+ if (thisJob.isOutcomeUsed())
+ throw new InvalidDataException("Outcome is required.", "");
+ else
+ outcome="";
+
+ if (thisJob.getAgentId() == -1)
+ throw new InvalidDataException("No Agent specified.", "");
+
+ Logger.msg(7, "ItemProxy - executing "+thisJob.getStepPath()+" for "+thisJob.getAgentName());
+ requestAction (thisJob.getAgentId(), thisJob.getStepPath(),
+ thisJob.getPossibleTransition(), outcome);
+ }
+
+ //requestData is xmlString
+ public void requestAction( int agentId,
+ String stepPath,
+ int transitionID,
+ String requestData
+ )
+ throws AccessRightsException,
+ InvalidTransitionException,
+ ObjectNotFoundException,
+ InvalidDataException,
+ PersistencyException,
+ ObjectAlreadyExistsException
+ {
+ ((Item)getEntity()).requestAction( agentId,
+ stepPath,
+ transitionID,
+ requestData );
+ }
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ public String queryLifeCycle( int agentId,
+ boolean filter
+ )
+ throws AccessRightsException,
+ ObjectNotFoundException,
+ PersistencyException
+ {
+ return ((Item)getEntity()).queryLifeCycle( agentId,
+ filter );
+ }
+
+
+ /**************************************************************************
+ *
+ **************************************************************************/
+ private ArrayList getJobList(int agentId, boolean filter)
+ throws AccessRightsException,
+ ObjectNotFoundException,
+ PersistencyException
+ {
+ JobArrayList thisJobList;
+ try {
+ String jobs = queryLifeCycle(agentId, filter);
+ thisJobList = (JobArrayList)CastorXMLUtility.unmarshall(jobs);
+ }
+ catch (Exception e) {
+ Logger.error("Exception::ItemProxy::getJobList() - Cannot unmarshall the jobs");
+ Logger.error(e);
+ return new ArrayList();
+ }
+ return thisJobList.list;
+ }
+
+ public ArrayList getJobList(AgentProxy agent)
+ throws AccessRightsException,
+ ObjectNotFoundException,
+ PersistencyException
+ {
+ return getJobList(agent.getSystemKey());
+ }
+
+ private ArrayList getJobList(int agentId)
+ throws AccessRightsException,
+ ObjectNotFoundException,
+ PersistencyException
+ {
+ return getJobList(agentId, true);
+ }
+
+ private Job getJobByName(String actName, int agentId)
+ throws AccessRightsException,
+ ObjectNotFoundException,
+ PersistencyException {
+
+ ArrayList jobList = getJobList(agentId);
+ for (Object jobObj : jobList) {
+ Job job = (Job)jobObj;
+ int transition = job.getPossibleTransition();
+ if (job.getStepName().equals(actName))
+ if (transition == Transitions.COMPLETE || transition == Transitions.DONE)
+ return job;
+ }
+ return null;
+
+ }
+
+ public Job getJobByName(String actName, AgentProxy agent)
+ throws AccessRightsException,
+ ObjectNotFoundException,
+ PersistencyException {
+ return getJobByName(actName, agent.getSystemKey());
+ }
+}
diff --git a/source/com/c2kernel/entity/proxy/MemberControl.java b/source/com/c2kernel/entity/proxy/MemberControl.java
new file mode 100755
index 0000000..5f483ae
--- /dev/null
+++ b/source/com/c2kernel/entity/proxy/MemberControl.java
@@ -0,0 +1,43 @@
+package com.c2kernel.entity.proxy;
+
+import com.c2kernel.entity.C2KLocalObject;
+
+
+/**
+ * @version $Revision: 1.2 $ $Date: 2004/02/04 11:02:44 $
+ * @author $Author: abranson $
+ */
+
+public class MemberControl implements C2KLocalObject {
+
+ public static final int ERROR = -1;
+ public static final int END = -2;
+ public static MemberControl theEND = new MemberControl(END, null);
+ private int id;
+ private String name;
+
+ public MemberControl(int code, String msg) {
+ this.setID(code);
+ switch (code) {
+ case MemberControl.ERROR:
+ this.setName("ERROR: The path "+msg+" was not found.");
+ break;
+ case MemberControl.END:
+ this.setName("END: End of preload");
+ break;
+ default:
+ this.setName("Unsupported control message code: "+code);
+ }
+ }
+
+ public String toString() {
+ return "MemberControl: "+this.getName();
+ }
+
+ public int getID() { return id; }
+ public void setID(int id) { this.id = id; }
+ public String getName() { return name; }
+ public void setName(String name) { this.name = name; }
+ public String getClusterType() { return null; }
+
+}
diff --git a/source/com/c2kernel/entity/proxy/MemberSubscription.java b/source/com/c2kernel/entity/proxy/MemberSubscription.java
new file mode 100755
index 0000000..fdd3e96
--- /dev/null
+++ b/source/com/c2kernel/entity/proxy/MemberSubscription.java
@@ -0,0 +1,121 @@
+
+package com.c2kernel.entity.proxy;
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import com.c2kernel.common.ObjectNotFoundException;
+import com.c2kernel.entity.C2KLocalObject;
+import com.c2kernel.utils.Logger;
+
+public class MemberSubscription implements Runnable {
+ EntityProxy subject;
+ String interest;
+ // keep the subscriber by weak reference, so it is not kept from the garbage collector if no longer used
+ WeakReference observerReference;
+ ArrayList contents = new ArrayList();
+ boolean preLoad;
+
+ public MemberSubscription(EntityProxy subject, String interest,
+ EntityProxyObserver observer, boolean preLoad) {
+ setObserver(observer);
+ this.interest = interest;
+ this.subject = subject;
+ this.preLoad = preLoad;
+ }
+
+ public void run() {
+ Thread.currentThread().setName("Member Subscription: "+subject.getSystemKey()+":"+interest);
+ if (preLoad) loadChildren();
+ }
+
+ private void loadChildren() {
+ C2KLocalObject newMember;
+ EntityProxyObserver observer = getObserver();
+ if (observer == null) return; //reaped
+ try {
+ // fetch contents of path
+ String children = subject.queryData(interest+"/all");
+ StringTokenizer tok = new StringTokenizer(children, ",");
+ ArrayList newContents = new ArrayList();
+ while (tok.hasMoreTokens())
+ newContents.add(tok.nextToken());
+
+ // look to see what's new
+ for (Iterator iter = newContents.iterator(); iter.hasNext();) {
+ String newChild = (String)iter.next();
+
+ // load child object
+ try {
+ newMember = subject.getObject(interest+"/"+newChild);
+ contents.remove(newChild);
+ } catch (ObjectNotFoundException ex) {
+ newMember = new MemberControl(MemberControl.ERROR, "Listed member "+newChild+" was not found.");
+ }
+ try {
+ observer.add(newMember);
+ } catch (Throwable ex) {
+ Logger.error("Error publishing member to "+observer);
+ Logger.error(ex);
+ }
+ }
+ // report what's left in old contents as deleted
+ for (Iterator iter = contents.iterator(); iter.hasNext();) {
+ String oldChild = (String)iter.next();
+ observer.remove(interest+"/"+oldChild);
+ }
+ //replace contents arraylist
+ contents = newContents;
+ //report that we're done
+ observer.add(MemberControl.theEND);
+ } catch (Exception ex) {
+ observer.add(new MemberControl(MemberControl.ERROR, "Query on "+interest+" failed with "+ex));
+ }
+ }
+
+ public boolean isRelevant(String path) {
+ Logger.msg(7, "Checking relevance of "+path+" to "+interest);
+ return (path.startsWith(interest));
+ }
+
+ public void update(String path, boolean deleted) {
+ EntityProxyObserver observer = getObserver();
+ if (observer == null) return; //reaped
+ Logger.msg(7, "Processing proxy message path "+path +" for "+observer+". Interest: "+interest+" Was Deleted:"+deleted);
+ if (!path.startsWith(interest)) // doesn't concern us
+ return;
+
+ if (path.equals(interest)) // refresh contents
+ loadChildren();
+ else {
+ String name = path.substring(interest.length());
+ if (deleted) {
+ Logger.msg(4, "Removing "+path);
+ contents.remove(name);
+ observer.remove(name);
+ }
+ else {
+ try {
+ C2KLocalObject newMember = subject.getObject(path);
+ Logger.msg(4, "Adding "+path);
+ contents.add(name);
+ observer.add(newMember);
+ } catch (ObjectNotFoundException e) {
+ Logger.error("Member Subscription: could not load "+path);
+ Logger.error(e);
+ }
+ }
+ }
+ }
+
+ public void setObserver(EntityProxyObserver observer) {
+ observerReference = new WeakReference(observer);
+ }
+
+ public EntityProxyObserver getObserver() {
+ EntityProxyObserver observer = (EntityProxyObserver)observerReference.get();
+ return observer;
+ }
+}
+
diff --git a/source/com/c2kernel/entity/proxy/ProxyClientConnection.java b/source/com/c2kernel/entity/proxy/ProxyClientConnection.java
new file mode 100755
index 0000000..46f1e3d
--- /dev/null
+++ b/source/com/c2kernel/entity/proxy/ProxyClientConnection.java
@@ -0,0 +1,179 @@
+package com.c2kernel.entity.proxy;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.InterruptedIOException;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import com.c2kernel.common.InvalidDataException;
+import com.c2kernel.utils.Logger;
+import com.c2kernel.utils.server.SocketHandler;
+
+/**************************************************************************
+ *
+ * $Revision: 1.18 $
+ * $Date: 2005/05/10 11:40:09 $
+ *
+ * Copyright (C) 2003 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ **************************************************************************/
+
+public class ProxyClientConnection implements SocketHandler {
+
+ Socket clientSocket = null;
+ static int clientId = -1;
+ int thisClientId;
+ ArrayList sysKeys;
+ PrintWriter response;
+ BufferedReader request;
+ boolean closing = false;
+
+ public ProxyClientConnection() {
+ super();
+ thisClientId = ++clientId;
+ EntityProxyManager.registerProxyClient(this);
+ Logger.msg(1, "Proxy Client Connection Handler "+thisClientId+" ready.");
+ }
+
+
+ public String getName() {
+ return "Proxy Client Connection";
+ }
+
+ public boolean isBusy() {
+ return clientSocket != null;
+ }
+
+ public synchronized void setSocket(Socket newSocket) {
+ try {
+ Logger.msg(1, "Proxy Client Connection "+thisClientId+" connect from "+newSocket.getInetAddress()+":"+newSocket.getPort());
+ newSocket.setSoTimeout(500);
+ clientSocket = newSocket;
+ response = new PrintWriter(clientSocket.getOutputStream(), true);
+ sysKeys = new ArrayList();
+ } catch (SocketException ex) {
+ Logger.msg("Could not set socket timeout:");
+ Logger.error(ex);
+ closeSocket();
+ } catch (IOException ex) {
+ Logger.msg("Could not setup output stream:");
+ Logger.error(ex);
+ closeSocket();
+ }
+ }
+
+ /**
+ * Main loop. Reads proxy commands from the client and acts on them.
+ */
+ public void run() {
+ Thread.currentThread().setName("Proxy Client Connection: "+clientSocket.getInetAddress());
+ Logger.msg(7, "ProxyClientConnection "+thisClientId+" - Setting up proxy client connection with "+clientSocket.getInetAddress());
+ try {
+ request = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
+ String input = null;
+ ProxyMessage thisMessage;
+ while (clientSocket != null) {
+ try {
+ input = request.readLine();
+ Logger.msg(9, "ProxyClientConnection "+thisClientId+" - received "+input);
+ thisMessage = new ProxyMessage(input);
+ processMessage(thisMessage);
+ } catch (InterruptedIOException ex) { //timeout
+ } catch (InvalidDataException ex) { // invalid proxy message
+ Logger.error("ProxyClientConnection "+thisClientId+" - Invalid proxy message: "+input);
+ }
+
+ }
+ } catch (IOException ex) {
+ if (!closing)
+ Logger.error("ProxyClientConnection "+thisClientId+" - Error reading from socket.");
+ }
+ closeSocket();
+ Logger.msg(1, "ProxyClientConnection "+thisClientId+" closed.");
+ }
+
+ private void processMessage(ProxyMessage message) throws InvalidDataException {
+
+ // proxy disconnection
+ if (message.getPath().equals(ProxyMessage.BYEPATH)) {
+ Logger.msg(7, "ProxyClientConnection "+thisClientId+" disconnecting");
+ closeSocket();
+ }
+
+ // proxy checking connection
+ else if (message.getPath().equals(ProxyMessage.PINGPATH))
+ response.println(ProxyMessage.pingMessage);
+
+ // new subscription to entity changes
+ else if (message.getPath().equals(ProxyMessage.ADDPATH)) {
+ Logger.msg(7, "ProxyClientConnection "+thisClientId+" subscribed to "+message.getSysKey());
+ synchronized (sysKeys) {
+ sysKeys.add(new Integer(message.getSysKey()));
+ }
+ }
+
+ // remove of subscription to entity changes
+ else if (message.getPath().equals(ProxyMessage.DELPATH)) {
+ synchronized (sysKeys) {
+ sysKeys.remove(new Integer(message.getSysKey()));
+ }
+ Logger.msg(7, "ProxyClientConnection "+thisClientId+" unsubscribed from "+message.getSysKey());
+ }
+
+ else // unknown message
+ Logger.error("ProxyClientConnection "+thisClientId+" - Unknown message type: "+message);
+
+ }
+
+ public synchronized void sendMessage(ProxyMessage message) {
+ if (clientSocket==null) return; // idle
+ boolean relevant = message.getSysKey() == ProxyMessage.NA;
+ synchronized (sysKeys) {
+ for (Iterator iter = sysKeys.iterator(); iter.hasNext() && !relevant;) {
+ Integer thisKey = (Integer)iter.next();
+ if (thisKey.intValue() == message.getSysKey())
+ relevant = true;
+ }
+ }
+ if (!relevant) return; // not for our client
+
+ response.println(message);
+ }
+
+ public void shutdown() {
+ if (isBusy()) {
+ closing = true;
+ Logger.msg("ProxyClientConnection "+thisClientId+" closing.");
+ closeSocket();
+ }
+ }
+
+ public String toString() {
+ if (clientSocket == null) return thisClientId+": idle";
+ else return thisClientId+": "+clientSocket.getInetAddress();
+ }
+
+ private synchronized void closeSocket() {
+ if (clientSocket==null) return;
+ try {
+ request.close();
+ response.close();
+ clientSocket.close();
+ } catch (IOException e) {
+ Logger.error("ProxyClientConnection "+thisClientId+" - Could not close socket.");
+ Logger.error(e);
+ }
+ synchronized (sysKeys) {
+ sysKeys = null;
+ }
+
+ clientSocket = null;
+
+ }
+
+}
diff --git a/source/com/c2kernel/entity/proxy/ProxyMessage.java b/source/com/c2kernel/entity/proxy/ProxyMessage.java
new file mode 100755
index 0000000..66f1f34
--- /dev/null
+++ b/source/com/c2kernel/entity/proxy/ProxyMessage.java
@@ -0,0 +1,110 @@
+package com.c2kernel.entity.proxy;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.util.StringTokenizer;
+
+import com.c2kernel.common.InvalidDataException;
+
+
+/**************************************************************************
+ *
+ * $Revision: 1.11 $
+ * $Date: 2005/05/10 11:40:09 $
+ *
+ * Copyright (C) 2003 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ **************************************************************************/
+
+public class ProxyMessage {
+
+ // special server message paths
+ public static final String BYEPATH = "bye";
+ public static final String ADDPATH = "add";
+ public static final String DELPATH = "del";
+ public static final String PINGPATH = "ping";
+ public static final boolean ADDED = false;
+ public static final boolean DELETED = true;
+ public static final int NA = -1;
+
+ static ProxyMessage byeMessage = new ProxyMessage(NA, BYEPATH, ADDED);
+ static ProxyMessage pingMessage = new ProxyMessage(NA, PINGPATH, ADDED);
+
+ private int sysKey = NA;
+ private String path = "";
+ private String server = null;
+ private boolean state = ADDED;
+
+ public ProxyMessage() {
+ super();
+ }
+ public ProxyMessage(int sysKey, String path, boolean state) {
+ this();
+ setSysKey(sysKey);
+ setPath(path);
+ setState(state);
+ }
+
+ public ProxyMessage(String line) throws InvalidDataException, IOException {
+ if (line == null)
+ throw new IOException("Null proxy message");
+ StringTokenizer tok = new StringTokenizer(line,":");
+ if (tok.countTokens()!=2)
+ throw new InvalidDataException("String '"+line+"' does not constitute a valid proxy message.", "");
+ sysKey = Integer.parseInt(tok.nextToken());
+ path = tok.nextToken();
+ if (path.startsWith("-")) {
+ state = DELETED;
+ path = path.substring(1);
+ }
+ }
+
+ public ProxyMessage(DatagramPacket packet) throws InvalidDataException, IOException {
+ this(new String(packet.getData()));
+ }
+
+ public int getSysKey() {
+ return sysKey;
+ }
+
+ public void setSysKey(int sysKey) {
+ this.sysKey = sysKey;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String newPath) {
+ this.path = newPath;
+ }
+
+ public boolean getState() {
+ return state;
+ }
+
+ public void setState(boolean state) {
+ this.state = state;
+ }
+
+ public String toString() {
+ return sysKey+":"+(state?"-":"")+path;
+ }
+ public DatagramPacket getPacket(ProxySubscriber host) {
+ return getPacket(host.getHost(), host.getPort());
+ }
+
+ public DatagramPacket getPacket(InetAddress host, int port) {
+ byte[] packetString = toString().getBytes();
+ return new DatagramPacket(packetString, packetString.length, host, port);
+ }
+
+ public String getServer() {
+ return server;
+ }
+
+ public void setServer(String server) {
+ this.server = server;
+ }
+}
diff --git a/source/com/c2kernel/entity/proxy/ProxyServerConnection.java b/source/com/c2kernel/entity/proxy/ProxyServerConnection.java
new file mode 100755
index 0000000..191492f
--- /dev/null
+++ b/source/com/c2kernel/entity/proxy/ProxyServerConnection.java
@@ -0,0 +1,129 @@
+/**************************************************************************
+ * EntityProxyFactory.java
+ *
+ * $Revision: 1.3 $
+ * $Date: 2005/05/25 12:11:44 $
+ *
+ * Copyright (C) 2001 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ **************************************************************************/
+
+package com.c2kernel.entity.proxy;
+
+import java.io.*;
+import java.net.Socket;
+
+import com.c2kernel.common.InvalidDataException;
+import com.c2kernel.utils.Logger;
+
+
+public class ProxyServerConnection extends Thread
+{
+
+ public boolean serverIsActive = true;
+ // proxy client details
+ String serverName;
+ int serverPort;
+ Socket serverConnection;
+ EntityProxyManager manager;
+ // for talking to the proxy server
+ PrintWriter serverStream;
+ boolean listening = false;
+ static boolean isServer = false;
+
+ /**
+ * Create an entity proxy manager to listen for proxy events and reap unused proxies
+ */
+ public ProxyServerConnection(String host, int port, EntityProxyManager manager)
+ {
+ Logger.msg(5, "ProxyServerConnection - Initialising connection to "+host+":"+port);
+ serverName = host;
+ serverPort = port;
+ this.manager = manager;
+ listening = true;
+ start();
+ }
+
+ public void run() {
+ Thread.currentThread().setName("Proxy Client Connection Listener to "+serverName+":"+serverPort);
+ while (listening) {
+ try {
+ if (serverConnection == null) connect();
+ if (serverConnection != null) {
+ BufferedReader request = new BufferedReader(new InputStreamReader(serverConnection.getInputStream()));
+ String input = null;
+ ProxyMessage thisMessage;
+ while (listening && serverConnection != null) {
+ try {
+ input = request.readLine();
+ thisMessage = new ProxyMessage(input);
+ thisMessage.setServer(serverName);
+ manager.processMessage(thisMessage);
+ } catch (InterruptedIOException ex) { // timeout - send a ping
+ sendMessage(ProxyMessage.pingMessage);
+ } catch (InvalidDataException ex) { // invalid proxy message
+ if (input != null)
+ Logger.error("EntityProxyManager - Invalid proxy message: "+input);
+ }
+ }
+ }
+ } catch (IOException ex) {
+ Logger.error("ProxyServerConnection - Disconnected from "+serverName+":"+serverPort);
+ try {
+ serverStream.close();
+ serverConnection.close();
+ } catch (IOException e1) { }
+
+
+ serverStream = null;
+ serverConnection = null;
+ }
+ }
+
+ if (serverStream != null) {
+ try {
+ Logger.msg(1, "Disconnecting from proxy server on "+serverName+":"+serverPort);
+ serverStream.println(ProxyMessage.byeMessage.toString());
+ serverStream.close();
+ serverConnection.close();
+ serverConnection = null;
+ } catch (Exception e) {
+ Logger.error("Error disconnecting from proxy server.");
+ }
+ }
+ }
+
+ public void connect() {
+ Logger.msg(3, "ProxyServerConnection - connecting to proxy server on "+serverName+":"+serverPort);
+ try {
+ serverConnection = new Socket(serverName, serverPort);
+ serverConnection.setKeepAlive(true);
+ serverIsActive = true;
+ serverConnection.setSoTimeout(5000);
+ serverStream = new PrintWriter(serverConnection.getOutputStream(), true);
+ Logger.msg("Connected to proxy server on "+serverName+":"+serverPort);
+ manager.resubscribe(this);
+ } catch (Exception e) {
+ Logger.msg(3, "Could not connect to proxy server. Retrying in 5s");
+ try { Thread.sleep(5000); } catch (InterruptedException ex) { }
+ serverStream = null;
+ serverConnection = null;
+ serverIsActive = false;
+ }
+ }
+
+ public void shutdown() {
+ Logger.msg("Proxy Client: flagging shutdown.");
+ listening = false;
+ }
+
+ /**
+ * @param sub
+ */
+ public void sendMessage(ProxyMessage sub) {
+ if (serverStream != null)
+ serverStream.println(sub);
+ }
+
+}
+
diff --git a/source/com/c2kernel/entity/proxy/ProxySubscriber.java b/source/com/c2kernel/entity/proxy/ProxySubscriber.java
new file mode 100755
index 0000000..67d0a60
--- /dev/null
+++ b/source/com/c2kernel/entity/proxy/ProxySubscriber.java
@@ -0,0 +1,36 @@
+package com.c2kernel.entity.proxy;
+
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.util.ArrayList;
+
+import com.c2kernel.common.InvalidDataException;
+
+/**************************************************************************
+ *
+ * $Revision: 1.1 $
+ * $Date: 2003/04/24 10:12:40 $
+ *
+ * Copyright (C) 2003 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ **************************************************************************/
+
+public class ProxySubscriber {
+
+ private InetAddress host;
+ private int port;
+ public ArrayList sysKeys = new ArrayList();
+
+ public ProxySubscriber(DatagramPacket packet) throws InvalidDataException {
+ host = packet.getAddress();
+ port = packet.getPort();
+ }
+
+ public InetAddress getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+}
diff --git a/source/com/c2kernel/entity/transfer/TransferItem.java b/source/com/c2kernel/entity/transfer/TransferItem.java
new file mode 100755
index 0000000..a869741
--- /dev/null
+++ b/source/com/c2kernel/entity/transfer/TransferItem.java
@@ -0,0 +1,131 @@
+package com.c2kernel.entity.transfer;
+
+import java.io.File;
+import java.util.*;
+
+import com.c2kernel.common.ObjectNotFoundException;
+import com.c2kernel.entity.*;
+import com.c2kernel.lifecycle.instance.Workflow;
+import com.c2kernel.lookup.*;
+import com.c2kernel.persistency.ClusterStorage;
+import com.c2kernel.persistency.outcome.Outcome;
+import com.c2kernel.process.Gateway;
+import com.c2kernel.property.*;
+import com.c2kernel.utils.*;
+
+public class TransferItem {
+ public ArrayList domainPaths;
+ public int sysKey;
+ static int importAgentId;
+
+ public TransferItem() throws Exception {
+ try {
+ importAgentId = Gateway.getLDAPLookup().getRoleManager().getAgentPath("system").getSysKey();
+ } catch (ObjectNotFoundException e) {
+ Logger.error("TransferItem - System user not found!");
+ throw e;
+ }
+ }
+
+ public TransferItem(int sysKey) throws Exception {
+ this.sysKey = sysKey;
+ domainPaths = new ArrayList();
+ Property name = (Property)Gateway.getStorage().get(sysKey, ClusterStorage.PROPERTY + "/Name", null);
+ Enumeration paths = Gateway.getLDAPLookup().search(new DomainPath(), name.getValue());
+ while (paths.hasMoreElements()) {
+ DomainPath thisPath = (DomainPath)paths.nextElement();
+ domainPaths.add(thisPath.toString());
+ }
+ }
+
+ public void exportItem(File dir, String path) throws Exception {
+ Logger.msg("Path " + path + " in " + sysKey);
+ String[] contents = Gateway.getStorage().getClusterContents(sysKey, path);
+ if (contents.length > 0) {
+ FileStringUtility.createNewDir(dir.getCanonicalPath());
+ for (int i = 0; i < contents.length; i++) {
+ exportItem(new File(dir, contents[i]), path + "/" + contents[i]);
+ }
+ } else { //no children, try to dump object
+ try {
+ C2KLocalObject obj = Gateway.getStorage().get(sysKey, path, null);
+ Logger.msg("Dumping object " + path + " in " + sysKey);
+ File dumpPath = new File(dir.getCanonicalPath() + ".xml");
+ FileStringUtility.string2File(dumpPath, CastorXMLUtility.marshall(obj));
+ return;
+ } catch (ObjectNotFoundException ex) {
+ } // not an object
+ }
+ }
+
+ public void importItem(File dir) throws Exception {
+ // check if already exists
+ try {
+ Property name = (Property)Gateway.getStorage().get(sysKey, ClusterStorage.PROPERTY + "/Name", null);
+ throw new Exception("Syskey " + sysKey + " already in use as " + name.getValue());
+ } catch (Exception ex) {
+ }
+
+ ArrayList events, outcomes, viewpoints = new ArrayList();
+ // retrieve objects
+ ArrayList objectFiles = FileStringUtility.listDir(dir.getCanonicalPath(), false, true);
+ ArrayList objects = new ArrayList();
+ for (Iterator iter = objectFiles.iterator(); iter.hasNext();) {
+ String element = (String)iter.next();
+ String xmlFile = FileStringUtility.file2String(element);
+ C2KLocalObject newObj;
+ String choppedPath = element.substring(dir.getCanonicalPath().length()+1, element.length()-4);
+ Logger.msg(choppedPath);
+ if (choppedPath.startsWith(ClusterStorage.OUTCOME))
+ newObj = new Outcome(choppedPath, xmlFile);
+ else
+ newObj = (C2KLocalObject)CastorXMLUtility.unmarshall(xmlFile);
+
+ objects.add(newObj);
+ }
+
+ // create item
+ EntityPath entityPath = new EntityPath(sysKey);
+ TraceableEntity newItem = (TraceableEntity)Gateway.getCorbaServer().createEntity(entityPath);
+ Gateway.getLDAPLookup().add(entityPath);
+
+ PropertyArrayList props = new PropertyArrayList();
+ Workflow wf = null;
+ // put objects
+ for (Iterator iter = objects.iterator(); iter.hasNext();) {
+ C2KLocalObject obj = (C2KLocalObject)iter.next();
+ if (obj instanceof Property)
+ props.list.add(obj);
+ else if (obj instanceof Workflow)
+ wf = (Workflow)obj;
+ }
+
+ if (wf == null)
+ throw new Exception("No workflow found in import for "+sysKey);
+
+ // init item
+ newItem.initialise(importAgentId, CastorXMLUtility.marshall(props), CastorXMLUtility.marshall(wf.search("workflow/domain")));
+
+ // store objects
+ importByType(ClusterStorage.COLLECTION, objects);
+ importByType(ClusterStorage.HISTORY, objects);
+ importByType(ClusterStorage.OUTCOME, objects);
+ importByType(ClusterStorage.VIEWPOINT, objects);
+ Gateway.getStorage().commit(this);
+ // add domPaths
+ for (Iterator iter = domainPaths.iterator(); iter.hasNext();) {
+ String element = (String)iter.next();
+ DomainPath newPath = new DomainPath(element, entityPath);
+ Gateway.getLDAPLookup().add(newPath);
+ }
+ }
+
+ private void importByType(String type, ArrayList objects) throws Exception {
+ for (Iterator iter = objects.iterator(); iter.hasNext();) {
+ C2KLocalObject element = (C2KLocalObject)iter.next();
+ if (element.getClusterType().equals(type))
+ Gateway.getStorage().put(sysKey, element, this);
+ }
+
+ }
+} \ No newline at end of file
diff --git a/source/com/c2kernel/entity/transfer/TransferSet.java b/source/com/c2kernel/entity/transfer/TransferSet.java
new file mode 100755
index 0000000..ee98f8d
--- /dev/null
+++ b/source/com/c2kernel/entity/transfer/TransferSet.java
@@ -0,0 +1,107 @@
+package com.c2kernel.entity.transfer;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import com.c2kernel.lookup.EntityPath;
+import com.c2kernel.lookup.NextKeyManager;
+import com.c2kernel.process.Gateway;
+import com.c2kernel.utils.CastorXMLUtility;
+import com.c2kernel.utils.FileStringUtility;
+import com.c2kernel.utils.Logger;
+
+/**************************************************************************
+ *
+ * $Revision: 1.5 $
+ * $Date: 2005/04/26 06:48:13 $
+ *
+ * Copyright (C) 2003 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ **************************************************************************/
+
+public class TransferSet {
+
+ public ArrayList items;
+
+ public TransferSet() {
+ }
+
+ public TransferSet(int[] sysKeys) {
+ items = new ArrayList();
+ for (int i = 0; i < sysKeys.length; i++) {
+ try {
+ items.add(new TransferItem(sysKeys[i]));
+ } catch (Exception ex) {
+ Logger.error("Could not add item "+sysKeys[i]);
+ Logger.error(ex);
+ }
+ }
+ }
+
+ public void exportPackage(File dir) throws Exception {
+ if (items==null || items.size() == 0)
+ throw new Exception("Nothing to dump");
+ FileStringUtility.createNewDir(dir.getAbsolutePath());
+ for (Iterator iter = items.iterator(); iter.hasNext();) {
+ TransferItem element = (TransferItem)iter.next();
+ try {
+ element.exportItem(new File(dir, String.valueOf(element.sysKey)), "/");
+ } catch (Exception ex) {
+ Logger.error("Error dumping item "+element.sysKey);
+ Logger.error(ex);
+ }
+ }
+
+ try {
+ String self = CastorXMLUtility.marshall(this);
+ FileStringUtility.string2File(new File(dir, "transferSet.xml"), self);
+ } catch (Exception ex) {
+ Logger.error("Error writing header file");
+ Logger.error(ex);
+ }
+ }
+
+ public void importPackage(File rootDir) {
+ for (Iterator iter = items.iterator(); iter.hasNext();) {
+ TransferItem element = (TransferItem)iter.next();
+ Logger.msg(5, "Importing "+element.sysKey);
+ try {
+ element.importItem(new File(rootDir, String.valueOf(element.sysKey)));
+ } catch (Exception ex) {
+ Logger.error("Import of item "+element.sysKey+" failed. Rolling back");
+ Logger.error(ex);
+ Gateway.getStorage().abort(element);
+ }
+ }
+ checkLastKey();
+ }
+
+ private void checkLastKey()
+ {
+ // find highest key in out import set
+ int packageLastKey = 0;
+ for (Iterator iter = items.iterator(); iter.hasNext();) {
+ TransferItem element = (TransferItem)iter.next();
+ if (element.sysKey > packageLastKey)
+ packageLastKey = element.sysKey;
+ }
+
+ try
+ { // find the current last key
+ NextKeyManager nextKeyMan = Gateway.getLDAPLookup().getNextKeyManager();
+ EntityPath lastKey = nextKeyMan.getLastEntityPath();
+ Logger.msg(1, "Last key imported was "+packageLastKey+". LDAP lastkey was "+lastKey.getSysKey());
+
+
+ if (packageLastKey > lastKey.getSysKey()) { // set new last
+ Logger.msg(1, "Updating lastKey to "+packageLastKey);
+ nextKeyMan.writeLastEntityKey(packageLastKey);
+ }
+ }
+ catch (Exception ex)
+ {
+ Logger.error("Exception::LoadKeys::processFile() " + ex);
+ }
+ }
+}