From b086f57f56bf0eb9dab9cf321a0f69aaaae84347 Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Wed, 30 May 2012 08:37:45 +0200 Subject: Initial Maven Conversion --- source/com/c2kernel/entity/C2KLocalObject.java | 32 -- source/com/c2kernel/entity/CorbaServer.java | 190 ----------- source/com/c2kernel/entity/TraceableEntity.java | 341 -------------------- source/com/c2kernel/entity/TraceableLocator.java | 85 ----- source/com/c2kernel/entity/agent/ActiveEntity.java | 295 ------------------ .../com/c2kernel/entity/agent/ActiveLocator.java | 87 ------ source/com/c2kernel/entity/agent/Job.java | 346 --------------------- source/com/c2kernel/entity/agent/JobArrayList.java | 30 -- source/com/c2kernel/entity/agent/JobList.java | 108 ------- source/com/c2kernel/entity/proxy/AgentProxy.java | 302 ------------------ .../entity/proxy/DomainPathSubscriber.java | 18 -- source/com/c2kernel/entity/proxy/EntityProxy.java | 248 --------------- .../c2kernel/entity/proxy/EntityProxyManager.java | 339 -------------------- .../c2kernel/entity/proxy/EntityProxyObserver.java | 27 -- source/com/c2kernel/entity/proxy/ItemProxy.java | 213 ------------- .../c2kernel/entity/proxy/MemberSubscription.java | 118 ------- .../entity/proxy/ProxyClientConnection.java | 185 ----------- source/com/c2kernel/entity/proxy/ProxyMessage.java | 102 ------ .../entity/proxy/ProxyServerConnection.java | 134 -------- .../com/c2kernel/entity/transfer/TransferItem.java | 133 -------- .../com/c2kernel/entity/transfer/TransferSet.java | 103 ------ 21 files changed, 3436 deletions(-) delete mode 100644 source/com/c2kernel/entity/C2KLocalObject.java delete mode 100644 source/com/c2kernel/entity/CorbaServer.java delete mode 100644 source/com/c2kernel/entity/TraceableEntity.java delete mode 100644 source/com/c2kernel/entity/TraceableLocator.java delete mode 100644 source/com/c2kernel/entity/agent/ActiveEntity.java delete mode 100644 source/com/c2kernel/entity/agent/ActiveLocator.java delete mode 100644 source/com/c2kernel/entity/agent/Job.java delete mode 100644 source/com/c2kernel/entity/agent/JobArrayList.java delete mode 100644 source/com/c2kernel/entity/agent/JobList.java delete mode 100644 source/com/c2kernel/entity/proxy/AgentProxy.java delete mode 100644 source/com/c2kernel/entity/proxy/DomainPathSubscriber.java delete mode 100644 source/com/c2kernel/entity/proxy/EntityProxy.java delete mode 100644 source/com/c2kernel/entity/proxy/EntityProxyManager.java delete mode 100644 source/com/c2kernel/entity/proxy/EntityProxyObserver.java delete mode 100644 source/com/c2kernel/entity/proxy/ItemProxy.java delete mode 100644 source/com/c2kernel/entity/proxy/MemberSubscription.java delete mode 100644 source/com/c2kernel/entity/proxy/ProxyClientConnection.java delete mode 100644 source/com/c2kernel/entity/proxy/ProxyMessage.java delete mode 100644 source/com/c2kernel/entity/proxy/ProxyServerConnection.java delete mode 100644 source/com/c2kernel/entity/transfer/TransferItem.java delete mode 100644 source/com/c2kernel/entity/transfer/TransferSet.java (limited to 'source/com/c2kernel/entity') diff --git a/source/com/c2kernel/entity/C2KLocalObject.java b/source/com/c2kernel/entity/C2KLocalObject.java deleted file mode 100644 index ec30dc1..0000000 --- a/source/com/c2kernel/entity/C2KLocalObject.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.c2kernel.entity; - -import java.io.Serializable; - -/** - * Objects that are to be stored by Cristal Entities must implement this interface and be (un)marshallable by Castor - * i.e. have a map file properly registered in the kernel. Domain implementors should not create new C2KLocalObjects - *

Each object will be stored as the path /clustertype/name in most cases. Exceptions are: - *

- * - * @see com.c2kernel.persistency.ClusterStorage - * @see com.c2kernel.persistency.ClusterStorageManager - * - * @author Andrew Branson - * - * $Revision: 1.5 $ - * $Date: 2004/01/22 11:10:41 $ - * - * Copyright (C) 2003 CERN - European Organization for Nuclear Research - * All rights reserved. - */ - -public interface C2KLocalObject extends Serializable { - - public void setName(String name); - public String getName(); - - public String getClusterType(); -} diff --git a/source/com/c2kernel/entity/CorbaServer.java b/source/com/c2kernel/entity/CorbaServer.java deleted file mode 100644 index 84d2ef2..0000000 --- a/source/com/c2kernel/entity/CorbaServer.java +++ /dev/null @@ -1,190 +0,0 @@ -package com.c2kernel.entity; - -import java.util.Map; - -import org.omg.PortableServer.POA; -import org.omg.PortableServer.POAManager; -import org.omg.PortableServer.Servant; -import org.omg.PortableServer.POAManagerPackage.AdapterInactive; - -import com.c2kernel.common.CannotManageException; -import com.c2kernel.common.InvalidDataException; -import com.c2kernel.common.ObjectAlreadyExistsException; -import com.c2kernel.common.ObjectNotFoundException; -import com.c2kernel.entity.agent.ActiveEntity; -import com.c2kernel.entity.agent.ActiveLocator; -import com.c2kernel.lookup.AgentPath; -import com.c2kernel.lookup.EntityPath; -import com.c2kernel.lookup.InvalidEntityPathException; -import com.c2kernel.process.Gateway; -import com.c2kernel.utils.Logger; -import com.c2kernel.utils.SoftCache; - -/************************************************************************** - * - * $Revision: 1.8 $ - * $Date: 2005/10/13 08:13:44 $ - * - * Copyright (C) 2003 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - - -public class CorbaServer { - private Map mEntityCache; - private POA mRootPOA; - private POA mItemPOA; - private POA mAgentPOA; - private POAManager mPOAManager; - - public CorbaServer() throws InvalidDataException { - mEntityCache = new SoftCache(50); - - // init POA - try { - setupPOA(); - mPOAManager.activate(); - } catch (Exception ex) { - Logger.error(ex); - throw new InvalidDataException("Error initialising POA", ""); - } - - new Thread(new Runnable() { - @Override - public void run() { - Thread.currentThread().setName("ORB Invoker"); - Gateway.getORB().run(); - } - }).start(); - } - - public void close() { - try { - mPOAManager.deactivate(true, true); - } catch (AdapterInactive ex) { - Logger.error(ex); - } - } - - /************************************************************************** - * Initialises the C2KRootPOA with policies which are suitable for Factory objects - **************************************************************************/ - public void setupPOA() throws Exception { - - //Initialise the RootPOA - mRootPOA = org.omg.PortableServer.POAHelper.narrow( - Gateway.getORB().resolve_initial_references("RootPOA")); - - //Initilaise the default POAManager - - mPOAManager = mRootPOA.the_POAManager(); - - // Create POA for use by the entities - org.omg.CORBA.Policy[] policies = new org.omg.CORBA.Policy[6]; - - policies[0] = mRootPOA.create_id_assignment_policy( - org.omg.PortableServer.IdAssignmentPolicyValue.USER_ID); - policies[1] = mRootPOA.create_lifespan_policy( - org.omg.PortableServer.LifespanPolicyValue.PERSISTENT); - policies[2] = mRootPOA.create_servant_retention_policy( - org.omg.PortableServer.ServantRetentionPolicyValue.NON_RETAIN); - policies[3] = mRootPOA.create_id_uniqueness_policy( - org.omg.PortableServer.IdUniquenessPolicyValue.UNIQUE_ID); - policies[4] = mRootPOA.create_request_processing_policy( - org.omg.PortableServer.RequestProcessingPolicyValue. - USE_SERVANT_MANAGER); - policies[5] = mRootPOA.create_implicit_activation_policy( - org.omg.PortableServer.ImplicitActivationPolicyValue. - NO_IMPLICIT_ACTIVATION); - - mItemPOA = mRootPOA.create_POA( "Item", - mRootPOA.the_POAManager(), - policies ); - mAgentPOA = mRootPOA.create_POA( "Agent", - mRootPOA.the_POAManager(), - policies ); - - //Create the locators - TraceableLocator itemLocator = new TraceableLocator( mItemPOA ); - mItemPOA.set_servant_manager( itemLocator._this( Gateway.getORB() ) ); - - ActiveLocator agentLocator = new ActiveLocator( mAgentPOA ); - mAgentPOA.set_servant_manager( agentLocator._this( Gateway.getORB() ) ); - - } - - - /************************************************************************** - * Returns a CORBA servant for a pre-existing entity - **************************************************************************/ - private Servant getEntity(int sysKey, org.omg.PortableServer.POA poa) throws ObjectNotFoundException { - try { - EntityPath entityPath = new EntityPath(sysKey); - Servant entity = null; - synchronized (mEntityCache) { - entity = mEntityCache.get(entityPath); - if (entity == null) { - Logger.msg(7, "Creating new servant for "+sysKey); - - Class entityClass = Gateway.getLDAPLookup().getEntityClass(entityPath); - - if (entityClass == TraceableEntity.class) { - if (poa == null) poa = mItemPOA; - entity = new TraceableEntity(sysKey, poa); - } - else if (entityClass == ActiveEntity.class) { - if (poa == null) poa = mAgentPOA; - entity = new ActiveEntity(sysKey, poa); - } - mEntityCache.put(entityPath, entity); - } - } - return entity; - - } catch (InvalidEntityPathException ex) { - throw new ObjectNotFoundException("Invalid Entity Key", ""); - } - } - - /************************************************************************** - * Wrapper for fetching Items - **************************************************************************/ - public TraceableEntity getItem(int sysKey, org.omg.PortableServer.POA poa) throws ObjectNotFoundException { - return (TraceableEntity)getEntity(sysKey, poa); - } - - /************************************************************************** - * Wrapper for fetching Agents - **************************************************************************/ - public ActiveEntity getAgent(int sysKey, org.omg.PortableServer.POA poa) throws ObjectNotFoundException { - return (ActiveEntity)getEntity(sysKey, poa); - } - - /** - * @param entityPath - * @return - */ - public Servant createEntity(EntityPath entityPath) throws CannotManageException, ObjectAlreadyExistsException { - try { - if (entityPath == null) - entityPath = Gateway.getLDAPLookup().getNextKeyManager().generateNextEntityKey(); - } catch (Exception ex) { - Logger.error(ex); - throw new CannotManageException("Cannot generate next entity key"); - } - boolean isAgent = entityPath instanceof AgentPath; - POA myPOA = isAgent?mAgentPOA:mItemPOA; - org.omg.CORBA.Object obj = myPOA.create_reference_with_id(entityPath.getOID(), isAgent?AgentHelper.id():ItemHelper.id()); - entityPath.setIOR(obj); - Servant entity; - if (isAgent) - entity = new ActiveEntity(entityPath.getSysKey(), myPOA); - else - entity = new TraceableEntity(entityPath.getSysKey(), myPOA); - synchronized (mEntityCache) { - mEntityCache.put(entityPath, entity); - } - return entity; - - } -} diff --git a/source/com/c2kernel/entity/TraceableEntity.java b/source/com/c2kernel/entity/TraceableEntity.java deleted file mode 100644 index c7aff82..0000000 --- a/source/com/c2kernel/entity/TraceableEntity.java +++ /dev/null @@ -1,341 +0,0 @@ -/************************************************************************** - * TraceableEntity - * - * $Workfile$ - * $Revision: 1.108 $ - * $Date: 2005/10/06 14:46:22 $ - * - * Copyright (C) 2001 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - -package com.c2kernel.entity; - - -import com.c2kernel.common.AccessRightsException; -import com.c2kernel.common.InvalidDataException; -import com.c2kernel.common.InvalidTransitionException; -import com.c2kernel.common.ObjectAlreadyExistsException; -import com.c2kernel.common.ObjectNotFoundException; -import com.c2kernel.common.PersistencyException; -import com.c2kernel.entity.agent.JobArrayList; -import com.c2kernel.lifecycle.instance.CompositeActivity; -import com.c2kernel.lifecycle.instance.Workflow; -import com.c2kernel.lifecycle.instance.stateMachine.Transitions; -import com.c2kernel.lookup.AgentPath; -import com.c2kernel.lookup.InvalidEntityPathException; -import com.c2kernel.persistency.ClusterStorage; -import com.c2kernel.persistency.ClusterStorageException; -import com.c2kernel.persistency.TransactionManager; -import com.c2kernel.process.Gateway; -import com.c2kernel.property.Property; -import com.c2kernel.property.PropertyArrayList; -import com.c2kernel.utils.CastorXMLUtility; -import com.c2kernel.utils.Logger; - -/************************************************************************** -* -* @author $Author: abranson $ $Date: 2005/10/06 14:46:22 $ -* @version $Revision: 1.108 $ -*
-*                                ,.   '\'\    ,---.
-*                            .  | \\  l\\l_ //    |
-*        _              _       |  \\/ `/  `.|    |
-*      /~\\   \        //~\     | Y |   |   ||  Y |
-*      |  \\   \      //  |     |  \|   |   |\ /  |
-*      [   ||        ||   ]     \   |  o|o  | >  /
-*     ] Y  ||        ||  Y [     \___\_--_ /_/__/
-*     |  \_|l,------.l|_/  |     /.-\(____) /--.\
-*     |   >'          `<   |     `--(______)----'
-*     \  (/~`--____--'~\)  /         u// u / \
-*      `-_>-__________-<_-'          / \  / /|
-*          /(_#(__)#_)\             ( .) / / ]
-*          \___/__\___/              `.`' /   [
-*           /__`--'__\                |`-'    |
-*        /\(__,>-~~ __)               |       |_
-*     /\//\\(  `--~~ )               _l       |-:.
-*     '\/  <^\      /^>             |  `   (  <  \\
-*          _\ >-__-< /_           ,-\  ,-~~->. \  `:._,/
-*        (___\    /___)         (____/    (____)   `-'
-*             Kovax            and, paradoxically, Kovax
-* 
-***************************************************************************/ - -public class TraceableEntity extends ItemPOA -{ - - private int mSystemKey; - private org.omg.PortableServer.POA mPoa; - private TransactionManager mStorage; - - - /************************************************************************** - * Constructor used by the Locator only - **************************************************************************/ - public TraceableEntity( int key, - org.omg.PortableServer.POA poa ) - { - Logger.msg(5,"TraceableEntity::constructor() - SystemKey:" + key ); - - mSystemKey = key; - mPoa = poa; - mStorage = Gateway.getStorage(); - } - - - /************************************************************************** - * - **************************************************************************/ - @Override - public org.omg.PortableServer.POA _default_POA() - { - if(mPoa != null) - return mPoa; - else - return super._default_POA(); - } - - - /************************************************************************** - * - **************************************************************************/ - @Override - public int getSystemKey() - { - Logger.msg(8, "TraceableEntity::getSystemKey() - " + mSystemKey); - return mSystemKey; - } - - /************************************************************************** - * - **************************************************************************/ - @Override - public void initialise( int agentId, - String propString, - String initWfString - ) - throws AccessRightsException, - InvalidDataException, - PersistencyException - { - Logger.msg(1, "TraceableEntity::initialise("+mSystemKey+") - agent:"+agentId); - synchronized (this) { - Workflow lc = null; - PropertyArrayList props = null; - - AgentPath agentPath; - try { - agentPath = new AgentPath(agentId); - } catch (InvalidEntityPathException e) { - throw new AccessRightsException("Invalid Agent Id:" + agentId); - } - - //unmarshalling checks the validity of the received strings - - // create properties - if (!propString.equals("")) { - try { - props = (PropertyArrayList)CastorXMLUtility.unmarshall(propString); - for (Object name : props.list) { - Property thisProp = (Property)name; - mStorage.put(mSystemKey, thisProp, props); - } - } catch (Throwable ex) { - Logger.msg(8, "TraceableEntity::initialise("+mSystemKey+ ") - Properties were invalid: "+propString); - Logger.error(ex); - mStorage.abort(props); - } - mStorage.commit(props); - } - - // create wf - try { - if (initWfString == null || initWfString.equals("")) - lc = new Workflow(new CompositeActivity()); - else - lc = new Workflow((CompositeActivity)CastorXMLUtility.unmarshall(initWfString)); - lc.initialise(mSystemKey, agentPath); - mStorage.put(mSystemKey, lc, null); - } catch (Throwable ex) { - Logger.msg(8, "TraceableEntity::initialise("+mSystemKey+") - Workflow was invalid: "+initWfString); - Logger.error(ex); - } - } - } - - /************************************************************************** - * - **************************************************************************/ - //requestdata is xmlstring - @Override - public void requestAction( int agentId, - String stepPath, - int transitionID, - String requestData - ) - throws AccessRightsException, - InvalidTransitionException, - ObjectNotFoundException, - InvalidDataException, - PersistencyException, - ObjectAlreadyExistsException - { - synchronized (this) { - try { - - Logger.msg(1, "TraceableEntity::request("+mSystemKey+") - " + - Transitions.getTransitionName(transitionID) + " "+stepPath + " by " +agentId ); - - AgentPath agent = new AgentPath(agentId); - Workflow lifeCycle = (Workflow)mStorage.get(mSystemKey, ClusterStorage.LIFECYCLE+"/workflow", null); - - lifeCycle.requestAction( agent, - stepPath, - transitionID, - requestData ); - - // store the workflow if we've changed the state of the domain wf - if (!(stepPath.startsWith("workflow/predefined"))) - mStorage.put(mSystemKey, lifeCycle, null); - - // Normal operation exceptions - } catch (AccessRightsException ex) { - Logger.msg("Propagating AccessRightsException back to the calling agent"); - throw ex; - } catch (InvalidTransitionException ex) { - Logger.msg("Propagating InvalidTransitionException back to the calling agent"); - throw ex; - } catch (ObjectNotFoundException ex) { - Logger.msg("Propagating ObjectNotFoundException back to the calling agent"); - throw ex; - // errors - } catch (ClusterStorageException ex) { - Logger.error(ex); - throw new PersistencyException("Error on storage: "+ex.getMessage(), ""); - } catch (InvalidEntityPathException ex) { - Logger.error(ex); - throw new AccessRightsException("Invalid Agent Id: "+agentId, ""); - } catch (InvalidDataException ex) { - Logger.error(ex); - Logger.msg("Propagating InvalidDataException back to the calling agent"); - throw ex; - } catch (ObjectAlreadyExistsException ex) { - Logger.error(ex); - Logger.msg("Propagating ObjectAlreadyExistsException back to the calling agent"); - throw ex; - // non-CORBA exception hasn't been caught! - } catch (Throwable ex) { - Logger.error("Unknown Error: requestAction on "+mSystemKey+" by "+agentId+" executing "+stepPath); - Logger.error(ex); - throw new InvalidDataException("Extraordinary Exception during execution:"+ex.getClass().getName()+" - "+ex.getMessage(), ""); - } - } - } - - /************************************************************************** - * - **************************************************************************/ - @Override - public String queryLifeCycle( int agentId, - boolean filter - ) - throws AccessRightsException, - ObjectNotFoundException, - PersistencyException - { - synchronized (this) { - Logger.msg(1, "TraceableEntity::queryLifeCycle("+mSystemKey+") - agent: " + agentId); - - try - { - AgentPath agent = new AgentPath(agentId); - Workflow wf = (Workflow)mStorage.get(mSystemKey, ClusterStorage.LIFECYCLE+"/workflow", null); - JobArrayList jobBag = new JobArrayList(); - CompositeActivity domainWf = (CompositeActivity)wf.search("workflow/domain"); - jobBag.list = filter?domainWf.calculateJobs(agent, true):domainWf.calculateAllJobs(agent, true); - Logger.msg(1, "TraceableEntity::queryLifeCycle("+mSystemKey+") - Returning "+jobBag.list.size()+" jobs."); - return CastorXMLUtility.marshall( jobBag ); - } - catch( Throwable ex ) - { - Logger.error(ex); - return ""; - } - } - } - - /************************************************************************** - * The description for operation getData. - * - * @param path - the path to the object required - * the suffix 'all' retrieves a listing of all keys on that level - * - * @return The result string in xml format - * except 'all' which returns a comma sep list - * - * @exception ObjectNotFoundException - * ************************************************************************/ - @Override - public String queryData(String path) - throws AccessRightsException, - ObjectNotFoundException, - PersistencyException - { - synchronized (this) { - String result = ""; - - Logger.msg(1, "TraceableEntity::queryData("+mSystemKey+") - " + path ); - - try - { // check for cluster contents query - - if (path.endsWith("/all")) - { - int allPos = path.lastIndexOf("all"); - String query = path.substring(0,allPos); - String[] ids = mStorage.getClusterContents( mSystemKey, query ); - - for( int i=0; i transaction aborted!"); - } - - Logger.msg(5, "ActiveEntity::init() - completed."); - } - - /** - * - * @param propsString - * @return Properties - * @throws InvalidDataException Properties cannot be unmarshalled - * @throws ClusterStorageException - */ - private PropertyArrayList initProps( String propsString ) - throws InvalidDataException, - ClusterStorageException - { - PropertyArrayList props = null; - - // create properties - if( propsString != null && !propsString.equals("") ) - { - try - { - props = (PropertyArrayList)CastorXMLUtility.unmarshall(propsString); - } - catch( Exception ex ) - { - //any exception during unmarshall indicates that data was - //incorrect or the castor mapping was not set up - Logger.error(ex); - throw new InvalidDataException(ex.toString(), null); - } - - Iterator iter = props.list.iterator(); - - while( iter.hasNext() ) - mDatabase.put( mSystemKey, iter.next(), props ); - } - else - { - Logger.warning("ActiveEntity::initProps() - NO Properties!"); - } - - return props; - } - - /************************************************************************** - * - * - **************************************************************************/ - @Override - public org.omg.PortableServer.POA _default_POA() - { - if(mPOA != null) - return mPOA; - else - return super._default_POA(); - } - - - /************************************************************************** - * - * - **************************************************************************/ - @Override - public int getSystemKey() - { - return mSystemKey; - } - - - /************************************************************************** - * - * - **************************************************************************/ - @Override - public String queryData(String xpath) - throws AccessRightsException, - ObjectNotFoundException, - PersistencyException - { - String result = ""; - int allPos = -1; - - Logger.msg(1, "ActiveEntity::queryData("+mSystemKey+") - " + xpath ); - - try - { - if( (allPos=xpath.indexOf("all")) != -1 ) - { - String query = xpath.substring(0,allPos); - String[] ids = mDatabase.getClusterContents( mSystemKey, query ); - - for( int i=0; i 0) - try - { - Viewpoint view = (Viewpoint) Gateway.getStorage().get(getItemSysKey(), ClusterStorage.VIEWPOINT + "/" + getSchemaType() + "/" + viewName, null); - mOutcome = view.getOutcome().getData(); - } catch (Exception ex) - { // not found, return null - } - } - return mOutcome; - } - - public Outcome getOutcome() - { - Logger.msg(1, "Get outcome"); - return new Outcome(-1, getOutcomeString(), getSchemaType(), getSchemaVersion()); - } - - public int getAgentId() throws ObjectNotFoundException - { - if (mAgentId == -1) - mAgentId = Gateway.getLDAPLookup().getRoleManager().getAgentPath(getAgentName()).getSysKey(); - return mAgentId; - } - - public void setAgentId(int id) - { - mAgentId = id; - agent = null; - } - - public String getAgentName() - { - if (mAgentName == null) - mAgentName = (String) mActProps.get("AgentName"); - return mAgentName; - } - - public void setAgentName(String agentName) - { - mAgentName = agentName; - } - - public String getAgentRole() - { - if (mAgentRole == null) - mAgentRole = (String) mActProps.get("Agent Role"); - return mAgentRole; - } - - public void setAgentRole(String role) - { - mAgentRole = role; - } - - public boolean isOutcomeUsed() - { - String schemaType = getSchemaType(); - return (Boolean.TRUE.equals(getActProp("AlwaysUseOutcome")) || mPossibleTransition == Transitions.DONE || mPossibleTransition == Transitions.COMPLETE) && !(schemaType == null || schemaType.equals("")); - } - - public int getID() - { - return mID; - } - - @Override - public String getName() - { - return mName; - } - - public void setID(int id) - { - mID = id; - mName = String.valueOf(id); - } - - @Override - public void setName(String name) - { - mName = name; - try - { - mID = Integer.parseInt(name); - } catch (NumberFormatException ex) - { - mID = -1; - } - } - - public void setItemSysKey(int sysKey) - { - mItemSysKey = sysKey; - item = null; - } - - @Override - public String getClusterType() - { - return ClusterStorage.JOB; - } - - public boolean equals(Job job) - { - return (getItemSysKey() == job.getItemSysKey()) && this.mStepPath.equals(job.mStepPath) && this.mPossibleTransition == job.mPossibleTransition; - } - - public Object getActProp(String name) - { - return mActProps.get(name); - } - - public String getActPropString(String name) - { - try - { - return mActProps.get(name).toString(); - } catch (NullPointerException ex) - { - return null; - } - } - - public String getStepName() - { - return mStepName; - } - - public void setStepName(String string) - { - mStepName = string; - } - - public String getStepPath() - { - return mStepPath; - } - - public void setStepPath(String string) - { - mStepPath = string; - } - - public int getPossibleTransition() - { - return mPossibleTransition; - } - - public void setPossibleTransition(int lint) - { - mPossibleTransition = lint; - } - - public CastorHashMap getActProps() - { - return mActProps; - } - - public void setActProps(CastorHashMap map) - { - mActProps = map; - } - - public KeyValuePair[] getKeyValuePairs() - { - return mActProps.getKeyValuePairs(); - } - - public void setKeyValuePairs(KeyValuePair[] pairs) - { - mActProps.setKeyValuePairs(pairs); - } - - public int getCurrentState() - { - return mCurrentState; - } - - public void setCurrentState(int string) - { - mCurrentState = string; - } - - public int getTargetState() { - return mTargetState; - } - - public void setTargetState(int mTargetState) { - this.mTargetState = mTargetState; - } - - /** - * Returns the actType. - * - * @return String - */ - public String getStepType() - { - return mStepType; - } - - /** - * Sets the actType. - * - * @param actType - * The actType to set - */ - public void setStepType(String actType) - { - mStepType = actType; - } -} \ No newline at end of file diff --git a/source/com/c2kernel/entity/agent/JobArrayList.java b/source/com/c2kernel/entity/agent/JobArrayList.java deleted file mode 100644 index dcb3215..0000000 --- a/source/com/c2kernel/entity/agent/JobArrayList.java +++ /dev/null @@ -1,30 +0,0 @@ -/************************************************************************** - * - * $Revision: 1.2 $ - * $Date: 2003/06/20 11:44:30 $ - * - * Copyright (C) 2001 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - -package com.c2kernel.entity.agent; - -import java.util.ArrayList; - -import com.c2kernel.utils.CastorArrayList; - -public class JobArrayList extends CastorArrayList -{ - - public JobArrayList() - { - super(); - } - - public JobArrayList(ArrayList aList) - { - super(aList); - } - - -} diff --git a/source/com/c2kernel/entity/agent/JobList.java b/source/com/c2kernel/entity/agent/JobList.java deleted file mode 100644 index f8a88ee..0000000 --- a/source/com/c2kernel/entity/agent/JobList.java +++ /dev/null @@ -1,108 +0,0 @@ -package com.c2kernel.entity.agent; - -import java.util.Iterator; -import java.util.Vector; - -import com.c2kernel.persistency.ClusterStorage; -import com.c2kernel.persistency.RemoteMap; -import com.c2kernel.utils.Logger; - - -/************************************************************************** -* -* @author $Author: abranson $ $Date: 2006/03/03 13:52:21 $ -* @version $Revision: 1.15 $ -***************************************************************************/ -public class JobList extends RemoteMap -{ - - /************************************************************************** - * Empty constructor for Castor - **************************************************************************/ - public JobList(int sysKey, Object locker) - { - super(sysKey, ClusterStorage.JOB, locker); - } - - - /************************************************************************** - * - **************************************************************************/ - public void addJob( Job job ) - { - synchronized(this) { - int jobId = getLastId()+1; - job.setID(jobId); - put(String.valueOf(jobId), job); - } - } - - /** - * Cannot be stored - */ - @Override - public String getClusterType() { - return null; - } - - - public Job getJob(int id) { - return get(String.valueOf(id)); - } - - - /** - * @param job - */ - public void removeJobsWithSysKey( int sysKey ) - { - Iterator currentMembers = values().iterator(); - Job j = null; - - while( currentMembers.hasNext() ) - { - j = currentMembers.next(); - - if( j.getItemSysKey() == sysKey ) - remove( String.valueOf(j.getID()) ); - } - - Logger.msg(5, "JobList::removeJobsWithSysKey() - " + sysKey + " DONE." ); - } - - public void removeJobsForStep( int sysKey, String stepPath ) - { - Iterator currentMembers = values().iterator(); - while( currentMembers.hasNext() ) - { - Job j = currentMembers.next(); - if( j.getItemSysKey() == sysKey && j.getStepPath().equals(stepPath)) - remove( String.valueOf(j.getID()) ); - } - - Logger.msg(5, "JobList::removeJobsForStep() - " + sysKey + " DONE." ); - } - /** - * @param itemKey - * @param string - * @return - */ - public Vector getJobsOfSysKey(int sysKey) - { - Iterator currentMembers = values().iterator(); - Job j = null; - Vector jobs = new Vector(); - - while( currentMembers.hasNext() ) - { - j = currentMembers.next(); - - if( j.getItemSysKey() == sysKey ) - jobs.add(j); - } - - Logger.msg(5, "JobList::getJobsOfSysKey() - returning " + jobs.size() + " Jobs." ); - - return jobs; - } -} \ No newline at end of file diff --git a/source/com/c2kernel/entity/proxy/AgentProxy.java b/source/com/c2kernel/entity/proxy/AgentProxy.java deleted file mode 100644 index 72ed088..0000000 --- a/source/com/c2kernel/entity/proxy/AgentProxy.java +++ /dev/null @@ -1,302 +0,0 @@ -/************************************************************************** - * AgentProxy.java - * - * $Revision: 1.37 $ - * $Date: 2005/10/05 07:39:36 $ - * - * Copyright (C) 2001 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - -package com.c2kernel.entity.proxy; - -import java.util.Date; -import java.util.Enumeration; - -import com.c2kernel.common.AccessRightsException; -import com.c2kernel.common.InvalidDataException; -import com.c2kernel.common.InvalidTransitionException; -import com.c2kernel.common.ObjectAlreadyExistsException; -import com.c2kernel.common.ObjectNotFoundException; -import com.c2kernel.common.PersistencyException; -import com.c2kernel.entity.Agent; -import com.c2kernel.entity.AgentHelper; -import com.c2kernel.entity.C2KLocalObject; -import com.c2kernel.entity.ManageableEntity; -import com.c2kernel.entity.agent.Job; -import com.c2kernel.lifecycle.instance.predefined.PredefinedStep; -import com.c2kernel.lifecycle.instance.stateMachine.Transitions; -import com.c2kernel.lookup.AgentPath; -import com.c2kernel.lookup.DomainPath; -import com.c2kernel.lookup.EntityPath; -import com.c2kernel.lookup.InvalidEntityPathException; -import com.c2kernel.lookup.Path; -import com.c2kernel.persistency.outcome.OutcomeValidator; -import com.c2kernel.persistency.outcome.Schema; -import com.c2kernel.process.Gateway; -import com.c2kernel.scripting.ErrorInfo; -import com.c2kernel.scripting.Script; -import com.c2kernel.scripting.ScriptingEngineException; -import com.c2kernel.utils.CastorXMLUtility; -import com.c2kernel.utils.LocalObjectLoader; -import com.c2kernel.utils.Logger; - -/****************************************************************************** - * It is a wrapper for the connection and communication with Agent - * It caches data loaded from the Agent to reduce communication - * - * @version $Revision: 1.37 $ $Date: 2005/10/05 07:39:36 $ - * @author $Author: abranson $ - ******************************************************************************/ -public class AgentProxy extends EntityProxy -{ - AgentPath path; - - /************************************************************************** - * Creates an AgentProxy without cache and change notification - **************************************************************************/ - public AgentProxy( org.omg.CORBA.Object ior, - int systemKey) - throws ObjectNotFoundException - { - super(ior, systemKey); - try { - path = new AgentPath(systemKey); - } catch (InvalidEntityPathException e) { - throw new ObjectNotFoundException(); - } - } - - @Override - public ManageableEntity narrow() throws ObjectNotFoundException - { - try { - return AgentHelper.narrow(mIOR); - } catch (org.omg.CORBA.BAD_PARAM ex) { } - throw new ObjectNotFoundException("CORBA Object was not an Agent, or the server is down."); - } - /************************************************************************** - * - * - **************************************************************************/ - public void initialise( String agentProps, String collector ) - throws AccessRightsException, - InvalidDataException, - PersistencyException, - ObjectNotFoundException - { - Logger.msg(7, "AgentProxy::initialise - started"); - - ((Agent)getEntity()).initialise( agentProps ); - } - - public AgentPath getPath() { - return path; - } - - /** - * Executes a job on the given item using this agent. - * - * @param item - item holding this job - * @param job - the job to execute - */ - public void execute(ItemProxy item, Job job) - throws AccessRightsException, - InvalidTransitionException, - ObjectNotFoundException, - InvalidDataException, - PersistencyException, - ObjectAlreadyExistsException - { - OutcomeValidator validator = null; - String scriptName = job.getActPropString("ScriptName"); - Date startTime = new Date(); - Logger.msg(3, "AgentProxy - executing "+job.getStepPath()+" for "+path.getAgentName()); - // get the outcome validator if present - if (job.isOutcomeUsed()) - { - - // get schema info from act props - String schemaName = job.getActPropString("SchemaType"); - int schemaVersion; - try { - schemaVersion = Integer.parseInt(job.getActPropString("SchemaVersion")); - } catch (Exception e) { - throw new InvalidDataException(e.getClass().getName()+" extracing schema version", ""); - } - Logger.msg(5, "AgentProxy - fetching schema "+schemaName+"_"+schemaVersion+" for validation"); - // retrieve schema - Schema schema = LocalObjectLoader.getSchema(schemaName, schemaVersion); - - if (schema == null) - throw new InvalidDataException("Job references outcome type "+schemaName+" version "+schemaVersion+" that does not exist in this centre.", ""); - - try { - validator = OutcomeValidator.getValidator(schema); - } catch (Exception e) { - throw new InvalidDataException("Could not create validator: "+e.getMessage(), ""); - } - } - - if(scriptName != null && scriptName.length() > 0 && - (job.getPossibleTransition() == Transitions.DONE || job.getPossibleTransition() == Transitions.COMPLETE)) { - Logger.msg(3, "AgentProxy - executing script "+scriptName); - try { - - // pre-validate outcome from script if there is one - if (job.getOutcomeString()!= null && validator != null) { - Logger.msg(5, "AgentProxy - validating outcome before script execution"); - String error = validator.validate(job.getOutcomeString()); - if (error.length() > 0) { - Logger.error("Outcome not valid: \n " + error); - throw new InvalidDataException(error, ""); - } - } - - // load script - ErrorInfo scriptErrors = (ErrorInfo)callScript(item, job); - - if (scriptErrors.getFatal()) { - Logger.msg(3, "AgentProxy - fatal script error"); - Logger.error(scriptErrors.getErrors()); - throw new InvalidDataException("Fatal Script Error: \n"+scriptErrors.getErrors(), ""); - } - if (scriptErrors.getErrors().length() > 0) - Logger.warning("Script errors: "+scriptErrors.getErrors()); - } catch (ScriptingEngineException ex) { - Logger.error(ex); - throw new InvalidDataException(ex.getMessage(), ""); - } - } - - if (job.isOutcomeUsed()) { - Logger.msg(3, "AgentProxy - validating outcome"); - String error = validator.validate(job.getOutcomeString()); - if (error.length() > 0) - throw new InvalidDataException(error, ""); - } - - job.setAgentId(getSystemKey()); - Logger.msg(3, "AgentProxy - submitting job to item proxy"); - item.requestAction(job); - if (Logger.doLog(3)) { - Date timeNow = new Date(); - long secsNow = (timeNow.getTime()-startTime.getTime())/1000; - Logger.msg(3, "Execution took "+secsNow+" seconds"); - } - } - - public Object callScript(ItemProxy item, Job job) throws ScriptingEngineException { - Script script = new Script(item, this, job); - return script.execute(); - } - - /** - * Standard execution of jobs. Note that this method should always be the one used from clients - all execution - * parameters are taken from the job where they're probably going to be correct. - * - * @param job - * @throws AccessRightsException - * @throws InvalidDataException - * @throws InvalidTransitionException - * @throws ObjectNotFoundException - * @throws PersistencyException - * @throws ObjectAlreadyExistsException - */ - public void execute(Job job) - throws AccessRightsException, - InvalidDataException, - InvalidTransitionException, - ObjectNotFoundException, - PersistencyException, - ObjectAlreadyExistsException - { - try { - ItemProxy targetItem = (ItemProxy)Gateway.getProxyManager().getProxy(new EntityPath(job.getItemSysKey())); - execute(targetItem, job); - } catch (InvalidEntityPathException e) { - throw new ObjectNotFoundException("Job contained invalid item sysKey: "+job.getItemSysKey(), ""); - } - } - - public void execute(ItemProxy item, String predefStep, C2KLocalObject obj) - throws AccessRightsException, - InvalidDataException, - InvalidTransitionException, - ObjectNotFoundException, - PersistencyException, - ObjectAlreadyExistsException - { - String param; - try { - param = marshall(obj); - } catch (Exception ex) { - Logger.error(ex); - throw new InvalidDataException("Error on marshall", ""); - } - execute(item, predefStep, param); - } - - public void execute(ItemProxy item, String predefStep, String param) - throws AccessRightsException, - InvalidDataException, - InvalidTransitionException, - ObjectNotFoundException, - PersistencyException, - ObjectAlreadyExistsException - { - String[] params = new String[1]; - params[0] = param; - execute(item, predefStep, params); - } - - public void execute(ItemProxy item, String predefStep, String[] params) - throws AccessRightsException, - InvalidDataException, - InvalidTransitionException, - ObjectNotFoundException, - PersistencyException, - ObjectAlreadyExistsException - { - item.requestAction(getSystemKey(), "workflow/predefined/"+predefStep, Transitions.DONE, PredefinedStep.bundleData(params)); - } - - /** Wrappers for scripts */ - public String marshall(Object obj) throws Exception { - return CastorXMLUtility.marshall(obj); - } - - public Object unmarshall(String obj) throws Exception { - return CastorXMLUtility.unmarshall(obj); - } - - /** Let scripts resolve items */ - public ItemProxy searchItem(String name) throws ObjectNotFoundException { - Enumeration results = Gateway.getLDAPLookup().search(new DomainPath(""),name); - - Path returnPath = null; - if (!results.hasMoreElements()) - throw new ObjectNotFoundException(name, ""); - - while(results.hasMoreElements()) { - Path nextMatch = results.nextElement(); - if (returnPath != null && nextMatch.getSysKey() != -1 && returnPath.getSysKey() != nextMatch.getSysKey()) - throw new ObjectNotFoundException("Too many items with that name"); - returnPath = nextMatch; - } - - return (ItemProxy)Gateway.getProxyManager().getProxy(returnPath); - } - - public ItemProxy getItem(String path) throws ObjectNotFoundException { - return (getItem(new DomainPath(path))); - } - - public ItemProxy getItem(DomainPath path) throws ObjectNotFoundException { - return (ItemProxy)Gateway.getProxyManager().getProxy(path); - } - - public ItemProxy getItemBySysKey(int sysKey) throws ObjectNotFoundException, InvalidEntityPathException { - return (ItemProxy)Gateway.getProxyManager().getProxy(new EntityPath(sysKey)); - } -} diff --git a/source/com/c2kernel/entity/proxy/DomainPathSubscriber.java b/source/com/c2kernel/entity/proxy/DomainPathSubscriber.java deleted file mode 100644 index 4089325..0000000 --- a/source/com/c2kernel/entity/proxy/DomainPathSubscriber.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.c2kernel.entity.proxy; - -import com.c2kernel.lookup.DomainPath; - -/************************************************************************** - * - * $Revision: 1.1 $ - * $Date: 2004/02/05 16:11:57 $ - * - * Copyright (C) 2003 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - -public interface DomainPathSubscriber { - - public void pathAdded(DomainPath path); - public void pathRemoved(DomainPath path); -} diff --git a/source/com/c2kernel/entity/proxy/EntityProxy.java b/source/com/c2kernel/entity/proxy/EntityProxy.java deleted file mode 100644 index fae2e28..0000000 --- a/source/com/c2kernel/entity/proxy/EntityProxy.java +++ /dev/null @@ -1,248 +0,0 @@ -/************************************************************************** - * EntityProxy.java - * - * $Revision: 1.35 $ - * $Date: 2005/05/10 11:40:09 $ - * - * Copyright (C) 2001 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - -package com.c2kernel.entity.proxy; - -import java.util.HashMap; -import java.util.Iterator; - -import com.c2kernel.common.ObjectNotFoundException; -import com.c2kernel.entity.C2KLocalObject; -import com.c2kernel.entity.ManageableEntity; -import com.c2kernel.persistency.ClusterStorageException; -import com.c2kernel.process.Gateway; -import com.c2kernel.property.Property; -import com.c2kernel.utils.CastorXMLUtility; -import com.c2kernel.utils.Logger; - - -/****************************************************************************** -* It is a wrapper for the connection and communication with Entities. -* It can cache data loaded from the Entity to reduce communication with it. -* This cache is syncronised with corresponding Entity through an event mechanism. -* -* @version $Revision: 1.35 $ $Date: 2005/05/10 11:40:09 $ -* @author $Author: abranson $ -******************************************************************************/ - -abstract public class EntityProxy implements ManageableEntity -{ - - protected ManageableEntity mEntity = null; - protected org.omg.CORBA.Object mIOR; - protected int mSystemKey; - private HashMap, EntityProxyObserver> mSubscriptions; - - /************************************************************************** - * - **************************************************************************/ - protected EntityProxy( org.omg.CORBA.Object ior, - int systemKey) - throws ObjectNotFoundException - { - Logger.msg(8,"EntityProxy::EntityProxy() - Initialising '" +systemKey+ "' entity"); - - initialise( ior, systemKey); - } - - /************************************************************************** - * - **************************************************************************/ - private void initialise( org.omg.CORBA.Object ior, - int systemKey) - throws ObjectNotFoundException - { - Logger.msg(8, "EntityProxy::initialise() - Initialising '" +systemKey+ "' entity"); - - mIOR = ior; - mSystemKey = systemKey; - mSubscriptions = new HashMap, EntityProxyObserver>(); - } - - - /************************************************************************** - * - **************************************************************************/ - public ManageableEntity getEntity() throws ObjectNotFoundException - { - if (mEntity == null) { - mEntity = narrow(); - } - return mEntity; - } - - abstract public ManageableEntity narrow() throws ObjectNotFoundException; - - /************************************************************************** - * - **************************************************************************/ - //check who is using.. and if toString() is sufficient - @Override - public int getSystemKey() - { - return mSystemKey; - } - - - /************************************************************************** - * - **************************************************************************/ - @Override - public String queryData( String path ) - throws ObjectNotFoundException - { - - try { - Logger.msg(7, "EntityProxy.queryData() - "+mSystemKey+"/"+path); - if (path.endsWith("all")) { - Logger.msg(7, "EntityProxy.queryData() - listing contents"); - String[] result = Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length()-3)); - StringBuffer retString = new StringBuffer(); - for (int i = 0; i < result.length; i++) { - retString.append(result[i]); - if (i"+e.getMessage()+""; - } - } - - public String[] getContents( String path ) throws ObjectNotFoundException { - try { - return Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length())); - } catch (ClusterStorageException e) { - throw new ObjectNotFoundException(e.toString()); - } - } - - - /************************************************************************** - * - **************************************************************************/ - public C2KLocalObject getObject( String xpath ) - throws ObjectNotFoundException - { - // load from storage, falling back to proxy loader if not found in others - try - { - return Gateway.getStorage().get( mSystemKey, xpath , null); - } - catch( ClusterStorageException ex ) - { - Logger.msg(4, "Exception loading object :"+mSystemKey+"/"+xpath); - throw new ObjectNotFoundException( ex.toString() ); - } - } - - - - public String getProperty( String name ) - throws ObjectNotFoundException - { - Logger.msg(5, "Get property "+name+" from syskey/"+mSystemKey); - Property prop = (Property)getObject("Property/"+name); - try - { - return prop.getValue(); - } - catch (NullPointerException ex) - { - throw new ObjectNotFoundException(); - } - } - - public String getName() - { - try { - return getProperty("Name"); - } catch (ObjectNotFoundException ex) { - return null; - } - } - - - /************************************************************************** - * Subscription methods - **************************************************************************/ - - public void subscribe (MemberSubscription newSub) { - - newSub.setSubject(this); - synchronized (this){ - mSubscriptions.put( newSub, newSub.getObserver() ); - } - new Thread(newSub).start(); - Logger.msg(7, "Subscribed "+newSub.getObserver().getClass().getName()+" for "+newSub.interest); - } - - public void unsubscribe(EntityProxyObserver observer) - { - synchronized (this){ - for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { - MemberSubscription thisSub = e.next(); - if (mSubscriptions.get( thisSub ) == observer) { - e.remove(); - Logger.msg(7, "Unsubscribed "+observer.getClass().getName()); - } - } - } - } - - public void dumpSubscriptions(int logLevel) { - if (mSubscriptions.size() == 0) return; - Logger.msg(logLevel, "Subscriptions to proxy "+mSystemKey+":"); - synchronized(this) { - for (MemberSubscription element : mSubscriptions.keySet()) { - EntityProxyObserver obs = element.getObserver(); - if (obs != null) - Logger.msg(logLevel, " "+element.getObserver().getClass().getName()+" subscribed to "+element.interest); - else - Logger.msg(logLevel, " Phantom subscription to "+element.interest); - } - } - } - - public void notify(ProxyMessage message) { - Logger.msg(4, "EntityProxy.notify() - Received change notification for "+message.getPath()+" on "+mSystemKey); - synchronized (this){ - if (!message.getServer().equals(EntityProxyManager.serverName)) - Gateway.getStorage().clearCache(mSystemKey, message.getPath()); - for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { - MemberSubscription newSub = e.next(); - if (newSub.getObserver() == null) { // phantom - Logger.msg(4, "Removing phantom subscription to "+newSub.interest); - e.remove(); - } - else - newSub.update(message.getPath(), message.getState()); - } - } - } - - /** - * If this is reaped, clear out the cache for it too. - */ - @Override - protected void finalize() throws Throwable { - Logger.msg(7, "Proxy "+mSystemKey+" reaped"); - Gateway.getStorage().clearCache(mSystemKey, null); - Gateway.getProxyManager().removeProxy(mSystemKey); - super.finalize(); - } - -} diff --git a/source/com/c2kernel/entity/proxy/EntityProxyManager.java b/source/com/c2kernel/entity/proxy/EntityProxyManager.java deleted file mode 100644 index 192a984..0000000 --- a/source/com/c2kernel/entity/proxy/EntityProxyManager.java +++ /dev/null @@ -1,339 +0,0 @@ -/************************************************************************** - * EntityProxyFactory.java - * - * $Revision: 1.45 $ - * $Date: 2005/05/10 11:40:09 $ - * - * Copyright (C) 2001 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - -package com.c2kernel.entity.proxy; - -import java.util.ArrayList; -import java.util.ConcurrentModificationException; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Iterator; - -import com.c2kernel.common.InvalidDataException; -import com.c2kernel.common.ObjectNotFoundException; -import com.c2kernel.lookup.AgentPath; -import com.c2kernel.lookup.DomainPath; -import com.c2kernel.lookup.Path; -import com.c2kernel.persistency.ClusterStorage; -import com.c2kernel.process.Gateway; -import com.c2kernel.property.Property; -import com.c2kernel.utils.Logger; -import com.c2kernel.utils.SoftCache; -import com.c2kernel.utils.server.SimpleTCPIPServer; - - -public class EntityProxyManager -{ - SoftCache proxyPool = new SoftCache(50); - HashMap treeSubscribers = new HashMap(); - HashMap connections = new HashMap(); - - // server objects - static ArrayList proxyClients = new ArrayList(); - static SimpleTCPIPServer proxyServer = null; - static String serverName = null; - - /** - * Create an entity proxy manager to listen for proxy events and reap unused proxies - */ - public EntityProxyManager() - { - Logger.msg(5, "EntityProxyManager - Starting....."); - - Enumeration servers = Gateway.getLDAPLookup().searchEntities(new DomainPath("/servers")); - while(servers.hasMoreElements()) { - Path thisServerPath = servers.nextElement(); - try { - int syskey = thisServerPath.getSysKey(); - String remoteServer = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/Name", null)).getValue(); - String portStr = ((Property)Gateway.getStorage().get(syskey, ClusterStorage.PROPERTY+"/ProxyPort", null)).getValue(); - int remotePort = Integer.parseInt(portStr); - connectToProxyServer(remoteServer, remotePort); - - } catch (Exception ex) { - Logger.error("Exception retrieving proxy server connection data for "+thisServerPath); - Logger.error(ex); - } - } - } - - public void connectToProxyServer(String name, int port) { - ProxyServerConnection oldConn = connections.get(name); - if (oldConn != null) - oldConn.shutdown(); - connections.put(name, new ProxyServerConnection(name, port, this)); - } - - - protected void resubscribe(ProxyServerConnection conn) { - synchronized (proxyPool) { - for (Integer key : proxyPool.keySet()) { - ProxyMessage sub = new ProxyMessage(key.intValue(), ProxyMessage.ADDPATH, false); - Logger.msg(5, "Subscribing to entity "+key); - conn.sendMessage(sub); - } - } - } - - /** - * @param sub - */ - private void sendMessage(ProxyMessage sub) { - for (ProxyServerConnection element : connections.values()) { - element.sendMessage(sub); - } - - } - - public void shutdown() { - Logger.msg("EntityProxyManager.shutdown() - flagging shutdown of server connections"); - for (ProxyServerConnection element : connections.values()) { - element.shutdown(); - } - } - - protected void processMessage(ProxyMessage thisMessage) throws InvalidDataException { - if (Logger.doLog(9)) Logger.msg(9, thisMessage.toString()); - - if (thisMessage.getPath().equals(ProxyMessage.PINGPATH)) // ping response - return; - - if (thisMessage.getSysKey() == ProxyMessage.NA) // must be domain path info - informTreeSubscribers(thisMessage.getState(), thisMessage.getPath()); - else { - // proper proxy message - Logger.msg(5, "Received proxy message: "+thisMessage.toString()); - Integer key = new Integer(thisMessage.getSysKey()); - EntityProxy relevant = proxyPool.get(key); - if (relevant == null) - Logger.warning("Received proxy message for sysKey "+thisMessage.getSysKey()+" which we don't have a proxy for."); - else - try { - relevant.notify(thisMessage); - } catch (Throwable ex) { - Logger.error("Error caught notifying proxy listener "+relevant.toString()+" of "+thisMessage.toString()); - Logger.error(ex); - } - } - } - - private void informTreeSubscribers(boolean state, String path) { - DomainPath last = new DomainPath(path); - DomainPath parent; boolean first = true; - synchronized(treeSubscribers) { - while((parent = last.getParent()) != null) { - - for (DomainPathSubscriber sub : treeSubscribers.keySet()) { - DomainPath interest = treeSubscribers.get(sub); - if (interest.equals(parent)) { - if (state == ProxyMessage.ADDED) - sub.pathAdded(last); - else if (first) - sub.pathRemoved(last); - } - } - last = parent; - first = false; - } - } - } - - public void subscribeTree(DomainPathSubscriber sub, DomainPath interest) { - synchronized(treeSubscribers) { - treeSubscribers.put(sub, interest); - } - } - - public void unsubscribeTree(DomainPathSubscriber sub) { - synchronized(treeSubscribers) { - treeSubscribers.remove(sub); - } - } - - /************************************************************************** - * - **************************************************************************/ - private EntityProxy createProxy( org.omg.CORBA.Object ior, - int systemKey, - boolean isItem ) - throws ObjectNotFoundException - { - - EntityProxy newProxy = null; - - Logger.msg(5, "EntityProxyFactory::creating proxy on entity " + systemKey); - - if( isItem ) - { - newProxy = new ItemProxy(ior, systemKey); - } - else - { - newProxy = new AgentProxy(ior, systemKey); - } - - // subscribe to changes from server - ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.ADDPATH, false); - sendMessage(sub); - reportCurrentProxies(9); - return ( newProxy ); - } - - protected void removeProxy( int systemKey ) - { - ProxyMessage sub = new ProxyMessage(systemKey, ProxyMessage.DELPATH, true); - Logger.msg(5,"EntityProxyManager.removeProxy() - Unsubscribing to proxy informer for "+systemKey); - sendMessage(sub); - } - - - /************************************************************************** - * EntityProxy getProxy( ManageableEntity, SystemKey) - * - * Called by the other GetProxy methods. Fills in either the ior or the - * SystemKey - **************************************************************************/ - private EntityProxy getProxy( org.omg.CORBA.Object ior, - int systemKey, - boolean isItem ) - throws ObjectNotFoundException - { - Integer key = new Integer(systemKey); - - synchronized(proxyPool) { - EntityProxy newProxy; - // return it if it exists - newProxy = proxyPool.get(key); - if (newProxy == null) { - // create a new one - newProxy = createProxy(ior, systemKey, isItem ); - proxyPool.put(key, newProxy); - } - return newProxy; - - } - } - - /************************************************************************** - * EntityProxy getProxy( String ) - * - * Proxy from Alias - **************************************************************************/ - public EntityProxy getProxy( Path path ) - throws ObjectNotFoundException - { - - //convert namePath to dn format - Logger.msg(8,"EntityProxyFactory::getProxy(" + path.toString() + ")"); - boolean isItem = !(path.getEntity() instanceof AgentPath); - return getProxy( Gateway.getLDAPLookup().getIOR(path), - path.getSysKey(), - isItem ); - - } - - /************************************************************************** - * void reportCurrentProxies() - * - * A utility to Dump the current proxies loaded - **************************************************************************/ - public void reportCurrentProxies(int logLevel) - { - if (!Logger.doLog(logLevel)) return; - Logger.msg(logLevel, "Current proxies: "); - try { - synchronized(proxyPool) { - Iterator i = proxyPool.keySet().iterator(); - - for( int count=0; i.hasNext(); count++ ) - { - Integer nextProxy = i.next(); - EntityProxy thisProxy = proxyPool.get(nextProxy); - if (thisProxy != null) - Logger.msg(logLevel, - "" + count + ": " - + proxyPool.get(nextProxy).getClass().getName() - + ": " + nextProxy); - } - } - } catch (ConcurrentModificationException ex) { - Logger.msg(logLevel, "Proxy cache modified. Aborting."); - } - } - - - /************************************************************************** - * Static Proxy Server methods - **************************************************************************/ - - /** - * Initialises the Proxy event UDP server listening on 'Host.Proxy.port' from c2kprops - * @param c2kProps - */ - public static void initServer() - { - Logger.msg(5, "EntityProxyFactory::initServer - Starting....."); - String port = Gateway.getProperty("ItemServer.Proxy.port"); - serverName = Gateway.getProperty("ItemServer.name"); - if (port == null) { - Logger.error("ItemServer.Proxy.port not defined in connect file. Remote proxies will not be informed of entity changes."); - return; - } - - // set up the proxy server - try { - int portNo = Integer.parseInt(port); - Logger.msg(5, "EntityProxyFactory::initServer - Initialising proxy informer on port "+port); - proxyServer = new SimpleTCPIPServer(portNo, ProxyClientConnection.class, 200); - proxyServer.startListening(); - } catch (Exception ex) { - Logger.error("Error setting up Proxy Server. Remote proxies will not be informed of entity changes."); - Logger.error(ex); - } - } - - public static void sendProxyEvent(ProxyMessage message) { - if (proxyServer != null && message.getPath() != null) - synchronized(proxyClients) { - for (ProxyClientConnection client : proxyClients) { - client.sendMessage(message); - } - } - } - - public static void reportConnections(int logLevel) { - synchronized(proxyClients) { - Logger.msg(logLevel, "Currently connected proxy clients:"); - for (ProxyClientConnection client : proxyClients) { - Logger.msg(logLevel, " "+client); - } - } - } - - public static void shutdownServer() { - if (proxyServer != null) { - Logger.msg(1, "EntityProxyManager: Closing Server."); - proxyServer.stopListening(); - } - } - - public static void registerProxyClient(ProxyClientConnection client) { - synchronized(proxyClients) { - proxyClients.add(client); - } - } - - public static void unRegisterProxyClient(ProxyClientConnection client) { - synchronized(proxyClients) { - proxyClients.remove(client); - } - } -} - diff --git a/source/com/c2kernel/entity/proxy/EntityProxyObserver.java b/source/com/c2kernel/entity/proxy/EntityProxyObserver.java deleted file mode 100644 index 3ddb99c..0000000 --- a/source/com/c2kernel/entity/proxy/EntityProxyObserver.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.c2kernel.entity.proxy; - -import com.c2kernel.entity.C2KLocalObject; - - - -public interface EntityProxyObserver -{ - /************************************************************************** - * Subscribed items are broken apart and fed one by one to these methods. - * Replacement after an event is done by feeding the new memberbase with the same id. - * ID could be an XPath? - **************************************************************************/ - public void add(V contents); - - /************************************************************************** - * the 'type' parameter should be an indication of the type of object - * supplied so that the subscriber can associate the call back with - * one of its subscriptions. If we go with an Xpath subscription form, - * then the id will probably be sufficient. - * Should be comparable (substring whatever) with the parameter given to - * the subscribe method of ItemProxy. - **************************************************************************/ - public void remove(String id); - - public void control(String control, String msg); -} diff --git a/source/com/c2kernel/entity/proxy/ItemProxy.java b/source/com/c2kernel/entity/proxy/ItemProxy.java deleted file mode 100644 index 658e0c8..0000000 --- a/source/com/c2kernel/entity/proxy/ItemProxy.java +++ /dev/null @@ -1,213 +0,0 @@ -/************************************************************************** - * ItemProxy.java - * - * $Revision: 1.25 $ - * $Date: 2005/05/10 11:40:09 $ - * - * Copyright (C) 2001 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - -package com.c2kernel.entity.proxy; - -import java.util.ArrayList; - -import com.c2kernel.common.AccessRightsException; -import com.c2kernel.common.InvalidDataException; -import com.c2kernel.common.InvalidTransitionException; -import com.c2kernel.common.ObjectAlreadyExistsException; -import com.c2kernel.common.ObjectNotFoundException; -import com.c2kernel.common.PersistencyException; -import com.c2kernel.entity.Item; -import com.c2kernel.entity.ItemHelper; -import com.c2kernel.entity.ManageableEntity; -import com.c2kernel.entity.agent.Job; -import com.c2kernel.entity.agent.JobArrayList; -import com.c2kernel.lifecycle.instance.stateMachine.Transitions; -import com.c2kernel.utils.CastorXMLUtility; -import com.c2kernel.utils.Logger; - -/****************************************************************************** - * It is a wrapper for the connection and communication with Item - * It caches data loaded from the Item to reduce communication - * - * @version $Revision: 1.25 $ $Date: 2005/05/10 11:40:09 $ - * @author $Author: abranson $ - ******************************************************************************/ -public class ItemProxy extends EntityProxy -{ - - /************************************************************************** - * - **************************************************************************/ - protected ItemProxy( org.omg.CORBA.Object ior, - int systemKey) - throws ObjectNotFoundException - { - super(ior, systemKey); - - } - - @Override - public ManageableEntity narrow() throws ObjectNotFoundException - { - try { - return ItemHelper.narrow(mIOR); - } catch (org.omg.CORBA.BAD_PARAM ex) { } - throw new ObjectNotFoundException("CORBA Object was not an Item, or the server is down."); - } - /************************************************************************** - * - * - **************************************************************************/ - public void initialise( int agentId, - String itemProps, - String workflow ) - throws AccessRightsException, - InvalidDataException, - PersistencyException, - ObjectNotFoundException - { - Logger.msg(7, "ItemProxy::initialise - started"); - - ((Item)getEntity()).initialise( agentId, itemProps, workflow ); - } - - public void setProperty(AgentProxy agent, String name, String value) - throws AccessRightsException, - PersistencyException - { - String[] params = new String[2]; - params[0] = name; - params[1] = value; - try { - agent.execute(this, "WriteProperty", params); - } catch (AccessRightsException e) { - throw (e); - } catch (PersistencyException e) { - throw (e); - } catch (Exception e) { - Logger.error(e); - throw new PersistencyException("Could not store property"); - } - } - /************************************************************************** - * - **************************************************************************/ - protected void requestAction( Job thisJob ) - throws AccessRightsException, - InvalidTransitionException, - ObjectNotFoundException, - InvalidDataException, - PersistencyException, - ObjectAlreadyExistsException - { - String outcome = thisJob.getOutcomeString(); - // check fields that should have been filled in - if (outcome==null) - if (thisJob.isOutcomeUsed()) - throw new InvalidDataException("Outcome is required.", ""); - else - outcome=""; - - if (thisJob.getAgentId() == -1) - throw new InvalidDataException("No Agent specified.", ""); - - Logger.msg(7, "ItemProxy - executing "+thisJob.getStepPath()+" for "+thisJob.getAgentName()); - requestAction (thisJob.getAgentId(), thisJob.getStepPath(), - thisJob.getPossibleTransition(), outcome); - } - - //requestData is xmlString - public void requestAction( int agentId, - String stepPath, - int transitionID, - String requestData - ) - throws AccessRightsException, - InvalidTransitionException, - ObjectNotFoundException, - InvalidDataException, - PersistencyException, - ObjectAlreadyExistsException - { - ((Item)getEntity()).requestAction( agentId, - stepPath, - transitionID, - requestData ); - } - - /************************************************************************** - * - **************************************************************************/ - public String queryLifeCycle( int agentId, - boolean filter - ) - throws AccessRightsException, - ObjectNotFoundException, - PersistencyException - { - return ((Item)getEntity()).queryLifeCycle( agentId, - filter ); - } - - - /************************************************************************** - * - **************************************************************************/ - private ArrayList getJobList(int agentId, boolean filter) - throws AccessRightsException, - ObjectNotFoundException, - PersistencyException - { - JobArrayList thisJobList; - try { - String jobs = queryLifeCycle(agentId, filter); - thisJobList = (JobArrayList)CastorXMLUtility.unmarshall(jobs); - } - catch (Exception e) { - Logger.error(e); - throw new PersistencyException("Exception::ItemProxy::getJobList() - Cannot unmarshall the jobs", null); - } - return thisJobList.list; - } - - public ArrayList getJobList(AgentProxy agent) - throws AccessRightsException, - ObjectNotFoundException, - PersistencyException - { - return getJobList(agent.getSystemKey()); - } - - private ArrayList getJobList(int agentId) - throws AccessRightsException, - ObjectNotFoundException, - PersistencyException - { - return getJobList(agentId, true); - } - - private Job getJobByName(String actName, int agentId) - throws AccessRightsException, - ObjectNotFoundException, - PersistencyException { - - ArrayList jobList = getJobList(agentId); - for (Job job : jobList) { - int transition = job.getPossibleTransition(); - if (job.getStepName().equals(actName)) - if (transition == Transitions.COMPLETE || transition == Transitions.DONE) - return job; - } - return null; - - } - - public Job getJobByName(String actName, AgentProxy agent) - throws AccessRightsException, - ObjectNotFoundException, - PersistencyException { - return getJobByName(actName, agent.getSystemKey()); - } -} diff --git a/source/com/c2kernel/entity/proxy/MemberSubscription.java b/source/com/c2kernel/entity/proxy/MemberSubscription.java deleted file mode 100644 index 157297f..0000000 --- a/source/com/c2kernel/entity/proxy/MemberSubscription.java +++ /dev/null @@ -1,118 +0,0 @@ - -package com.c2kernel.entity.proxy; -import java.lang.ref.WeakReference; -import java.util.ArrayList; -import java.util.StringTokenizer; - -import com.c2kernel.common.ObjectNotFoundException; -import com.c2kernel.entity.C2KLocalObject; -import com.c2kernel.utils.Logger; - -public class MemberSubscription implements Runnable { - public static final String ERROR = "Error"; - public static final String END = "theEND"; - - EntityProxy subject; - String interest; - // keep the subscriber by weak reference, so it is not kept from the garbage collector if no longer used - WeakReference> observerReference; - ArrayList contents = new ArrayList(); - boolean preLoad; - - public MemberSubscription(EntityProxyObserver observer, String interest, boolean preLoad) { - setObserver(observer); - this.interest = interest; - this.preLoad = preLoad; - } - - @Override - public void run() { - Thread.currentThread().setName("Member Subscription: "+subject.getSystemKey()+":"+interest); - if (preLoad) loadChildren(); - } - - private void loadChildren() { - C newMember; - EntityProxyObserver observer = getObserver(); - if (observer == null) return; //reaped - try { - // fetch contents of path - String children = subject.queryData(interest+"/all"); - StringTokenizer tok = new StringTokenizer(children, ","); - ArrayList newContents = new ArrayList(); - while (tok.hasMoreTokens()) - newContents.add(tok.nextToken()); - - // look to see what's new - for (String newChild: newContents) { - - // load child object - try { - newMember = (C)subject.getObject(interest+"/"+newChild); - contents.remove(newChild); - observer.add(newMember); - } catch (ObjectNotFoundException ex) { - observer.control(ERROR, "Listed member "+newChild+" was not found."); - } - } - // report what's left in old contents as deleted - for (String oldChild: contents) { - observer.remove(interest+"/"+oldChild); - } - //replace contents arraylist - contents = newContents; - //report that we're done - observer.control(END, null); - } catch (Exception ex) { - observer.control(ERROR, "Query on "+interest+" failed with "+ex); - } - } - - public boolean isRelevant(String path) { - Logger.msg(7, "Checking relevance of "+path+" to "+interest); - return (path.startsWith(interest)); - } - - public void update(String path, boolean deleted) { - EntityProxyObserver observer = getObserver(); - if (observer == null) return; //reaped - Logger.msg(7, "Processing proxy message path "+path +" for "+observer+". Interest: "+interest+" Was Deleted:"+deleted); - if (!path.startsWith(interest)) // doesn't concern us - return; - - if (path.equals(interest)) // refresh contents - loadChildren(); - else { - String name = path.substring(interest.length()); - if (deleted) { - Logger.msg(4, "Removing "+path); - contents.remove(name); - observer.remove(name); - } - else { - try { - C newMember = (C)subject.getObject(path); - Logger.msg(4, "Adding "+path); - contents.add(name); - observer.add(newMember); - } catch (ObjectNotFoundException e) { - Logger.error("Member Subscription: could not load "+path); - Logger.error(e); - } - } - } - } - - public void setObserver(EntityProxyObserver observer) { - observerReference = new WeakReference>(observer); - } - - public void setSubject(EntityProxy subject) { - this.subject = subject; - } - - public EntityProxyObserver getObserver() { - return observerReference.get(); - } -} - diff --git a/source/com/c2kernel/entity/proxy/ProxyClientConnection.java b/source/com/c2kernel/entity/proxy/ProxyClientConnection.java deleted file mode 100644 index 9687f22..0000000 --- a/source/com/c2kernel/entity/proxy/ProxyClientConnection.java +++ /dev/null @@ -1,185 +0,0 @@ -package com.c2kernel.entity.proxy; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.InterruptedIOException; -import java.io.PrintWriter; -import java.net.Socket; -import java.net.SocketException; -import java.util.ArrayList; -import java.util.Iterator; - -import com.c2kernel.common.InvalidDataException; -import com.c2kernel.utils.Logger; -import com.c2kernel.utils.server.SocketHandler; - -/************************************************************************** - * - * $Revision: 1.18 $ - * $Date: 2005/05/10 11:40:09 $ - * - * Copyright (C) 2003 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - -public class ProxyClientConnection implements SocketHandler { - - Socket clientSocket = null; - static int clientId = -1; - int thisClientId; - ArrayList sysKeys; - PrintWriter response; - BufferedReader request; - boolean closing = false; - - public ProxyClientConnection() { - super(); - thisClientId = ++clientId; - EntityProxyManager.registerProxyClient(this); - Logger.msg(1, "Proxy Client Connection Handler "+thisClientId+" ready."); - } - - - @Override - public String getName() { - return "Proxy Client Connection"; - } - - @Override - public boolean isBusy() { - return clientSocket != null; - } - - @Override - public synchronized void setSocket(Socket newSocket) { - try { - Logger.msg(1, "Proxy Client Connection "+thisClientId+" connect from "+newSocket.getInetAddress()+":"+newSocket.getPort()); - newSocket.setSoTimeout(500); - clientSocket = newSocket; - response = new PrintWriter(clientSocket.getOutputStream(), true); - sysKeys = new ArrayList(); - } catch (SocketException ex) { - Logger.msg("Could not set socket timeout:"); - Logger.error(ex); - closeSocket(); - } catch (IOException ex) { - Logger.msg("Could not setup output stream:"); - Logger.error(ex); - closeSocket(); - } - } - - /** - * Main loop. Reads proxy commands from the client and acts on them. - */ - @Override - public void run() { - Thread.currentThread().setName("Proxy Client Connection: "+clientSocket.getInetAddress()); - Logger.msg(7, "ProxyClientConnection "+thisClientId+" - Setting up proxy client connection with "+clientSocket.getInetAddress()); - try { - request = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); - String input = null; - ProxyMessage thisMessage; - while (clientSocket != null) { - try { - input = request.readLine(); - Logger.msg(9, "ProxyClientConnection "+thisClientId+" - received "+input); - thisMessage = new ProxyMessage(input); - processMessage(thisMessage); - } catch (InterruptedIOException ex) { //timeout - } catch (InvalidDataException ex) { // invalid proxy message - Logger.error("ProxyClientConnection "+thisClientId+" - Invalid proxy message: "+input); - } - - } - } catch (IOException ex) { - if (!closing) - Logger.error("ProxyClientConnection "+thisClientId+" - Error reading from socket."); - } - closeSocket(); - Logger.msg(1, "ProxyClientConnection "+thisClientId+" closed."); - } - - private void processMessage(ProxyMessage message) throws InvalidDataException { - - // proxy disconnection - if (message.getPath().equals(ProxyMessage.BYEPATH)) { - Logger.msg(7, "ProxyClientConnection "+thisClientId+" disconnecting"); - closeSocket(); - } - - // proxy checking connection - else if (message.getPath().equals(ProxyMessage.PINGPATH)) - response.println(ProxyMessage.pingMessage); - - // new subscription to entity changes - else if (message.getPath().equals(ProxyMessage.ADDPATH)) { - Logger.msg(7, "ProxyClientConnection "+thisClientId+" subscribed to "+message.getSysKey()); - synchronized (sysKeys) { - sysKeys.add(new Integer(message.getSysKey())); - } - } - - // remove of subscription to entity changes - else if (message.getPath().equals(ProxyMessage.DELPATH)) { - synchronized (sysKeys) { - sysKeys.remove(new Integer(message.getSysKey())); - } - Logger.msg(7, "ProxyClientConnection "+thisClientId+" unsubscribed from "+message.getSysKey()); - } - - else // unknown message - Logger.error("ProxyClientConnection "+thisClientId+" - Unknown message type: "+message); - - } - - public synchronized void sendMessage(ProxyMessage message) { - if (clientSocket==null) return; // idle - boolean relevant = message.getSysKey() == ProxyMessage.NA; - synchronized (sysKeys) { - for (Iterator iter = sysKeys.iterator(); iter.hasNext() && !relevant;) { - Integer thisKey = iter.next(); - if (thisKey.intValue() == message.getSysKey()) - relevant = true; - } - } - if (!relevant) return; // not for our client - - response.println(message); - } - - @Override - public void shutdown() { - if (isBusy()) { - closing = true; - Logger.msg("ProxyClientConnection "+thisClientId+" closing."); - closeSocket(); - } - } - - @Override - public String toString() { - if (clientSocket == null) return thisClientId+": idle"; - else return thisClientId+": "+clientSocket.getInetAddress(); - } - - private synchronized void closeSocket() { - if (clientSocket==null) return; - try { - request.close(); - response.close(); - clientSocket.close(); - } catch (IOException e) { - Logger.error("ProxyClientConnection "+thisClientId+" - Could not close socket."); - Logger.error(e); - } - synchronized (sysKeys) { - sysKeys = null; - } - - clientSocket = null; - - } - -} diff --git a/source/com/c2kernel/entity/proxy/ProxyMessage.java b/source/com/c2kernel/entity/proxy/ProxyMessage.java deleted file mode 100644 index 62866eb..0000000 --- a/source/com/c2kernel/entity/proxy/ProxyMessage.java +++ /dev/null @@ -1,102 +0,0 @@ -package com.c2kernel.entity.proxy; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.util.StringTokenizer; - -import com.c2kernel.common.InvalidDataException; - - -/************************************************************************** - * - * $Revision: 1.11 $ - * $Date: 2005/05/10 11:40:09 $ - * - * Copyright (C) 2003 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - -public class ProxyMessage { - - // special server message paths - public static final String BYEPATH = "bye"; - public static final String ADDPATH = "add"; - public static final String DELPATH = "del"; - public static final String PINGPATH = "ping"; - public static final boolean ADDED = false; - public static final boolean DELETED = true; - public static final int NA = -1; - - static ProxyMessage byeMessage = new ProxyMessage(NA, BYEPATH, ADDED); - static ProxyMessage pingMessage = new ProxyMessage(NA, PINGPATH, ADDED); - - private int sysKey = NA; - private String path = ""; - private String server = null; - private boolean state = ADDED; - - public ProxyMessage() { - super(); - } - public ProxyMessage(int sysKey, String path, boolean state) { - this(); - setSysKey(sysKey); - setPath(path); - setState(state); - } - - public ProxyMessage(String line) throws InvalidDataException, IOException { - if (line == null) - throw new IOException("Null proxy message"); - StringTokenizer tok = new StringTokenizer(line,":"); - if (tok.countTokens()!=2) - throw new InvalidDataException("String '"+line+"' does not constitute a valid proxy message.", ""); - sysKey = Integer.parseInt(tok.nextToken()); - path = tok.nextToken(); - if (path.startsWith("-")) { - state = DELETED; - path = path.substring(1); - } - } - - public ProxyMessage(DatagramPacket packet) throws InvalidDataException, IOException { - this(new String(packet.getData())); - } - - public int getSysKey() { - return sysKey; - } - - public void setSysKey(int sysKey) { - this.sysKey = sysKey; - } - - public String getPath() { - return path; - } - - public void setPath(String newPath) { - this.path = newPath; - } - - public boolean getState() { - return state; - } - - public void setState(boolean state) { - this.state = state; - } - - @Override - public String toString() { - return sysKey+":"+(state?"-":"")+path; - } - - public String getServer() { - return server; - } - - public void setServer(String server) { - this.server = server; - } -} diff --git a/source/com/c2kernel/entity/proxy/ProxyServerConnection.java b/source/com/c2kernel/entity/proxy/ProxyServerConnection.java deleted file mode 100644 index 6807953..0000000 --- a/source/com/c2kernel/entity/proxy/ProxyServerConnection.java +++ /dev/null @@ -1,134 +0,0 @@ -/************************************************************************** - * EntityProxyFactory.java - * - * $Revision: 1.3 $ - * $Date: 2005/05/25 12:11:44 $ - * - * Copyright (C) 2001 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - -package com.c2kernel.entity.proxy; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.InterruptedIOException; -import java.io.PrintWriter; -import java.net.Socket; - -import com.c2kernel.common.InvalidDataException; -import com.c2kernel.utils.Logger; - - -public class ProxyServerConnection extends Thread -{ - - public boolean serverIsActive = true; - // proxy client details - String serverName; - int serverPort; - Socket serverConnection; - EntityProxyManager manager; - // for talking to the proxy server - PrintWriter serverStream; - boolean listening = false; - static boolean isServer = false; - - /** - * Create an entity proxy manager to listen for proxy events and reap unused proxies - */ - public ProxyServerConnection(String host, int port, EntityProxyManager manager) - { - Logger.msg(5, "ProxyServerConnection - Initialising connection to "+host+":"+port); - serverName = host; - serverPort = port; - this.manager = manager; - listening = true; - start(); - } - - @Override - public void run() { - Thread.currentThread().setName("Proxy Client Connection Listener to "+serverName+":"+serverPort); - while (listening) { - try { - if (serverConnection == null) connect(); - if (serverConnection != null) { - BufferedReader request = new BufferedReader(new InputStreamReader(serverConnection.getInputStream())); - String input = null; - ProxyMessage thisMessage; - while (listening && serverConnection != null) { - try { - input = request.readLine(); - thisMessage = new ProxyMessage(input); - thisMessage.setServer(serverName); - manager.processMessage(thisMessage); - } catch (InterruptedIOException ex) { // timeout - send a ping - sendMessage(ProxyMessage.pingMessage); - } catch (InvalidDataException ex) { // invalid proxy message - if (input != null) - Logger.error("EntityProxyManager - Invalid proxy message: "+input); - } - } - } - } catch (IOException ex) { - Logger.error("ProxyServerConnection - Disconnected from "+serverName+":"+serverPort); - try { - serverStream.close(); - serverConnection.close(); - } catch (IOException e1) { } - - - serverStream = null; - serverConnection = null; - } - } - - if (serverStream != null) { - try { - Logger.msg(1, "Disconnecting from proxy server on "+serverName+":"+serverPort); - serverStream.println(ProxyMessage.byeMessage.toString()); - serverStream.close(); - serverConnection.close(); - serverConnection = null; - } catch (Exception e) { - Logger.error("Error disconnecting from proxy server."); - } - } - } - - public void connect() { - Logger.msg(3, "ProxyServerConnection - connecting to proxy server on "+serverName+":"+serverPort); - try { - serverConnection = new Socket(serverName, serverPort); - serverConnection.setKeepAlive(true); - serverIsActive = true; - serverConnection.setSoTimeout(5000); - serverStream = new PrintWriter(serverConnection.getOutputStream(), true); - Logger.msg("Connected to proxy server on "+serverName+":"+serverPort); - manager.resubscribe(this); - } catch (Exception e) { - Logger.msg(3, "Could not connect to proxy server. Retrying in 5s"); - try { Thread.sleep(5000); } catch (InterruptedException ex) { } - serverStream = null; - serverConnection = null; - serverIsActive = false; - } - } - - public void shutdown() { - Logger.msg("Proxy Client: flagging shutdown."); - listening = false; - } - - /** - * @param sub - */ - public void sendMessage(ProxyMessage sub) { - if (serverStream != null) - serverStream.println(sub); - } - -} - diff --git a/source/com/c2kernel/entity/transfer/TransferItem.java b/source/com/c2kernel/entity/transfer/TransferItem.java deleted file mode 100644 index 0e3b764..0000000 --- a/source/com/c2kernel/entity/transfer/TransferItem.java +++ /dev/null @@ -1,133 +0,0 @@ -package com.c2kernel.entity.transfer; - -import java.io.File; -import java.util.ArrayList; -import java.util.Enumeration; - -import com.c2kernel.common.ObjectNotFoundException; -import com.c2kernel.entity.C2KLocalObject; -import com.c2kernel.entity.TraceableEntity; -import com.c2kernel.lifecycle.instance.Workflow; -import com.c2kernel.lookup.DomainPath; -import com.c2kernel.lookup.EntityPath; -import com.c2kernel.lookup.Path; -import com.c2kernel.persistency.ClusterStorage; -import com.c2kernel.persistency.outcome.Outcome; -import com.c2kernel.process.Gateway; -import com.c2kernel.property.Property; -import com.c2kernel.property.PropertyArrayList; -import com.c2kernel.utils.CastorXMLUtility; -import com.c2kernel.utils.FileStringUtility; -import com.c2kernel.utils.Logger; - -public class TransferItem { - public ArrayList domainPaths; - public int sysKey; - static int importAgentId; - - public TransferItem() throws Exception { - try { - importAgentId = Gateway.getLDAPLookup().getRoleManager().getAgentPath("system").getSysKey(); - } catch (ObjectNotFoundException e) { - Logger.error("TransferItem - System user not found!"); - throw e; - } - } - - public TransferItem(int sysKey) throws Exception { - this.sysKey = sysKey; - domainPaths = new ArrayList(); - Property name = (Property)Gateway.getStorage().get(sysKey, ClusterStorage.PROPERTY + "/Name", null); - Enumeration paths = Gateway.getLDAPLookup().search(new DomainPath(), name.getValue()); - while (paths.hasMoreElements()) { - DomainPath thisPath = (DomainPath)paths.nextElement(); - domainPaths.add(thisPath.toString()); - } - } - - public void exportItem(File dir, String path) throws Exception { - Logger.msg("Path " + path + " in " + sysKey); - String[] contents = Gateway.getStorage().getClusterContents(sysKey, path); - if (contents.length > 0) { - FileStringUtility.createNewDir(dir.getCanonicalPath()); - for (String content : contents) { - exportItem(new File(dir, content), path + "/" + content); - } - } else { //no children, try to dump object - try { - C2KLocalObject obj = Gateway.getStorage().get(sysKey, path, null); - Logger.msg("Dumping object " + path + " in " + sysKey); - File dumpPath = new File(dir.getCanonicalPath() + ".xml"); - FileStringUtility.string2File(dumpPath, CastorXMLUtility.marshall(obj)); - return; - } catch (ObjectNotFoundException ex) { - } // not an object - } - } - - public void importItem(File dir) throws Exception { - // check if already exists - try { - Property name = (Property)Gateway.getStorage().get(sysKey, ClusterStorage.PROPERTY + "/Name", null); - throw new Exception("Syskey " + sysKey + " already in use as " + name.getValue()); - } catch (Exception ex) { - } - - // retrieve objects - ArrayList objectFiles = FileStringUtility.listDir(dir.getCanonicalPath(), false, true); - ArrayList objects = new ArrayList(); - for (String element : objectFiles) { - String xmlFile = FileStringUtility.file2String(element); - C2KLocalObject newObj; - String choppedPath = element.substring(dir.getCanonicalPath().length()+1, element.length()-4); - Logger.msg(choppedPath); - if (choppedPath.startsWith(ClusterStorage.OUTCOME)) - newObj = new Outcome(choppedPath, xmlFile); - else - newObj = (C2KLocalObject)CastorXMLUtility.unmarshall(xmlFile); - - objects.add(newObj); - } - - // create item - EntityPath entityPath = new EntityPath(sysKey); - TraceableEntity newItem = (TraceableEntity)Gateway.getCorbaServer().createEntity(entityPath); - Gateway.getLDAPLookup().add(entityPath); - - PropertyArrayList props = new PropertyArrayList(); - Workflow wf = null; - // put objects - for (C2KLocalObject obj : objects) { - if (obj instanceof Property) - props.list.add((Property)obj); - else if (obj instanceof Workflow) - wf = (Workflow)obj; - } - - if (wf == null) - throw new Exception("No workflow found in import for "+sysKey); - - // init item - newItem.initialise(importAgentId, CastorXMLUtility.marshall(props), CastorXMLUtility.marshall(wf.search("workflow/domain"))); - - // store objects - importByType(ClusterStorage.COLLECTION, objects); - importByType(ClusterStorage.HISTORY, objects); - importByType(ClusterStorage.OUTCOME, objects); - importByType(ClusterStorage.VIEWPOINT, objects); - Gateway.getStorage().commit(this); - // add domPaths - for (String element : domainPaths) { - DomainPath newPath = new DomainPath(element, entityPath); - Gateway.getLDAPLookup().add(newPath); - } - } - - private void importByType(String type, ArrayList objects) throws Exception { - for (C2KLocalObject element : objects) { - if (element.getClusterType().equals(type)) - Gateway.getStorage().put(sysKey, element, this); - } - - } -} \ No newline at end of file diff --git a/source/com/c2kernel/entity/transfer/TransferSet.java b/source/com/c2kernel/entity/transfer/TransferSet.java deleted file mode 100644 index 71a593a..0000000 --- a/source/com/c2kernel/entity/transfer/TransferSet.java +++ /dev/null @@ -1,103 +0,0 @@ -package com.c2kernel.entity.transfer; - -import java.io.File; -import java.util.ArrayList; - -import com.c2kernel.lookup.EntityPath; -import com.c2kernel.lookup.NextKeyManager; -import com.c2kernel.process.Gateway; -import com.c2kernel.utils.CastorXMLUtility; -import com.c2kernel.utils.FileStringUtility; -import com.c2kernel.utils.Logger; - -/************************************************************************** - * - * $Revision: 1.5 $ - * $Date: 2005/04/26 06:48:13 $ - * - * Copyright (C) 2003 CERN - European Organization for Nuclear Research - * All rights reserved. - **************************************************************************/ - -public class TransferSet { - - public ArrayList items; - - public TransferSet() { - } - - public TransferSet(int[] sysKeys) { - items = new ArrayList(); - for (int sysKey : sysKeys) { - try { - items.add(new TransferItem(sysKey)); - } catch (Exception ex) { - Logger.error("Could not add item "+sysKey); - Logger.error(ex); - } - } - } - - public void exportPackage(File dir) throws Exception { - if (items==null || items.size() == 0) - throw new Exception("Nothing to dump"); - FileStringUtility.createNewDir(dir.getAbsolutePath()); - for (TransferItem element : items) { - try { - element.exportItem(new File(dir, String.valueOf(element.sysKey)), "/"); - } catch (Exception ex) { - Logger.error("Error dumping item "+element.sysKey); - Logger.error(ex); - } - } - - try { - String self = CastorXMLUtility.marshall(this); - FileStringUtility.string2File(new File(dir, "transferSet.xml"), self); - } catch (Exception ex) { - Logger.error("Error writing header file"); - Logger.error(ex); - } - } - - public void importPackage(File rootDir) { - for (TransferItem element : items) { - Logger.msg(5, "Importing "+element.sysKey); - try { - element.importItem(new File(rootDir, String.valueOf(element.sysKey))); - } catch (Exception ex) { - Logger.error("Import of item "+element.sysKey+" failed. Rolling back"); - Logger.error(ex); - Gateway.getStorage().abort(element); - } - } - checkLastKey(); - } - - private void checkLastKey() - { - // find highest key in out import set - int packageLastKey = 0; - for (TransferItem element : items) { - if (element.sysKey > packageLastKey) - packageLastKey = element.sysKey; - } - - try - { // find the current last key - NextKeyManager nextKeyMan = Gateway.getLDAPLookup().getNextKeyManager(); - EntityPath lastKey = nextKeyMan.getLastEntityPath(); - Logger.msg(1, "Last key imported was "+packageLastKey+". LDAP lastkey was "+lastKey.getSysKey()); - - - if (packageLastKey > lastKey.getSysKey()) { // set new last - Logger.msg(1, "Updating lastKey to "+packageLastKey); - nextKeyMan.writeLastEntityKey(packageLastKey); - } - } - catch (Exception ex) - { - Logger.error("Exception::LoadKeys::processFile() " + ex); - } - } -} -- cgit v1.2.3