From b086f57f56bf0eb9dab9cf321a0f69aaaae84347 Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Wed, 30 May 2012 08:37:45 +0200 Subject: Initial Maven Conversion --- .../java/com/c2kernel/entity/C2KLocalObject.java | 32 ++ src/main/java/com/c2kernel/entity/CorbaServer.java | 190 +++++++++++ .../java/com/c2kernel/entity/TraceableEntity.java | 341 ++++++++++++++++++++ .../java/com/c2kernel/entity/TraceableLocator.java | 85 +++++ .../com/c2kernel/entity/agent/ActiveEntity.java | 295 ++++++++++++++++++ .../com/c2kernel/entity/agent/ActiveLocator.java | 87 ++++++ src/main/java/com/c2kernel/entity/agent/Job.java | 346 +++++++++++++++++++++ .../com/c2kernel/entity/agent/JobArrayList.java | 30 ++ .../java/com/c2kernel/entity/agent/JobList.java | 108 +++++++ .../java/com/c2kernel/entity/proxy/AgentProxy.java | 302 ++++++++++++++++++ .../entity/proxy/DomainPathSubscriber.java | 18 ++ .../com/c2kernel/entity/proxy/EntityProxy.java | 248 +++++++++++++++ .../c2kernel/entity/proxy/EntityProxyManager.java | 339 ++++++++++++++++++++ .../c2kernel/entity/proxy/EntityProxyObserver.java | 27 ++ .../java/com/c2kernel/entity/proxy/ItemProxy.java | 213 +++++++++++++ .../c2kernel/entity/proxy/MemberSubscription.java | 118 +++++++ .../entity/proxy/ProxyClientConnection.java | 185 +++++++++++ .../com/c2kernel/entity/proxy/ProxyMessage.java | 102 ++++++ .../entity/proxy/ProxyServerConnection.java | 134 ++++++++ .../com/c2kernel/entity/transfer/TransferItem.java | 133 ++++++++ .../com/c2kernel/entity/transfer/TransferSet.java | 103 ++++++ 21 files changed, 3436 insertions(+) create mode 100644 src/main/java/com/c2kernel/entity/C2KLocalObject.java create mode 100644 src/main/java/com/c2kernel/entity/CorbaServer.java create mode 100644 src/main/java/com/c2kernel/entity/TraceableEntity.java create mode 100644 src/main/java/com/c2kernel/entity/TraceableLocator.java create mode 100644 src/main/java/com/c2kernel/entity/agent/ActiveEntity.java create mode 100644 src/main/java/com/c2kernel/entity/agent/ActiveLocator.java create mode 100644 src/main/java/com/c2kernel/entity/agent/Job.java create mode 100644 src/main/java/com/c2kernel/entity/agent/JobArrayList.java create mode 100644 src/main/java/com/c2kernel/entity/agent/JobList.java create mode 100644 src/main/java/com/c2kernel/entity/proxy/AgentProxy.java create mode 100644 src/main/java/com/c2kernel/entity/proxy/DomainPathSubscriber.java create mode 100644 src/main/java/com/c2kernel/entity/proxy/EntityProxy.java create mode 100644 src/main/java/com/c2kernel/entity/proxy/EntityProxyManager.java create mode 100644 src/main/java/com/c2kernel/entity/proxy/EntityProxyObserver.java create mode 100644 src/main/java/com/c2kernel/entity/proxy/ItemProxy.java create mode 100644 src/main/java/com/c2kernel/entity/proxy/MemberSubscription.java create mode 100644 src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java create mode 100644 src/main/java/com/c2kernel/entity/proxy/ProxyMessage.java create mode 100644 src/main/java/com/c2kernel/entity/proxy/ProxyServerConnection.java create mode 100644 src/main/java/com/c2kernel/entity/transfer/TransferItem.java create mode 100644 src/main/java/com/c2kernel/entity/transfer/TransferSet.java (limited to 'src/main/java/com/c2kernel/entity') diff --git a/src/main/java/com/c2kernel/entity/C2KLocalObject.java b/src/main/java/com/c2kernel/entity/C2KLocalObject.java new file mode 100644 index 0000000..ec30dc1 --- /dev/null +++ b/src/main/java/com/c2kernel/entity/C2KLocalObject.java @@ -0,0 +1,32 @@ +package com.c2kernel.entity; + +import java.io.Serializable; + +/** + * Objects that are to be stored by Cristal Entities must implement this interface and be (un)marshallable by Castor + * i.e. have a map file properly registered in the kernel. Domain implementors should not create new C2KLocalObjects + *

Each object will be stored as the path /clustertype/name in most cases. Exceptions are: + *

+ * + * @see com.c2kernel.persistency.ClusterStorage + * @see com.c2kernel.persistency.ClusterStorageManager + * + * @author Andrew Branson + * + * $Revision: 1.5 $ + * $Date: 2004/01/22 11:10:41 $ + * + * Copyright (C) 2003 CERN - European Organization for Nuclear Research + * All rights reserved. + */ + +public interface C2KLocalObject extends Serializable { + + public void setName(String name); + public String getName(); + + public String getClusterType(); +} diff --git a/src/main/java/com/c2kernel/entity/CorbaServer.java b/src/main/java/com/c2kernel/entity/CorbaServer.java new file mode 100644 index 0000000..84d2ef2 --- /dev/null +++ b/src/main/java/com/c2kernel/entity/CorbaServer.java @@ -0,0 +1,190 @@ +package com.c2kernel.entity; + +import java.util.Map; + +import org.omg.PortableServer.POA; +import org.omg.PortableServer.POAManager; +import org.omg.PortableServer.Servant; +import org.omg.PortableServer.POAManagerPackage.AdapterInactive; + +import com.c2kernel.common.CannotManageException; +import com.c2kernel.common.InvalidDataException; +import com.c2kernel.common.ObjectAlreadyExistsException; +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.entity.agent.ActiveEntity; +import com.c2kernel.entity.agent.ActiveLocator; +import com.c2kernel.lookup.AgentPath; +import com.c2kernel.lookup.EntityPath; +import com.c2kernel.lookup.InvalidEntityPathException; +import com.c2kernel.process.Gateway; +import com.c2kernel.utils.Logger; +import com.c2kernel.utils.SoftCache; + +/************************************************************************** + * + * $Revision: 1.8 $ + * $Date: 2005/10/13 08:13:44 $ + * + * Copyright (C) 2003 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + + +public class CorbaServer { + private Map mEntityCache; + private POA mRootPOA; + private POA mItemPOA; + private POA mAgentPOA; + private POAManager mPOAManager; + + public CorbaServer() throws InvalidDataException { + mEntityCache = new SoftCache(50); + + // init POA + try { + setupPOA(); + mPOAManager.activate(); + } catch (Exception ex) { + Logger.error(ex); + throw new InvalidDataException("Error initialising POA", ""); + } + + new Thread(new Runnable() { + @Override + public void run() { + Thread.currentThread().setName("ORB Invoker"); + Gateway.getORB().run(); + } + }).start(); + } + + public void close() { + try { + mPOAManager.deactivate(true, true); + } catch (AdapterInactive ex) { + Logger.error(ex); + } + } + + /************************************************************************** + * Initialises the C2KRootPOA with policies which are suitable for Factory objects + **************************************************************************/ + public void setupPOA() throws Exception { + + //Initialise the RootPOA + mRootPOA = org.omg.PortableServer.POAHelper.narrow( + Gateway.getORB().resolve_initial_references("RootPOA")); + + //Initilaise the default POAManager + + mPOAManager = mRootPOA.the_POAManager(); + + // Create POA for use by the entities + org.omg.CORBA.Policy[] policies = new org.omg.CORBA.Policy[6]; + + policies[0] = mRootPOA.create_id_assignment_policy( + org.omg.PortableServer.IdAssignmentPolicyValue.USER_ID); + policies[1] = mRootPOA.create_lifespan_policy( + org.omg.PortableServer.LifespanPolicyValue.PERSISTENT); + policies[2] = mRootPOA.create_servant_retention_policy( + org.omg.PortableServer.ServantRetentionPolicyValue.NON_RETAIN); + policies[3] = mRootPOA.create_id_uniqueness_policy( + org.omg.PortableServer.IdUniquenessPolicyValue.UNIQUE_ID); + policies[4] = mRootPOA.create_request_processing_policy( + org.omg.PortableServer.RequestProcessingPolicyValue. + USE_SERVANT_MANAGER); + policies[5] = mRootPOA.create_implicit_activation_policy( + org.omg.PortableServer.ImplicitActivationPolicyValue. + NO_IMPLICIT_ACTIVATION); + + mItemPOA = mRootPOA.create_POA( "Item", + mRootPOA.the_POAManager(), + policies ); + mAgentPOA = mRootPOA.create_POA( "Agent", + mRootPOA.the_POAManager(), + policies ); + + //Create the locators + TraceableLocator itemLocator = new TraceableLocator( mItemPOA ); + mItemPOA.set_servant_manager( itemLocator._this( Gateway.getORB() ) ); + + ActiveLocator agentLocator = new ActiveLocator( mAgentPOA ); + mAgentPOA.set_servant_manager( agentLocator._this( Gateway.getORB() ) ); + + } + + + /************************************************************************** + * Returns a CORBA servant for a pre-existing entity + **************************************************************************/ + private Servant getEntity(int sysKey, org.omg.PortableServer.POA poa) throws ObjectNotFoundException { + try { + EntityPath entityPath = new EntityPath(sysKey); + Servant entity = null; + synchronized (mEntityCache) { + entity = mEntityCache.get(entityPath); + if (entity == null) { + Logger.msg(7, "Creating new servant for "+sysKey); + + Class entityClass = Gateway.getLDAPLookup().getEntityClass(entityPath); + + if (entityClass == TraceableEntity.class) { + if (poa == null) poa = mItemPOA; + entity = new TraceableEntity(sysKey, poa); + } + else if (entityClass == ActiveEntity.class) { + if (poa == null) poa = mAgentPOA; + entity = new ActiveEntity(sysKey, poa); + } + mEntityCache.put(entityPath, entity); + } + } + return entity; + + } catch (InvalidEntityPathException ex) { + throw new ObjectNotFoundException("Invalid Entity Key", ""); + } + } + + /************************************************************************** + * Wrapper for fetching Items + **************************************************************************/ + public TraceableEntity getItem(int sysKey, org.omg.PortableServer.POA poa) throws ObjectNotFoundException { + return (TraceableEntity)getEntity(sysKey, poa); + } + + /************************************************************************** + * Wrapper for fetching Agents + **************************************************************************/ + public ActiveEntity getAgent(int sysKey, org.omg.PortableServer.POA poa) throws ObjectNotFoundException { + return (ActiveEntity)getEntity(sysKey, poa); + } + + /** + * @param entityPath + * @return + */ + public Servant createEntity(EntityPath entityPath) throws CannotManageException, ObjectAlreadyExistsException { + try { + if (entityPath == null) + entityPath = Gateway.getLDAPLookup().getNextKeyManager().generateNextEntityKey(); + } catch (Exception ex) { + Logger.error(ex); + throw new CannotManageException("Cannot generate next entity key"); + } + boolean isAgent = entityPath instanceof AgentPath; + POA myPOA = isAgent?mAgentPOA:mItemPOA; + org.omg.CORBA.Object obj = myPOA.create_reference_with_id(entityPath.getOID(), isAgent?AgentHelper.id():ItemHelper.id()); + entityPath.setIOR(obj); + Servant entity; + if (isAgent) + entity = new ActiveEntity(entityPath.getSysKey(), myPOA); + else + entity = new TraceableEntity(entityPath.getSysKey(), myPOA); + synchronized (mEntityCache) { + mEntityCache.put(entityPath, entity); + } + return entity; + + } +} diff --git a/src/main/java/com/c2kernel/entity/TraceableEntity.java b/src/main/java/com/c2kernel/entity/TraceableEntity.java new file mode 100644 index 0000000..c7aff82 --- /dev/null +++ b/src/main/java/com/c2kernel/entity/TraceableEntity.java @@ -0,0 +1,341 @@ +/************************************************************************** + * TraceableEntity + * + * $Workfile$ + * $Revision: 1.108 $ + * $Date: 2005/10/06 14:46:22 $ + * + * Copyright (C) 2001 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +package com.c2kernel.entity; + + +import com.c2kernel.common.AccessRightsException; +import com.c2kernel.common.InvalidDataException; +import com.c2kernel.common.InvalidTransitionException; +import com.c2kernel.common.ObjectAlreadyExistsException; +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.common.PersistencyException; +import com.c2kernel.entity.agent.JobArrayList; +import com.c2kernel.lifecycle.instance.CompositeActivity; +import com.c2kernel.lifecycle.instance.Workflow; +import com.c2kernel.lifecycle.instance.stateMachine.Transitions; +import com.c2kernel.lookup.AgentPath; +import com.c2kernel.lookup.InvalidEntityPathException; +import com.c2kernel.persistency.ClusterStorage; +import com.c2kernel.persistency.ClusterStorageException; +import com.c2kernel.persistency.TransactionManager; +import com.c2kernel.process.Gateway; +import com.c2kernel.property.Property; +import com.c2kernel.property.PropertyArrayList; +import com.c2kernel.utils.CastorXMLUtility; +import com.c2kernel.utils.Logger; + +/************************************************************************** +* +* @author $Author: abranson $ $Date: 2005/10/06 14:46:22 $ +* @version $Revision: 1.108 $ +*
+*                                ,.   '\'\    ,---.
+*                            .  | \\  l\\l_ //    |
+*        _              _       |  \\/ `/  `.|    |
+*      /~\\   \        //~\     | Y |   |   ||  Y |
+*      |  \\   \      //  |     |  \|   |   |\ /  |
+*      [   ||        ||   ]     \   |  o|o  | >  /
+*     ] Y  ||        ||  Y [     \___\_--_ /_/__/
+*     |  \_|l,------.l|_/  |     /.-\(____) /--.\
+*     |   >'          `<   |     `--(______)----'
+*     \  (/~`--____--'~\)  /         u// u / \
+*      `-_>-__________-<_-'          / \  / /|
+*          /(_#(__)#_)\             ( .) / / ]
+*          \___/__\___/              `.`' /   [
+*           /__`--'__\                |`-'    |
+*        /\(__,>-~~ __)               |       |_
+*     /\//\\(  `--~~ )               _l       |-:.
+*     '\/  <^\      /^>             |  `   (  <  \\
+*          _\ >-__-< /_           ,-\  ,-~~->. \  `:._,/
+*        (___\    /___)         (____/    (____)   `-'
+*             Kovax            and, paradoxically, Kovax
+* 
+***************************************************************************/ + +public class TraceableEntity extends ItemPOA +{ + + private int mSystemKey; + private org.omg.PortableServer.POA mPoa; + private TransactionManager mStorage; + + + /************************************************************************** + * Constructor used by the Locator only + **************************************************************************/ + public TraceableEntity( int key, + org.omg.PortableServer.POA poa ) + { + Logger.msg(5,"TraceableEntity::constructor() - SystemKey:" + key ); + + mSystemKey = key; + mPoa = poa; + mStorage = Gateway.getStorage(); + } + + + /************************************************************************** + * + **************************************************************************/ + @Override + public org.omg.PortableServer.POA _default_POA() + { + if(mPoa != null) + return mPoa; + else + return super._default_POA(); + } + + + /************************************************************************** + * + **************************************************************************/ + @Override + public int getSystemKey() + { + Logger.msg(8, "TraceableEntity::getSystemKey() - " + mSystemKey); + return mSystemKey; + } + + /************************************************************************** + * + **************************************************************************/ + @Override + public void initialise( int agentId, + String propString, + String initWfString + ) + throws AccessRightsException, + InvalidDataException, + PersistencyException + { + Logger.msg(1, "TraceableEntity::initialise("+mSystemKey+") - agent:"+agentId); + synchronized (this) { + Workflow lc = null; + PropertyArrayList props = null; + + AgentPath agentPath; + try { + agentPath = new AgentPath(agentId); + } catch (InvalidEntityPathException e) { + throw new AccessRightsException("Invalid Agent Id:" + agentId); + } + + //unmarshalling checks the validity of the received strings + + // create properties + if (!propString.equals("")) { + try { + props = (PropertyArrayList)CastorXMLUtility.unmarshall(propString); + for (Object name : props.list) { + Property thisProp = (Property)name; + mStorage.put(mSystemKey, thisProp, props); + } + } catch (Throwable ex) { + Logger.msg(8, "TraceableEntity::initialise("+mSystemKey+ ") - Properties were invalid: "+propString); + Logger.error(ex); + mStorage.abort(props); + } + mStorage.commit(props); + } + + // create wf + try { + if (initWfString == null || initWfString.equals("")) + lc = new Workflow(new CompositeActivity()); + else + lc = new Workflow((CompositeActivity)CastorXMLUtility.unmarshall(initWfString)); + lc.initialise(mSystemKey, agentPath); + mStorage.put(mSystemKey, lc, null); + } catch (Throwable ex) { + Logger.msg(8, "TraceableEntity::initialise("+mSystemKey+") - Workflow was invalid: "+initWfString); + Logger.error(ex); + } + } + } + + /************************************************************************** + * + **************************************************************************/ + //requestdata is xmlstring + @Override + public void requestAction( int agentId, + String stepPath, + int transitionID, + String requestData + ) + throws AccessRightsException, + InvalidTransitionException, + ObjectNotFoundException, + InvalidDataException, + PersistencyException, + ObjectAlreadyExistsException + { + synchronized (this) { + try { + + Logger.msg(1, "TraceableEntity::request("+mSystemKey+") - " + + Transitions.getTransitionName(transitionID) + " "+stepPath + " by " +agentId ); + + AgentPath agent = new AgentPath(agentId); + Workflow lifeCycle = (Workflow)mStorage.get(mSystemKey, ClusterStorage.LIFECYCLE+"/workflow", null); + + lifeCycle.requestAction( agent, + stepPath, + transitionID, + requestData ); + + // store the workflow if we've changed the state of the domain wf + if (!(stepPath.startsWith("workflow/predefined"))) + mStorage.put(mSystemKey, lifeCycle, null); + + // Normal operation exceptions + } catch (AccessRightsException ex) { + Logger.msg("Propagating AccessRightsException back to the calling agent"); + throw ex; + } catch (InvalidTransitionException ex) { + Logger.msg("Propagating InvalidTransitionException back to the calling agent"); + throw ex; + } catch (ObjectNotFoundException ex) { + Logger.msg("Propagating ObjectNotFoundException back to the calling agent"); + throw ex; + // errors + } catch (ClusterStorageException ex) { + Logger.error(ex); + throw new PersistencyException("Error on storage: "+ex.getMessage(), ""); + } catch (InvalidEntityPathException ex) { + Logger.error(ex); + throw new AccessRightsException("Invalid Agent Id: "+agentId, ""); + } catch (InvalidDataException ex) { + Logger.error(ex); + Logger.msg("Propagating InvalidDataException back to the calling agent"); + throw ex; + } catch (ObjectAlreadyExistsException ex) { + Logger.error(ex); + Logger.msg("Propagating ObjectAlreadyExistsException back to the calling agent"); + throw ex; + // non-CORBA exception hasn't been caught! + } catch (Throwable ex) { + Logger.error("Unknown Error: requestAction on "+mSystemKey+" by "+agentId+" executing "+stepPath); + Logger.error(ex); + throw new InvalidDataException("Extraordinary Exception during execution:"+ex.getClass().getName()+" - "+ex.getMessage(), ""); + } + } + } + + /************************************************************************** + * + **************************************************************************/ + @Override + public String queryLifeCycle( int agentId, + boolean filter + ) + throws AccessRightsException, + ObjectNotFoundException, + PersistencyException + { + synchronized (this) { + Logger.msg(1, "TraceableEntity::queryLifeCycle("+mSystemKey+") - agent: " + agentId); + + try + { + AgentPath agent = new AgentPath(agentId); + Workflow wf = (Workflow)mStorage.get(mSystemKey, ClusterStorage.LIFECYCLE+"/workflow", null); + JobArrayList jobBag = new JobArrayList(); + CompositeActivity domainWf = (CompositeActivity)wf.search("workflow/domain"); + jobBag.list = filter?domainWf.calculateJobs(agent, true):domainWf.calculateAllJobs(agent, true); + Logger.msg(1, "TraceableEntity::queryLifeCycle("+mSystemKey+") - Returning "+jobBag.list.size()+" jobs."); + return CastorXMLUtility.marshall( jobBag ); + } + catch( Throwable ex ) + { + Logger.error(ex); + return ""; + } + } + } + + /************************************************************************** + * The description for operation getData. + * + * @param path - the path to the object required + * the suffix 'all' retrieves a listing of all keys on that level + * + * @return The result string in xml format + * except 'all' which returns a comma sep list + * + * @exception ObjectNotFoundException + * ************************************************************************/ + @Override + public String queryData(String path) + throws AccessRightsException, + ObjectNotFoundException, + PersistencyException + { + synchronized (this) { + String result = ""; + + Logger.msg(1, "TraceableEntity::queryData("+mSystemKey+") - " + path ); + + try + { // check for cluster contents query + + if (path.endsWith("/all")) + { + int allPos = path.lastIndexOf("all"); + String query = path.substring(0,allPos); + String[] ids = mStorage.getClusterContents( mSystemKey, query ); + + for( int i=0; i transaction aborted!"); + } + + Logger.msg(5, "ActiveEntity::init() - completed."); + } + + /** + * + * @param propsString + * @return Properties + * @throws InvalidDataException Properties cannot be unmarshalled + * @throws ClusterStorageException + */ + private PropertyArrayList initProps( String propsString ) + throws InvalidDataException, + ClusterStorageException + { + PropertyArrayList props = null; + + // create properties + if( propsString != null && !propsString.equals("") ) + { + try + { + props = (PropertyArrayList)CastorXMLUtility.unmarshall(propsString); + } + catch( Exception ex ) + { + //any exception during unmarshall indicates that data was + //incorrect or the castor mapping was not set up + Logger.error(ex); + throw new InvalidDataException(ex.toString(), null); + } + + Iterator iter = props.list.iterator(); + + while( iter.hasNext() ) + mDatabase.put( mSystemKey, iter.next(), props ); + } + else + { + Logger.warning("ActiveEntity::initProps() - NO Properties!"); + } + + return props; + } + + /************************************************************************** + * + * + **************************************************************************/ + @Override + public org.omg.PortableServer.POA _default_POA() + { + if(mPOA != null) + return mPOA; + else + return super._default_POA(); + } + + + /************************************************************************** + * + * + **************************************************************************/ + @Override + public int getSystemKey() + { + return mSystemKey; + } + + + /************************************************************************** + * + * + **************************************************************************/ + @Override + public String queryData(String xpath) + throws AccessRightsException, + ObjectNotFoundException, + PersistencyException + { + String result = ""; + int allPos = -1; + + Logger.msg(1, "ActiveEntity::queryData("+mSystemKey+") - " + xpath ); + + try + { + if( (allPos=xpath.indexOf("all")) != -1 ) + { + String query = xpath.substring(0,allPos); + String[] ids = mDatabase.getClusterContents( mSystemKey, query ); + + for( int i=0; i 0) + try + { + Viewpoint view = (Viewpoint) Gateway.getStorage().get(getItemSysKey(), ClusterStorage.VIEWPOINT + "/" + getSchemaType() + "/" + viewName, null); + mOutcome = view.getOutcome().getData(); + } catch (Exception ex) + { // not found, return null + } + } + return mOutcome; + } + + public Outcome getOutcome() + { + Logger.msg(1, "Get outcome"); + return new Outcome(-1, getOutcomeString(), getSchemaType(), getSchemaVersion()); + } + + public int getAgentId() throws ObjectNotFoundException + { + if (mAgentId == -1) + mAgentId = Gateway.getLDAPLookup().getRoleManager().getAgentPath(getAgentName()).getSysKey(); + return mAgentId; + } + + public void setAgentId(int id) + { + mAgentId = id; + agent = null; + } + + public String getAgentName() + { + if (mAgentName == null) + mAgentName = (String) mActProps.get("AgentName"); + return mAgentName; + } + + public void setAgentName(String agentName) + { + mAgentName = agentName; + } + + public String getAgentRole() + { + if (mAgentRole == null) + mAgentRole = (String) mActProps.get("Agent Role"); + return mAgentRole; + } + + public void setAgentRole(String role) + { + mAgentRole = role; + } + + public boolean isOutcomeUsed() + { + String schemaType = getSchemaType(); + return (Boolean.TRUE.equals(getActProp("AlwaysUseOutcome")) || mPossibleTransition == Transitions.DONE || mPossibleTransition == Transitions.COMPLETE) && !(schemaType == null || schemaType.equals("")); + } + + public int getID() + { + return mID; + } + + @Override + public String getName() + { + return mName; + } + + public void setID(int id) + { + mID = id; + mName = String.valueOf(id); + } + + @Override + public void setName(String name) + { + mName = name; + try + { + mID = Integer.parseInt(name); + } catch (NumberFormatException ex) + { + mID = -1; + } + } + + public void setItemSysKey(int sysKey) + { + mItemSysKey = sysKey; + item = null; + } + + @Override + public String getClusterType() + { + return ClusterStorage.JOB; + } + + public boolean equals(Job job) + { + return (getItemSysKey() == job.getItemSysKey()) && this.mStepPath.equals(job.mStepPath) && this.mPossibleTransition == job.mPossibleTransition; + } + + public Object getActProp(String name) + { + return mActProps.get(name); + } + + public String getActPropString(String name) + { + try + { + return mActProps.get(name).toString(); + } catch (NullPointerException ex) + { + return null; + } + } + + public String getStepName() + { + return mStepName; + } + + public void setStepName(String string) + { + mStepName = string; + } + + public String getStepPath() + { + return mStepPath; + } + + public void setStepPath(String string) + { + mStepPath = string; + } + + public int getPossibleTransition() + { + return mPossibleTransition; + } + + public void setPossibleTransition(int lint) + { + mPossibleTransition = lint; + } + + public CastorHashMap getActProps() + { + return mActProps; + } + + public void setActProps(CastorHashMap map) + { + mActProps = map; + } + + public KeyValuePair[] getKeyValuePairs() + { + return mActProps.getKeyValuePairs(); + } + + public void setKeyValuePairs(KeyValuePair[] pairs) + { + mActProps.setKeyValuePairs(pairs); + } + + public int getCurrentState() + { + return mCurrentState; + } + + public void setCurrentState(int string) + { + mCurrentState = string; + } + + public int getTargetState() { + return mTargetState; + } + + public void setTargetState(int mTargetState) { + this.mTargetState = mTargetState; + } + + /** + * Returns the actType. + * + * @return String + */ + public String getStepType() + { + return mStepType; + } + + /** + * Sets the actType. + * + * @param actType + * The actType to set + */ + public void setStepType(String actType) + { + mStepType = actType; + } +} \ No newline at end of file diff --git a/src/main/java/com/c2kernel/entity/agent/JobArrayList.java b/src/main/java/com/c2kernel/entity/agent/JobArrayList.java new file mode 100644 index 0000000..dcb3215 --- /dev/null +++ b/src/main/java/com/c2kernel/entity/agent/JobArrayList.java @@ -0,0 +1,30 @@ +/************************************************************************** + * + * $Revision: 1.2 $ + * $Date: 2003/06/20 11:44:30 $ + * + * Copyright (C) 2001 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +package com.c2kernel.entity.agent; + +import java.util.ArrayList; + +import com.c2kernel.utils.CastorArrayList; + +public class JobArrayList extends CastorArrayList +{ + + public JobArrayList() + { + super(); + } + + public JobArrayList(ArrayList aList) + { + super(aList); + } + + +} diff --git a/src/main/java/com/c2kernel/entity/agent/JobList.java b/src/main/java/com/c2kernel/entity/agent/JobList.java new file mode 100644 index 0000000..f8a88ee --- /dev/null +++ b/src/main/java/com/c2kernel/entity/agent/JobList.java @@ -0,0 +1,108 @@ +package com.c2kernel.entity.agent; + +import java.util.Iterator; +import java.util.Vector; + +import com.c2kernel.persistency.ClusterStorage; +import com.c2kernel.persistency.RemoteMap; +import com.c2kernel.utils.Logger; + + +/************************************************************************** +* +* @author $Author: abranson $ $Date: 2006/03/03 13:52:21 $ +* @version $Revision: 1.15 $ +***************************************************************************/ +public class JobList extends RemoteMap +{ + + /************************************************************************** + * Empty constructor for Castor + **************************************************************************/ + public JobList(int sysKey, Object locker) + { + super(sysKey, ClusterStorage.JOB, locker); + } + + + /************************************************************************** + * + **************************************************************************/ + public void addJob( Job job ) + { + synchronized(this) { + int jobId = getLastId()+1; + job.setID(jobId); + put(String.valueOf(jobId), job); + } + } + + /** + * Cannot be stored + */ + @Override + public String getClusterType() { + return null; + } + + + public Job getJob(int id) { + return get(String.valueOf(id)); + } + + + /** + * @param job + */ + public void removeJobsWithSysKey( int sysKey ) + { + Iterator currentMembers = values().iterator(); + Job j = null; + + while( currentMembers.hasNext() ) + { + j = currentMembers.next(); + + if( j.getItemSysKey() == sysKey ) + remove( String.valueOf(j.getID()) ); + } + + Logger.msg(5, "JobList::removeJobsWithSysKey() - " + sysKey + " DONE." ); + } + + public void removeJobsForStep( int sysKey, String stepPath ) + { + Iterator currentMembers = values().iterator(); + while( currentMembers.hasNext() ) + { + Job j = currentMembers.next(); + if( j.getItemSysKey() == sysKey && j.getStepPath().equals(stepPath)) + remove( String.valueOf(j.getID()) ); + } + + Logger.msg(5, "JobList::removeJobsForStep() - " + sysKey + " DONE." ); + } + /** + * @param itemKey + * @param string + * @return + */ + public Vector getJobsOfSysKey(int sysKey) + { + Iterator currentMembers = values().iterator(); + Job j = null; + Vector jobs = new Vector(); + + while( currentMembers.hasNext() ) + { + j = currentMembers.next(); + + if( j.getItemSysKey() == sysKey ) + jobs.add(j); + } + + Logger.msg(5, "JobList::getJobsOfSysKey() - returning " + jobs.size() + " Jobs." ); + + return jobs; + } +} \ No newline at end of file diff --git a/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java b/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java new file mode 100644 index 0000000..72ed088 --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/AgentProxy.java @@ -0,0 +1,302 @@ +/************************************************************************** + * AgentProxy.java + * + * $Revision: 1.37 $ + * $Date: 2005/10/05 07:39:36 $ + * + * Copyright (C) 2001 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +package com.c2kernel.entity.proxy; + +import java.util.Date; +import java.util.Enumeration; + +import com.c2kernel.common.AccessRightsException; +import com.c2kernel.common.InvalidDataException; +import com.c2kernel.common.InvalidTransitionException; +import com.c2kernel.common.ObjectAlreadyExistsException; +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.common.PersistencyException; +import com.c2kernel.entity.Agent; +import com.c2kernel.entity.AgentHelper; +import com.c2kernel.entity.C2KLocalObject; +import com.c2kernel.entity.ManageableEntity; +import com.c2kernel.entity.agent.Job; +import com.c2kernel.lifecycle.instance.predefined.PredefinedStep; +import com.c2kernel.lifecycle.instance.stateMachine.Transitions; +import com.c2kernel.lookup.AgentPath; +import com.c2kernel.lookup.DomainPath; +import com.c2kernel.lookup.EntityPath; +import com.c2kernel.lookup.InvalidEntityPathException; +import com.c2kernel.lookup.Path; +import com.c2kernel.persistency.outcome.OutcomeValidator; +import com.c2kernel.persistency.outcome.Schema; +import com.c2kernel.process.Gateway; +import com.c2kernel.scripting.ErrorInfo; +import com.c2kernel.scripting.Script; +import com.c2kernel.scripting.ScriptingEngineException; +import com.c2kernel.utils.CastorXMLUtility; +import com.c2kernel.utils.LocalObjectLoader; +import com.c2kernel.utils.Logger; + +/****************************************************************************** + * It is a wrapper for the connection and communication with Agent + * It caches data loaded from the Agent to reduce communication + * + * @version $Revision: 1.37 $ $Date: 2005/10/05 07:39:36 $ + * @author $Author: abranson $ + ******************************************************************************/ +public class AgentProxy extends EntityProxy +{ + AgentPath path; + + /************************************************************************** + * Creates an AgentProxy without cache and change notification + **************************************************************************/ + public AgentProxy( org.omg.CORBA.Object ior, + int systemKey) + throws ObjectNotFoundException + { + super(ior, systemKey); + try { + path = new AgentPath(systemKey); + } catch (InvalidEntityPathException e) { + throw new ObjectNotFoundException(); + } + } + + @Override + public ManageableEntity narrow() throws ObjectNotFoundException + { + try { + return AgentHelper.narrow(mIOR); + } catch (org.omg.CORBA.BAD_PARAM ex) { } + throw new ObjectNotFoundException("CORBA Object was not an Agent, or the server is down."); + } + /************************************************************************** + * + * + **************************************************************************/ + public void initialise( String agentProps, String collector ) + throws AccessRightsException, + InvalidDataException, + PersistencyException, + ObjectNotFoundException + { + Logger.msg(7, "AgentProxy::initialise - started"); + + ((Agent)getEntity()).initialise( agentProps ); + } + + public AgentPath getPath() { + return path; + } + + /** + * Executes a job on the given item using this agent. + * + * @param item - item holding this job + * @param job - the job to execute + */ + public void execute(ItemProxy item, Job job) + throws AccessRightsException, + InvalidTransitionException, + ObjectNotFoundException, + InvalidDataException, + PersistencyException, + ObjectAlreadyExistsException + { + OutcomeValidator validator = null; + String scriptName = job.getActPropString("ScriptName"); + Date startTime = new Date(); + Logger.msg(3, "AgentProxy - executing "+job.getStepPath()+" for "+path.getAgentName()); + // get the outcome validator if present + if (job.isOutcomeUsed()) + { + + // get schema info from act props + String schemaName = job.getActPropString("SchemaType"); + int schemaVersion; + try { + schemaVersion = Integer.parseInt(job.getActPropString("SchemaVersion")); + } catch (Exception e) { + throw new InvalidDataException(e.getClass().getName()+" extracing schema version", ""); + } + Logger.msg(5, "AgentProxy - fetching schema "+schemaName+"_"+schemaVersion+" for validation"); + // retrieve schema + Schema schema = LocalObjectLoader.getSchema(schemaName, schemaVersion); + + if (schema == null) + throw new InvalidDataException("Job references outcome type "+schemaName+" version "+schemaVersion+" that does not exist in this centre.", ""); + + try { + validator = OutcomeValidator.getValidator(schema); + } catch (Exception e) { + throw new InvalidDataException("Could not create validator: "+e.getMessage(), ""); + } + } + + if(scriptName != null && scriptName.length() > 0 && + (job.getPossibleTransition() == Transitions.DONE || job.getPossibleTransition() == Transitions.COMPLETE)) { + Logger.msg(3, "AgentProxy - executing script "+scriptName); + try { + + // pre-validate outcome from script if there is one + if (job.getOutcomeString()!= null && validator != null) { + Logger.msg(5, "AgentProxy - validating outcome before script execution"); + String error = validator.validate(job.getOutcomeString()); + if (error.length() > 0) { + Logger.error("Outcome not valid: \n " + error); + throw new InvalidDataException(error, ""); + } + } + + // load script + ErrorInfo scriptErrors = (ErrorInfo)callScript(item, job); + + if (scriptErrors.getFatal()) { + Logger.msg(3, "AgentProxy - fatal script error"); + Logger.error(scriptErrors.getErrors()); + throw new InvalidDataException("Fatal Script Error: \n"+scriptErrors.getErrors(), ""); + } + if (scriptErrors.getErrors().length() > 0) + Logger.warning("Script errors: "+scriptErrors.getErrors()); + } catch (ScriptingEngineException ex) { + Logger.error(ex); + throw new InvalidDataException(ex.getMessage(), ""); + } + } + + if (job.isOutcomeUsed()) { + Logger.msg(3, "AgentProxy - validating outcome"); + String error = validator.validate(job.getOutcomeString()); + if (error.length() > 0) + throw new InvalidDataException(error, ""); + } + + job.setAgentId(getSystemKey()); + Logger.msg(3, "AgentProxy - submitting job to item proxy"); + item.requestAction(job); + if (Logger.doLog(3)) { + Date timeNow = new Date(); + long secsNow = (timeNow.getTime()-startTime.getTime())/1000; + Logger.msg(3, "Execution took "+secsNow+" seconds"); + } + } + + public Object callScript(ItemProxy item, Job job) throws ScriptingEngineException { + Script script = new Script(item, this, job); + return script.execute(); + } + + /** + * Standard execution of jobs. Note that this method should always be the one used from clients - all execution + * parameters are taken from the job where they're probably going to be correct. + * + * @param job + * @throws AccessRightsException + * @throws InvalidDataException + * @throws InvalidTransitionException + * @throws ObjectNotFoundException + * @throws PersistencyException + * @throws ObjectAlreadyExistsException + */ + public void execute(Job job) + throws AccessRightsException, + InvalidDataException, + InvalidTransitionException, + ObjectNotFoundException, + PersistencyException, + ObjectAlreadyExistsException + { + try { + ItemProxy targetItem = (ItemProxy)Gateway.getProxyManager().getProxy(new EntityPath(job.getItemSysKey())); + execute(targetItem, job); + } catch (InvalidEntityPathException e) { + throw new ObjectNotFoundException("Job contained invalid item sysKey: "+job.getItemSysKey(), ""); + } + } + + public void execute(ItemProxy item, String predefStep, C2KLocalObject obj) + throws AccessRightsException, + InvalidDataException, + InvalidTransitionException, + ObjectNotFoundException, + PersistencyException, + ObjectAlreadyExistsException + { + String param; + try { + param = marshall(obj); + } catch (Exception ex) { + Logger.error(ex); + throw new InvalidDataException("Error on marshall", ""); + } + execute(item, predefStep, param); + } + + public void execute(ItemProxy item, String predefStep, String param) + throws AccessRightsException, + InvalidDataException, + InvalidTransitionException, + ObjectNotFoundException, + PersistencyException, + ObjectAlreadyExistsException + { + String[] params = new String[1]; + params[0] = param; + execute(item, predefStep, params); + } + + public void execute(ItemProxy item, String predefStep, String[] params) + throws AccessRightsException, + InvalidDataException, + InvalidTransitionException, + ObjectNotFoundException, + PersistencyException, + ObjectAlreadyExistsException + { + item.requestAction(getSystemKey(), "workflow/predefined/"+predefStep, Transitions.DONE, PredefinedStep.bundleData(params)); + } + + /** Wrappers for scripts */ + public String marshall(Object obj) throws Exception { + return CastorXMLUtility.marshall(obj); + } + + public Object unmarshall(String obj) throws Exception { + return CastorXMLUtility.unmarshall(obj); + } + + /** Let scripts resolve items */ + public ItemProxy searchItem(String name) throws ObjectNotFoundException { + Enumeration results = Gateway.getLDAPLookup().search(new DomainPath(""),name); + + Path returnPath = null; + if (!results.hasMoreElements()) + throw new ObjectNotFoundException(name, ""); + + while(results.hasMoreElements()) { + Path nextMatch = results.nextElement(); + if (returnPath != null && nextMatch.getSysKey() != -1 && returnPath.getSysKey() != nextMatch.getSysKey()) + throw new ObjectNotFoundException("Too many items with that name"); + returnPath = nextMatch; + } + + return (ItemProxy)Gateway.getProxyManager().getProxy(returnPath); + } + + public ItemProxy getItem(String path) throws ObjectNotFoundException { + return (getItem(new DomainPath(path))); + } + + public ItemProxy getItem(DomainPath path) throws ObjectNotFoundException { + return (ItemProxy)Gateway.getProxyManager().getProxy(path); + } + + public ItemProxy getItemBySysKey(int sysKey) throws ObjectNotFoundException, InvalidEntityPathException { + return (ItemProxy)Gateway.getProxyManager().getProxy(new EntityPath(sysKey)); + } +} diff --git a/src/main/java/com/c2kernel/entity/proxy/DomainPathSubscriber.java b/src/main/java/com/c2kernel/entity/proxy/DomainPathSubscriber.java new file mode 100644 index 0000000..4089325 --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/DomainPathSubscriber.java @@ -0,0 +1,18 @@ +package com.c2kernel.entity.proxy; + +import com.c2kernel.lookup.DomainPath; + +/************************************************************************** + * + * $Revision: 1.1 $ + * $Date: 2004/02/05 16:11:57 $ + * + * Copyright (C) 2003 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +public interface DomainPathSubscriber { + + public void pathAdded(DomainPath path); + public void pathRemoved(DomainPath path); +} diff --git a/src/main/java/com/c2kernel/entity/proxy/EntityProxy.java b/src/main/java/com/c2kernel/entity/proxy/EntityProxy.java new file mode 100644 index 0000000..fae2e28 --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/EntityProxy.java @@ -0,0 +1,248 @@ +/************************************************************************** + * EntityProxy.java + * + * $Revision: 1.35 $ + * $Date: 2005/05/10 11:40:09 $ + * + * Copyright (C) 2001 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +package com.c2kernel.entity.proxy; + +import java.util.HashMap; +import java.util.Iterator; + +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.entity.C2KLocalObject; +import com.c2kernel.entity.ManageableEntity; +import com.c2kernel.persistency.ClusterStorageException; +import com.c2kernel.process.Gateway; +import com.c2kernel.property.Property; +import com.c2kernel.utils.CastorXMLUtility; +import com.c2kernel.utils.Logger; + + +/****************************************************************************** +* It is a wrapper for the connection and communication with Entities. +* It can cache data loaded from the Entity to reduce communication with it. +* This cache is syncronised with corresponding Entity through an event mechanism. +* +* @version $Revision: 1.35 $ $Date: 2005/05/10 11:40:09 $ +* @author $Author: abranson $ +******************************************************************************/ + +abstract public class EntityProxy implements ManageableEntity +{ + + protected ManageableEntity mEntity = null; + protected org.omg.CORBA.Object mIOR; + protected int mSystemKey; + private HashMap, EntityProxyObserver> mSubscriptions; + + /************************************************************************** + * + **************************************************************************/ + protected EntityProxy( org.omg.CORBA.Object ior, + int systemKey) + throws ObjectNotFoundException + { + Logger.msg(8,"EntityProxy::EntityProxy() - Initialising '" +systemKey+ "' entity"); + + initialise( ior, systemKey); + } + + /************************************************************************** + * + **************************************************************************/ + private void initialise( org.omg.CORBA.Object ior, + int systemKey) + throws ObjectNotFoundException + { + Logger.msg(8, "EntityProxy::initialise() - Initialising '" +systemKey+ "' entity"); + + mIOR = ior; + mSystemKey = systemKey; + mSubscriptions = new HashMap, EntityProxyObserver>(); + } + + + /************************************************************************** + * + **************************************************************************/ + public ManageableEntity getEntity() throws ObjectNotFoundException + { + if (mEntity == null) { + mEntity = narrow(); + } + return mEntity; + } + + abstract public ManageableEntity narrow() throws ObjectNotFoundException; + + /************************************************************************** + * + **************************************************************************/ + //check who is using.. and if toString() is sufficient + @Override + public int getSystemKey() + { + return mSystemKey; + } + + + /************************************************************************** + * + **************************************************************************/ + @Override + public String queryData( String path ) + throws ObjectNotFoundException + { + + try { + Logger.msg(7, "EntityProxy.queryData() - "+mSystemKey+"/"+path); + if (path.endsWith("all")) { + Logger.msg(7, "EntityProxy.queryData() - listing contents"); + String[] result = Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length()-3)); + StringBuffer retString = new StringBuffer(); + for (int i = 0; i < result.length; i++) { + retString.append(result[i]); + if (i"+e.getMessage()+""; + } + } + + public String[] getContents( String path ) throws ObjectNotFoundException { + try { + return Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length())); + } catch (ClusterStorageException e) { + throw new ObjectNotFoundException(e.toString()); + } + } + + + /************************************************************************** + * + **************************************************************************/ + public C2KLocalObject getObject( String xpath ) + throws ObjectNotFoundException + { + // load from storage, falling back to proxy loader if not found in others + try + { + return Gateway.getStorage().get( mSystemKey, xpath , null); + } + catch( ClusterStorageException ex ) + { + Logger.msg(4, "Exception loading object :"+mSystemKey+"/"+xpath); + throw new ObjectNotFoundException( ex.toString() ); + } + } + + + + public String getProperty( String name ) + throws ObjectNotFoundException + { + Logger.msg(5, "Get property "+name+" from syskey/"+mSystemKey); + Property prop = (Property)getObject("Property/"+name); + try + { + return prop.getValue(); + } + catch (NullPointerException ex) + { + throw new ObjectNotFoundException(); + } + } + + public String getName() + { + try { + return getProperty("Name"); + } catch (ObjectNotFoundException ex) { + return null; + } + } + + + /************************************************************************** + * Subscription methods + **************************************************************************/ + + public void subscribe (MemberSubscription newSub) { + + newSub.setSubject(this); + synchronized (this){ + mSubscriptions.put( newSub, newSub.getObserver() ); + } + new Thread(newSub).start(); + Logger.msg(7, "Subscribed "+newSub.getObserver().getClass().getName()+" for "+newSub.interest); + } + + public void unsubscribe(EntityProxyObserver observer) + { + synchronized (this){ + for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { + MemberSubscription thisSub = e.next(); + if (mSubscriptions.get( thisSub ) == observer) { + e.remove(); + Logger.msg(7, "Unsubscribed "+observer.getClass().getName()); + } + } + } + } + + public void dumpSubscriptions(int logLevel) { + if (mSubscriptions.size() == 0) return; + Logger.msg(logLevel, "Subscriptions to proxy "+mSystemKey+":"); + synchronized(this) { + for (MemberSubscription element : mSubscriptions.keySet()) { + EntityProxyObserver obs = element.getObserver(); + if (obs != null) + Logger.msg(logLevel, " "+element.getObserver().getClass().getName()+" subscribed to "+element.interest); + else + Logger.msg(logLevel, " Phantom subscription to "+element.interest); + } + } + } + + public void notify(ProxyMessage message) { + Logger.msg(4, "EntityProxy.notify() - Received change notification for "+message.getPath()+" on "+mSystemKey); + synchronized (this){ + if (!message.getServer().equals(EntityProxyManager.serverName)) + Gateway.getStorage().clearCache(mSystemKey, message.getPath()); + for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { + MemberSubscription newSub = e.next(); + if (newSub.getObserver() == null) { // phantom + Logger.msg(4, "Removing phantom subscription to "+newSub.interest); + e.remove(); + } + else + newSub.update(message.getPath(), message.getState()); + } + } + } + + /** + * If this is reaped, clear out the cache for it too. + */ + @Override + protected void finalize() throws Throwable { + Logger.msg(7, "Proxy "+mSystemKey+" reaped"); + Gateway.getStorage().clearCache(mSystemKey, null); + Gateway.getProxyManager().removeProxy(mSystemKey); + super.finalize(); + } + +} diff --git a/src/main/java/com/c2kernel/entity/proxy/EntityProxyManager.java b/src/main/java/com/c2kernel/entity/proxy/EntityProxyManager.java new file mode 100644 index 0000000..192a984 --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/EntityProxyManager.java @@ -0,0 +1,339 @@ +/************************************************************************** + * EntityProxyFactory.java + * + * $Revision: 1.45 $ + * $Date: 2005/05/10 11:40:09 $ + * + * Copyright (C) 2001 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +package com.c2kernel.entity.proxy; + +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Iterator; + +import com.c2kernel.common.InvalidDataException; +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.lookup.AgentPath; +import com.c2kernel.lookup.DomainPath; +import com.c2kernel.lookup.Path; +import com.c2kernel.persistency.ClusterStorage; +import com.c2kernel.process.Gateway; +import com.c2kernel.property.Property; +import com.c2kernel.utils.Logger; +import com.c2kernel.utils.SoftCache; +import com.c2kernel.utils.server.SimpleTCPIPServer; + + +public class EntityProxyManager +{ + SoftCache proxyPool = new SoftCache(50); + HashMap treeSubscribers = new HashMap(); + HashMap connections = new HashMap(); + + // server objects + static ArrayList proxyClients = new ArrayList(); + static SimpleTCPIPServer proxyServer = null; + static String serverName = null; + + /** + * Create an entity proxy manager to listen for proxy events and reap unused proxies + */ + public EntityProxyManager() + { + Logger.msg(5, "EntityProxyManager - Starting....."); + + Enumeration servers = Gateway.getLDAPLookup().searchEntities(new DomainPath("/servers")); + while(servers.hasMoreElements()) { + Path thisServerPath = servers.nextElement(); + try { + int syskey = thisServerPath.getSysKey(); + String remoteServer = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/Name", null)).getValue(); + String portStr = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/ProxyPort", null)).getValue(); + int remotePort = Integer.parseInt(portStr); + connectToProxyServer(remoteServer, remotePort); + + } catch (Exception ex) { + Logger.error("Exception retrieving proxy server connection data for "+thisServerPath); + Logger.error(ex); + } + } + } + + public void connectToProxyServer(String name, int port) { + ProxyServerConnection oldConn = connections.get(name); + if (oldConn != null) + oldConn.shutdown(); + connections.put(name, new ProxyServerConnection(name, port, this)); + } + + + protected void resubscribe(ProxyServerConnection conn) { + synchronized (proxyPool) { + for (Integer key : proxyPool.keySet()) { + ProxyMessage sub = new ProxyMessage(key.intValue(), ProxyMessage.ADDPATH, false); + Logger.msg(5, "Subscribing to entity "+key); + conn.sendMessage(sub); + } + } + } + + /** + * @param sub + */ + private void sendMessage(ProxyMessage sub) { + for (ProxyServerConnection element : connections.values()) { + element.sendMessage(sub); + } + + } + + public void shutdown() { + Logger.msg("EntityProxyManager.shutdown() - flagging shutdown of server connections"); + for (ProxyServerConnection element : connections.values()) { + element.shutdown(); + } + } + + protected void processMessage(ProxyMessage thisMessage) throws InvalidDataException { + if (Logger.doLog(9)) Logger.msg(9, thisMessage.toString()); + + if (thisMessage.getPath().equals(ProxyMessage.PINGPATH)) // ping response + return; + + if (thisMessage.getSysKey() == ProxyMessage.NA) // must be domain path info + informTreeSubscribers(thisMessage.getState(), thisMessage.getPath()); + else { + // proper proxy message + Logger.msg(5, "Received proxy message: "+thisMessage.toString()); + Integer key = new Integer(thisMessage.getSysKey()); + EntityProxy relevant = proxyPool.get(key); + if (relevant == null) + Logger.warning("Received proxy message for sysKey "+thisMessage.getSysKey()+" which we don't have a proxy for."); + else + try { + relevant.notify(thisMessage); + } catch (Throwable ex) { + Logger.error("Error caught notifying proxy listener "+relevant.toString()+" of "+thisMessage.toString()); + Logger.error(ex); + } + } + } + + private void informTreeSubscribers(boolean state, String path) { + DomainPath last = new DomainPath(path); + DomainPath parent; boolean first = true; + synchronized(treeSubscribers) { + while((parent = last.getParent()) != null) { + + for (DomainPathSubscriber sub : treeSubscribers.keySet()) { + DomainPath interest = treeSubscribers.get(sub); + if (interest.equals(parent)) { + if (state == ProxyMessage.ADDED) + sub.pathAdded(last); + else if (first) + sub.pathRemoved(last); + } + } + last = parent; + first = false; + } + } + } + + public void subscribeTree(DomainPathSubscriber sub, DomainPath interest) { + synchronized(treeSubscribers) { + treeSubscribers.put(sub, interest); + } + } + + public void unsubscribeTree(DomainPathSubscriber sub) { + synchronized(treeSubscribers) { + treeSubscribers.remove(sub); + } + } + + /************************************************************************** + * + **************************************************************************/ + private EntityProxy createProxy( org.omg.CORBA.Object ior, + int systemKey, + boolean isItem ) + throws ObjectNotFoundException + { + + EntityProxy newProxy = null; + + Logger.msg(5, "EntityProxyFactory::creating proxy on entity " + systemKey); + + if( isItem ) + { + newProxy = new ItemProxy(ior, systemKey); + } + else + { + newProxy = new AgentProxy(ior, systemKey); + } + + // subscribe to changes from server + ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.ADDPATH, false); + sendMessage(sub); + reportCurrentProxies(9); + return ( newProxy ); + } + + protected void removeProxy( int systemKey ) + { + ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.DELPATH, true); + Logger.msg(5,"EntityProxyManager.removeProxy() - Unsubscribing to proxy informer for "+systemKey); + sendMessage(sub); + } + + + /************************************************************************** + * EntityProxy getProxy( ManageableEntity, SystemKey) + * + * Called by the other GetProxy methods. Fills in either the ior or the + * SystemKey + **************************************************************************/ + private EntityProxy getProxy( org.omg.CORBA.Object ior, + int systemKey, + boolean isItem ) + throws ObjectNotFoundException + { + Integer key = new Integer(systemKey); + + synchronized(proxyPool) { + EntityProxy newProxy; + // return it if it exists + newProxy = proxyPool.get(key); + if (newProxy == null) { + // create a new one + newProxy = createProxy(ior, systemKey, isItem ); + proxyPool.put(key, newProxy); + } + return newProxy; + + } + } + + /************************************************************************** + * EntityProxy getProxy( String ) + * + * Proxy from Alias + **************************************************************************/ + public EntityProxy getProxy( Path path ) + throws ObjectNotFoundException + { + + //convert namePath to dn format + Logger.msg(8,"EntityProxyFactory::getProxy(" + path.toString() + ")"); + boolean isItem = !(path.getEntity() instanceof AgentPath); + return getProxy( Gateway.getLDAPLookup().getIOR(path), + path.getSysKey(), + isItem ); + + } + + /************************************************************************** + * void reportCurrentProxies() + * + * A utility to Dump the current proxies loaded + **************************************************************************/ + public void reportCurrentProxies(int logLevel) + { + if (!Logger.doLog(logLevel)) return; + Logger.msg(logLevel, "Current proxies: "); + try { + synchronized(proxyPool) { + Iterator i = proxyPool.keySet().iterator(); + + for( int count=0; i.hasNext(); count++ ) + { + Integer nextProxy = i.next(); + EntityProxy thisProxy = proxyPool.get(nextProxy); + if (thisProxy != null) + Logger.msg(logLevel, + "" + count + ": " + + proxyPool.get(nextProxy).getClass().getName() + + ": " + nextProxy); + } + } + } catch (ConcurrentModificationException ex) { + Logger.msg(logLevel, "Proxy cache modified. Aborting."); + } + } + + + /************************************************************************** + * Static Proxy Server methods + **************************************************************************/ + + /** + * Initialises the Proxy event UDP server listening on 'Host.Proxy.port' from c2kprops + * @param c2kProps + */ + public static void initServer() + { + Logger.msg(5, "EntityProxyFactory::initServer - Starting....."); + String port = Gateway.getProperty("ItemServer.Proxy.port"); + serverName = Gateway.getProperty("ItemServer.name"); + if (port == null) { + Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of entity changes."); + return; + } + + // set up the proxy server + try { + int portNo = Integer.parseInt(port); + Logger.msg(5, "EntityProxyFactory::initServer - Initialising proxy informer on port "+port); + proxyServer = new SimpleTCPIPServer(portNo, ProxyClientConnection.class, 200); + proxyServer.startListening(); + } catch (Exception ex) { + Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of entity changes."); + Logger.error(ex); + } + } + + public static void sendProxyEvent(ProxyMessage message) { + if (proxyServer != null && message.getPath() != null) + synchronized(proxyClients) { + for (ProxyClientConnection client : proxyClients) { + client.sendMessage(message); + } + } + } + + public static void reportConnections(int logLevel) { + synchronized(proxyClients) { + Logger.msg(logLevel, "Currently connected proxy clients:"); + for (ProxyClientConnection client : proxyClients) { + Logger.msg(logLevel, " "+client); + } + } + } + + public static void shutdownServer() { + if (proxyServer != null) { + Logger.msg(1, "EntityProxyManager: Closing Server."); + proxyServer.stopListening(); + } + } + + public static void registerProxyClient(ProxyClientConnection client) { + synchronized(proxyClients) { + proxyClients.add(client); + } + } + + public static void unRegisterProxyClient(ProxyClientConnection client) { + synchronized(proxyClients) { + proxyClients.remove(client); + } + } +} + diff --git a/src/main/java/com/c2kernel/entity/proxy/EntityProxyObserver.java b/src/main/java/com/c2kernel/entity/proxy/EntityProxyObserver.java new file mode 100644 index 0000000..3ddb99c --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/EntityProxyObserver.java @@ -0,0 +1,27 @@ +package com.c2kernel.entity.proxy; + +import com.c2kernel.entity.C2KLocalObject; + + + +public interface EntityProxyObserver +{ + /************************************************************************** + * Subscribed items are broken apart and fed one by one to these methods. + * Replacement after an event is done by feeding the new memberbase with the same id. + * ID could be an XPath? + **************************************************************************/ + public void add(V contents); + + /************************************************************************** + * the 'type' parameter should be an indication of the type of object + * supplied so that the subscriber can associate the call back with + * one of its subscriptions. If we go with an Xpath subscription form, + * then the id will probably be sufficient. + * Should be comparable (substring whatever) with the parameter given to + * the subscribe method of ItemProxy. + **************************************************************************/ + public void remove(String id); + + public void control(String control, String msg); +} diff --git a/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java new file mode 100644 index 0000000..658e0c8 --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/ItemProxy.java @@ -0,0 +1,213 @@ +/************************************************************************** + * ItemProxy.java + * + * $Revision: 1.25 $ + * $Date: 2005/05/10 11:40:09 $ + * + * Copyright (C) 2001 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +package com.c2kernel.entity.proxy; + +import java.util.ArrayList; + +import com.c2kernel.common.AccessRightsException; +import com.c2kernel.common.InvalidDataException; +import com.c2kernel.common.InvalidTransitionException; +import com.c2kernel.common.ObjectAlreadyExistsException; +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.common.PersistencyException; +import com.c2kernel.entity.Item; +import com.c2kernel.entity.ItemHelper; +import com.c2kernel.entity.ManageableEntity; +import com.c2kernel.entity.agent.Job; +import com.c2kernel.entity.agent.JobArrayList; +import com.c2kernel.lifecycle.instance.stateMachine.Transitions; +import com.c2kernel.utils.CastorXMLUtility; +import com.c2kernel.utils.Logger; + +/****************************************************************************** + * It is a wrapper for the connection and communication with Item + * It caches data loaded from the Item to reduce communication + * + * @version $Revision: 1.25 $ $Date: 2005/05/10 11:40:09 $ + * @author $Author: abranson $ + ******************************************************************************/ +public class ItemProxy extends EntityProxy +{ + + /************************************************************************** + * + **************************************************************************/ + protected ItemProxy( org.omg.CORBA.Object ior, + int systemKey) + throws ObjectNotFoundException + { + super(ior, systemKey); + + } + + @Override + public ManageableEntity narrow() throws ObjectNotFoundException + { + try { + return ItemHelper.narrow(mIOR); + } catch (org.omg.CORBA.BAD_PARAM ex) { } + throw new ObjectNotFoundException("CORBA Object was not an Item, or the server is down."); + } + /************************************************************************** + * + * + **************************************************************************/ + public void initialise( int agentId, + String itemProps, + String workflow ) + throws AccessRightsException, + InvalidDataException, + PersistencyException, + ObjectNotFoundException + { + Logger.msg(7, "ItemProxy::initialise - started"); + + ((Item)getEntity()).initialise( agentId, itemProps, workflow ); + } + + public void setProperty(AgentProxy agent, String name, String value) + throws AccessRightsException, + PersistencyException + { + String[] params = new String[2]; + params[0] = name; + params[1] = value; + try { + agent.execute(this, "WriteProperty", params); + } catch (AccessRightsException e) { + throw (e); + } catch (PersistencyException e) { + throw (e); + } catch (Exception e) { + Logger.error(e); + throw new PersistencyException("Could not store property"); + } + } + /************************************************************************** + * + **************************************************************************/ + protected void requestAction( Job thisJob ) + throws AccessRightsException, + InvalidTransitionException, + ObjectNotFoundException, + InvalidDataException, + PersistencyException, + ObjectAlreadyExistsException + { + String outcome = thisJob.getOutcomeString(); + // check fields that should have been filled in + if (outcome==null) + if (thisJob.isOutcomeUsed()) + throw new InvalidDataException("Outcome is required.", ""); + else + outcome=""; + + if (thisJob.getAgentId() == -1) + throw new InvalidDataException("No Agent specified.", ""); + + Logger.msg(7, "ItemProxy - executing "+thisJob.getStepPath()+" for "+thisJob.getAgentName()); + requestAction (thisJob.getAgentId(), thisJob.getStepPath(), + thisJob.getPossibleTransition(), outcome); + } + + //requestData is xmlString + public void requestAction( int agentId, + String stepPath, + int transitionID, + String requestData + ) + throws AccessRightsException, + InvalidTransitionException, + ObjectNotFoundException, + InvalidDataException, + PersistencyException, + ObjectAlreadyExistsException + { + ((Item)getEntity()).requestAction( agentId, + stepPath, + transitionID, + requestData ); + } + + /************************************************************************** + * + **************************************************************************/ + public String queryLifeCycle( int agentId, + boolean filter + ) + throws AccessRightsException, + ObjectNotFoundException, + PersistencyException + { + return ((Item)getEntity()).queryLifeCycle( agentId, + filter ); + } + + + /************************************************************************** + * + **************************************************************************/ + private ArrayList getJobList(int agentId, boolean filter) + throws AccessRightsException, + ObjectNotFoundException, + PersistencyException + { + JobArrayList thisJobList; + try { + String jobs = queryLifeCycle(agentId, filter); + thisJobList = (JobArrayList)CastorXMLUtility.unmarshall(jobs); + } + catch (Exception e) { + Logger.error(e); + throw new PersistencyException("Exception::ItemProxy::getJobList() - Cannot unmarshall the jobs", null); + } + return thisJobList.list; + } + + public ArrayList getJobList(AgentProxy agent) + throws AccessRightsException, + ObjectNotFoundException, + PersistencyException + { + return getJobList(agent.getSystemKey()); + } + + private ArrayList getJobList(int agentId) + throws AccessRightsException, + ObjectNotFoundException, + PersistencyException + { + return getJobList(agentId, true); + } + + private Job getJobByName(String actName, int agentId) + throws AccessRightsException, + ObjectNotFoundException, + PersistencyException { + + ArrayList jobList = getJobList(agentId); + for (Job job : jobList) { + int transition = job.getPossibleTransition(); + if (job.getStepName().equals(actName)) + if (transition == Transitions.COMPLETE || transition == Transitions.DONE) + return job; + } + return null; + + } + + public Job getJobByName(String actName, AgentProxy agent) + throws AccessRightsException, + ObjectNotFoundException, + PersistencyException { + return getJobByName(actName, agent.getSystemKey()); + } +} diff --git a/src/main/java/com/c2kernel/entity/proxy/MemberSubscription.java b/src/main/java/com/c2kernel/entity/proxy/MemberSubscription.java new file mode 100644 index 0000000..157297f --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/MemberSubscription.java @@ -0,0 +1,118 @@ + +package com.c2kernel.entity.proxy; +import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.StringTokenizer; + +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.entity.C2KLocalObject; +import com.c2kernel.utils.Logger; + +public class MemberSubscription implements Runnable { + public static final String ERROR = "Error"; + public static final String END = "theEND"; + + EntityProxy subject; + String interest; + // keep the subscriber by weak reference, so it is not kept from the garbage collector if no longer used + WeakReference> observerReference; + ArrayList contents = new ArrayList(); + boolean preLoad; + + public MemberSubscription(EntityProxyObserver observer, String interest, boolean preLoad) { + setObserver(observer); + this.interest = interest; + this.preLoad = preLoad; + } + + @Override + public void run() { + Thread.currentThread().setName("Member Subscription: "+subject.getSystemKey()+":"+interest); + if (preLoad) loadChildren(); + } + + private void loadChildren() { + C newMember; + EntityProxyObserver observer = getObserver(); + if (observer == null) return; //reaped + try { + // fetch contents of path + String children = subject.queryData(interest+"/all"); + StringTokenizer tok = new StringTokenizer(children, ","); + ArrayList newContents = new ArrayList(); + while (tok.hasMoreTokens()) + newContents.add(tok.nextToken()); + + // look to see what's new + for (String newChild: newContents) { + + // load child object + try { + newMember = (C)subject.getObject(interest+"/"+newChild); + contents.remove(newChild); + observer.add(newMember); + } catch (ObjectNotFoundException ex) { + observer.control(ERROR, "Listed member "+newChild+" was not found."); + } + } + // report what's left in old contents as deleted + for (String oldChild: contents) { + observer.remove(interest+"/"+oldChild); + } + //replace contents arraylist + contents = newContents; + //report that we're done + observer.control(END, null); + } catch (Exception ex) { + observer.control(ERROR, "Query on "+interest+" failed with "+ex); + } + } + + public boolean isRelevant(String path) { + Logger.msg(7, "Checking relevance of "+path+" to "+interest); + return (path.startsWith(interest)); + } + + public void update(String path, boolean deleted) { + EntityProxyObserver observer = getObserver(); + if (observer == null) return; //reaped + Logger.msg(7, "Processing proxy message path "+path +" for "+observer+". Interest: "+interest+" Was Deleted:"+deleted); + if (!path.startsWith(interest)) // doesn't concern us + return; + + if (path.equals(interest)) // refresh contents + loadChildren(); + else { + String name = path.substring(interest.length()); + if (deleted) { + Logger.msg(4, "Removing "+path); + contents.remove(name); + observer.remove(name); + } + else { + try { + C newMember = (C)subject.getObject(path); + Logger.msg(4, "Adding "+path); + contents.add(name); + observer.add(newMember); + } catch (ObjectNotFoundException e) { + Logger.error("Member Subscription: could not load "+path); + Logger.error(e); + } + } + } + } + + public void setObserver(EntityProxyObserver observer) { + observerReference = new WeakReference>(observer); + } + + public void setSubject(EntityProxy subject) { + this.subject = subject; + } + + public EntityProxyObserver getObserver() { + return observerReference.get(); + } +} + diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java new file mode 100644 index 0000000..9687f22 --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyClientConnection.java @@ -0,0 +1,185 @@ +package com.c2kernel.entity.proxy; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.InterruptedIOException; +import java.io.PrintWriter; +import java.net.Socket; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.Iterator; + +import com.c2kernel.common.InvalidDataException; +import com.c2kernel.utils.Logger; +import com.c2kernel.utils.server.SocketHandler; + +/************************************************************************** + * + * $Revision: 1.18 $ + * $Date: 2005/05/10 11:40:09 $ + * + * Copyright (C) 2003 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +public class ProxyClientConnection implements SocketHandler { + + Socket clientSocket = null; + static int clientId = -1; + int thisClientId; + ArrayList sysKeys; + PrintWriter response; + BufferedReader request; + boolean closing = false; + + public ProxyClientConnection() { + super(); + thisClientId = ++clientId; + EntityProxyManager.registerProxyClient(this); + Logger.msg(1, "Proxy Client Connection Handler "+thisClientId+" ready."); + } + + + @Override + public String getName() { + return "Proxy Client Connection"; + } + + @Override + public boolean isBusy() { + return clientSocket != null; + } + + @Override + public synchronized void setSocket(Socket newSocket) { + try { + Logger.msg(1, "Proxy Client Connection "+thisClientId+" connect from "+newSocket.getInetAddress()+":"+newSocket.getPort()); + newSocket.setSoTimeout(500); + clientSocket = newSocket; + response = new PrintWriter(clientSocket.getOutputStream(), true); + sysKeys = new ArrayList(); + } catch (SocketException ex) { + Logger.msg("Could not set socket timeout:"); + Logger.error(ex); + closeSocket(); + } catch (IOException ex) { + Logger.msg("Could not setup output stream:"); + Logger.error(ex); + closeSocket(); + } + } + + /** + * Main loop. Reads proxy commands from the client and acts on them. + */ + @Override + public void run() { + Thread.currentThread().setName("Proxy Client Connection: "+clientSocket.getInetAddress()); + Logger.msg(7, "ProxyClientConnection "+thisClientId+" - Setting up proxy client connection with "+clientSocket.getInetAddress()); + try { + request = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); + String input = null; + ProxyMessage thisMessage; + while (clientSocket != null) { + try { + input = request.readLine(); + Logger.msg(9, "ProxyClientConnection "+thisClientId+" - received "+input); + thisMessage = new ProxyMessage(input); + processMessage(thisMessage); + } catch (InterruptedIOException ex) { //timeout + } catch (InvalidDataException ex) { // invalid proxy message + Logger.error("ProxyClientConnection "+thisClientId+" - Invalid proxy message: "+input); + } + + } + } catch (IOException ex) { + if (!closing) + Logger.error("ProxyClientConnection "+thisClientId+" - Error reading from socket."); + } + closeSocket(); + Logger.msg(1, "ProxyClientConnection "+thisClientId+" closed."); + } + + private void processMessage(ProxyMessage message) throws InvalidDataException { + + // proxy disconnection + if (message.getPath().equals(ProxyMessage.BYEPATH)) { + Logger.msg(7, "ProxyClientConnection "+thisClientId+" disconnecting"); + closeSocket(); + } + + // proxy checking connection + else if (message.getPath().equals(ProxyMessage.PINGPATH)) + response.println(ProxyMessage.pingMessage); + + // new subscription to entity changes + else if (message.getPath().equals(ProxyMessage.ADDPATH)) { + Logger.msg(7, "ProxyClientConnection "+thisClientId+" subscribed to "+message.getSysKey()); + synchronized (sysKeys) { + sysKeys.add(new Integer(message.getSysKey())); + } + } + + // remove of subscription to entity changes + else if (message.getPath().equals(ProxyMessage.DELPATH)) { + synchronized (sysKeys) { + sysKeys.remove(new Integer(message.getSysKey())); + } + Logger.msg(7, "ProxyClientConnection "+thisClientId+" unsubscribed from "+message.getSysKey()); + } + + else // unknown message + Logger.error("ProxyClientConnection "+thisClientId+" - Unknown message type: "+message); + + } + + public synchronized void sendMessage(ProxyMessage message) { + if (clientSocket==null) return; // idle + boolean relevant = message.getSysKey() == ProxyMessage.NA; + synchronized (sysKeys) { + for (Iterator iter = sysKeys.iterator(); iter.hasNext() && !relevant;) { + Integer thisKey = iter.next(); + if (thisKey.intValue() == message.getSysKey()) + relevant = true; + } + } + if (!relevant) return; // not for our client + + response.println(message); + } + + @Override + public void shutdown() { + if (isBusy()) { + closing = true; + Logger.msg("ProxyClientConnection "+thisClientId+" closing."); + closeSocket(); + } + } + + @Override + public String toString() { + if (clientSocket == null) return thisClientId+": idle"; + else return thisClientId+": "+clientSocket.getInetAddress(); + } + + private synchronized void closeSocket() { + if (clientSocket==null) return; + try { + request.close(); + response.close(); + clientSocket.close(); + } catch (IOException e) { + Logger.error("ProxyClientConnection "+thisClientId+" - Could not close socket."); + Logger.error(e); + } + synchronized (sysKeys) { + sysKeys = null; + } + + clientSocket = null; + + } + +} diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyMessage.java b/src/main/java/com/c2kernel/entity/proxy/ProxyMessage.java new file mode 100644 index 0000000..62866eb --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyMessage.java @@ -0,0 +1,102 @@ +package com.c2kernel.entity.proxy; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.util.StringTokenizer; + +import com.c2kernel.common.InvalidDataException; + + +/************************************************************************** + * + * $Revision: 1.11 $ + * $Date: 2005/05/10 11:40:09 $ + * + * Copyright (C) 2003 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +public class ProxyMessage { + + // special server message paths + public static final String BYEPATH = "bye"; + public static final String ADDPATH = "add"; + public static final String DELPATH = "del"; + public static final String PINGPATH = "ping"; + public static final boolean ADDED = false; + public static final boolean DELETED = true; + public static final int NA = -1; + + static ProxyMessage byeMessage = new ProxyMessage(NA, BYEPATH, ADDED); + static ProxyMessage pingMessage = new ProxyMessage(NA, PINGPATH, ADDED); + + private int sysKey = NA; + private String path = ""; + private String server = null; + private boolean state = ADDED; + + public ProxyMessage() { + super(); + } + public ProxyMessage(int sysKey, String path, boolean state) { + this(); + setSysKey(sysKey); + setPath(path); + setState(state); + } + + public ProxyMessage(String line) throws InvalidDataException, IOException { + if (line == null) + throw new IOException("Null proxy message"); + StringTokenizer tok = new StringTokenizer(line,":"); + if (tok.countTokens()!=2) + throw new InvalidDataException("String '"+line+"' does not constitute a valid proxy message.", ""); + sysKey = Integer.parseInt(tok.nextToken()); + path = tok.nextToken(); + if (path.startsWith("-")) { + state = DELETED; + path = path.substring(1); + } + } + + public ProxyMessage(DatagramPacket packet) throws InvalidDataException, IOException { + this(new String(packet.getData())); + } + + public int getSysKey() { + return sysKey; + } + + public void setSysKey(int sysKey) { + this.sysKey = sysKey; + } + + public String getPath() { + return path; + } + + public void setPath(String newPath) { + this.path = newPath; + } + + public boolean getState() { + return state; + } + + public void setState(boolean state) { + this.state = state; + } + + @Override + public String toString() { + return sysKey+":"+(state?"-":"")+path; + } + + public String getServer() { + return server; + } + + public void setServer(String server) { + this.server = server; + } +} diff --git a/src/main/java/com/c2kernel/entity/proxy/ProxyServerConnection.java b/src/main/java/com/c2kernel/entity/proxy/ProxyServerConnection.java new file mode 100644 index 0000000..6807953 --- /dev/null +++ b/src/main/java/com/c2kernel/entity/proxy/ProxyServerConnection.java @@ -0,0 +1,134 @@ +/************************************************************************** + * EntityProxyFactory.java + * + * $Revision: 1.3 $ + * $Date: 2005/05/25 12:11:44 $ + * + * Copyright (C) 2001 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +package com.c2kernel.entity.proxy; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.InterruptedIOException; +import java.io.PrintWriter; +import java.net.Socket; + +import com.c2kernel.common.InvalidDataException; +import com.c2kernel.utils.Logger; + + +public class ProxyServerConnection extends Thread +{ + + public boolean serverIsActive = true; + // proxy client details + String serverName; + int serverPort; + Socket serverConnection; + EntityProxyManager manager; + // for talking to the proxy server + PrintWriter serverStream; + boolean listening = false; + static boolean isServer = false; + + /** + * Create an entity proxy manager to listen for proxy events and reap unused proxies + */ + public ProxyServerConnection(String host, int port, EntityProxyManager manager) + { + Logger.msg(5, "ProxyServerConnection - Initialising connection to "+host+":"+port); + serverName = host; + serverPort = port; + this.manager = manager; + listening = true; + start(); + } + + @Override + public void run() { + Thread.currentThread().setName("Proxy Client Connection Listener to "+serverName+":"+serverPort); + while (listening) { + try { + if (serverConnection == null) connect(); + if (serverConnection != null) { + BufferedReader request = new BufferedReader(new InputStreamReader(serverConnection.getInputStream())); + String input = null; + ProxyMessage thisMessage; + while (listening && serverConnection != null) { + try { + input = request.readLine(); + thisMessage = new ProxyMessage(input); + thisMessage.setServer(serverName); + manager.processMessage(thisMessage); + } catch (InterruptedIOException ex) { // timeout - send a ping + sendMessage(ProxyMessage.pingMessage); + } catch (InvalidDataException ex) { // invalid proxy message + if (input != null) + Logger.error("EntityProxyManager - Invalid proxy message: "+input); + } + } + } + } catch (IOException ex) { + Logger.error("ProxyServerConnection - Disconnected from "+serverName+":"+serverPort); + try { + serverStream.close(); + serverConnection.close(); + } catch (IOException e1) { } + + + serverStream = null; + serverConnection = null; + } + } + + if (serverStream != null) { + try { + Logger.msg(1, "Disconnecting from proxy server on "+serverName+":"+serverPort); + serverStream.println(ProxyMessage.byeMessage.toString()); + serverStream.close(); + serverConnection.close(); + serverConnection = null; + } catch (Exception e) { + Logger.error("Error disconnecting from proxy server."); + } + } + } + + public void connect() { + Logger.msg(3, "ProxyServerConnection - connecting to proxy server on "+serverName+":"+serverPort); + try { + serverConnection = new Socket(serverName, serverPort); + serverConnection.setKeepAlive(true); + serverIsActive = true; + serverConnection.setSoTimeout(5000); + serverStream = new PrintWriter(serverConnection.getOutputStream(), true); + Logger.msg("Connected to proxy server on "+serverName+":"+serverPort); + manager.resubscribe(this); + } catch (Exception e) { + Logger.msg(3, "Could not connect to proxy server. Retrying in 5s"); + try { Thread.sleep(5000); } catch (InterruptedException ex) { } + serverStream = null; + serverConnection = null; + serverIsActive = false; + } + } + + public void shutdown() { + Logger.msg("Proxy Client: flagging shutdown."); + listening = false; + } + + /** + * @param sub + */ + public void sendMessage(ProxyMessage sub) { + if (serverStream != null) + serverStream.println(sub); + } + +} + diff --git a/src/main/java/com/c2kernel/entity/transfer/TransferItem.java b/src/main/java/com/c2kernel/entity/transfer/TransferItem.java new file mode 100644 index 0000000..0e3b764 --- /dev/null +++ b/src/main/java/com/c2kernel/entity/transfer/TransferItem.java @@ -0,0 +1,133 @@ +package com.c2kernel.entity.transfer; + +import java.io.File; +import java.util.ArrayList; +import java.util.Enumeration; + +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.entity.C2KLocalObject; +import com.c2kernel.entity.TraceableEntity; +import com.c2kernel.lifecycle.instance.Workflow; +import com.c2kernel.lookup.DomainPath; +import com.c2kernel.lookup.EntityPath; +import com.c2kernel.lookup.Path; +import com.c2kernel.persistency.ClusterStorage; +import com.c2kernel.persistency.outcome.Outcome; +import com.c2kernel.process.Gateway; +import com.c2kernel.property.Property; +import com.c2kernel.property.PropertyArrayList; +import com.c2kernel.utils.CastorXMLUtility; +import com.c2kernel.utils.FileStringUtility; +import com.c2kernel.utils.Logger; + +public class TransferItem { + public ArrayList domainPaths; + public int sysKey; + static int importAgentId; + + public TransferItem() throws Exception { + try { + importAgentId = Gateway.getLDAPLookup().getRoleManager().getAgentPath("system").getSysKey(); + } catch (ObjectNotFoundException e) { + Logger.error("TransferItem - System user not found!"); + throw e; + } + } + + public TransferItem(int sysKey) throws Exception { + this.sysKey = sysKey; + domainPaths = new ArrayList(); + Property name = (Property)Gateway.getStorage().get(sysKey, ClusterStorage.PROPERTY + "/Name", null); + Enumeration paths = Gateway.getLDAPLookup().search(new DomainPath(), name.getValue()); + while (paths.hasMoreElements()) { + DomainPath thisPath = (DomainPath)paths.nextElement(); + domainPaths.add(thisPath.toString()); + } + } + + public void exportItem(File dir, String path) throws Exception { + Logger.msg("Path " + path + " in " + sysKey); + String[] contents = Gateway.getStorage().getClusterContents(sysKey, path); + if (contents.length > 0) { + FileStringUtility.createNewDir(dir.getCanonicalPath()); + for (String content : contents) { + exportItem(new File(dir, content), path + "/" + content); + } + } else { //no children, try to dump object + try { + C2KLocalObject obj = Gateway.getStorage().get(sysKey, path, null); + Logger.msg("Dumping object " + path + " in " + sysKey); + File dumpPath = new File(dir.getCanonicalPath() + ".xml"); + FileStringUtility.string2File(dumpPath, CastorXMLUtility.marshall(obj)); + return; + } catch (ObjectNotFoundException ex) { + } // not an object + } + } + + public void importItem(File dir) throws Exception { + // check if already exists + try { + Property name = (Property)Gateway.getStorage().get(sysKey, ClusterStorage.PROPERTY + "/Name", null); + throw new Exception("Syskey " + sysKey + " already in use as " + name.getValue()); + } catch (Exception ex) { + } + + // retrieve objects + ArrayList objectFiles = FileStringUtility.listDir(dir.getCanonicalPath(), false, true); + ArrayList objects = new ArrayList(); + for (String element : objectFiles) { + String xmlFile = FileStringUtility.file2String(element); + C2KLocalObject newObj; + String choppedPath = element.substring(dir.getCanonicalPath().length()+1, element.length()-4); + Logger.msg(choppedPath); + if (choppedPath.startsWith(ClusterStorage.OUTCOME)) + newObj = new Outcome(choppedPath, xmlFile); + else + newObj = (C2KLocalObject)CastorXMLUtility.unmarshall(xmlFile); + + objects.add(newObj); + } + + // create item + EntityPath entityPath = new EntityPath(sysKey); + TraceableEntity newItem = (TraceableEntity)Gateway.getCorbaServer().createEntity(entityPath); + Gateway.getLDAPLookup().add(entityPath); + + PropertyArrayList props = new PropertyArrayList(); + Workflow wf = null; + // put objects + for (C2KLocalObject obj : objects) { + if (obj instanceof Property) + props.list.add((Property)obj); + else if (obj instanceof Workflow) + wf = (Workflow)obj; + } + + if (wf == null) + throw new Exception("No workflow found in import for "+sysKey); + + // init item + newItem.initialise(importAgentId, CastorXMLUtility.marshall(props), CastorXMLUtility.marshall(wf.search("workflow/domain"))); + + // store objects + importByType(ClusterStorage.COLLECTION, objects); + importByType(ClusterStorage.HISTORY, objects); + importByType(ClusterStorage.OUTCOME, objects); + importByType(ClusterStorage.VIEWPOINT, objects); + Gateway.getStorage().commit(this); + // add domPaths + for (String element : domainPaths) { + DomainPath newPath = new DomainPath(element, entityPath); + Gateway.getLDAPLookup().add(newPath); + } + } + + private void importByType(String type, ArrayList objects) throws Exception { + for (C2KLocalObject element : objects) { + if (element.getClusterType().equals(type)) + Gateway.getStorage().put(sysKey, element, this); + } + + } +} \ No newline at end of file diff --git a/src/main/java/com/c2kernel/entity/transfer/TransferSet.java b/src/main/java/com/c2kernel/entity/transfer/TransferSet.java new file mode 100644 index 0000000..71a593a --- /dev/null +++ b/src/main/java/com/c2kernel/entity/transfer/TransferSet.java @@ -0,0 +1,103 @@ +package com.c2kernel.entity.transfer; + +import java.io.File; +import java.util.ArrayList; + +import com.c2kernel.lookup.EntityPath; +import com.c2kernel.lookup.NextKeyManager; +import com.c2kernel.process.Gateway; +import com.c2kernel.utils.CastorXMLUtility; +import com.c2kernel.utils.FileStringUtility; +import com.c2kernel.utils.Logger; + +/************************************************************************** + * + * $Revision: 1.5 $ + * $Date: 2005/04/26 06:48:13 $ + * + * Copyright (C) 2003 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +public class TransferSet { + + public ArrayList items; + + public TransferSet() { + } + + public TransferSet(int[] sysKeys) { + items = new ArrayList(); + for (int sysKey : sysKeys) { + try { + items.add(new TransferItem(sysKey)); + } catch (Exception ex) { + Logger.error("Could not add item "+sysKey); + Logger.error(ex); + } + } + } + + public void exportPackage(File dir) throws Exception { + if (items==null || items.size() == 0) + throw new Exception("Nothing to dump"); + FileStringUtility.createNewDir(dir.getAbsolutePath()); + for (TransferItem element : items) { + try { + element.exportItem(new File(dir, String.valueOf(element.sysKey)), "/"); + } catch (Exception ex) { + Logger.error("Error dumping item "+element.sysKey); + Logger.error(ex); + } + } + + try { + String self = CastorXMLUtility.marshall(this); + FileStringUtility.string2File(new File(dir, "transferSet.xml"), self); + } catch (Exception ex) { + Logger.error("Error writing header file"); + Logger.error(ex); + } + } + + public void importPackage(File rootDir) { + for (TransferItem element : items) { + Logger.msg(5, "Importing "+element.sysKey); + try { + element.importItem(new File(rootDir, String.valueOf(element.sysKey))); + } catch (Exception ex) { + Logger.error("Import of item "+element.sysKey+" failed. Rolling back"); + Logger.error(ex); + Gateway.getStorage().abort(element); + } + } + checkLastKey(); + } + + private void checkLastKey() + { + // find highest key in out import set + int packageLastKey = 0; + for (TransferItem element : items) { + if (element.sysKey > packageLastKey) + packageLastKey = element.sysKey; + } + + try + { // find the current last key + NextKeyManager nextKeyMan = Gateway.getLDAPLookup().getNextKeyManager(); + EntityPath lastKey = nextKeyMan.getLastEntityPath(); + Logger.msg(1, "Last key imported was "+packageLastKey+". LDAP lastkey was "+lastKey.getSysKey()); + + + if (packageLastKey > lastKey.getSysKey()) { // set new last + Logger.msg(1, "Updating lastKey to "+packageLastKey); + nextKeyMan.writeLastEntityKey(packageLastKey); + } + } + catch (Exception ex) + { + Logger.error("Exception::LoadKeys::processFile() " + ex); + } + } +} -- cgit v1.2.3