/************************************************************************** * ItemProxy.java * * $Revision: 1.25 $ * $Date: 2005/05/10 11:40:09 $ * * Copyright (C) 2001 CERN - European Organization for Nuclear Research * All rights reserved. **************************************************************************/ package com.c2kernel.entity.proxy; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import org.exolab.castor.mapping.MappingException; import org.exolab.castor.xml.MarshalException; import org.exolab.castor.xml.ValidationException; import com.c2kernel.collection.Collection; import com.c2kernel.collection.CollectionArrayList; import com.c2kernel.common.AccessRightsException; import com.c2kernel.common.InvalidDataException; import com.c2kernel.common.InvalidTransitionException; import com.c2kernel.common.ObjectAlreadyExistsException; import com.c2kernel.common.ObjectNotFoundException; import com.c2kernel.common.PersistencyException; import com.c2kernel.entity.C2KLocalObject; import com.c2kernel.entity.Item; import com.c2kernel.entity.ItemHelper; import com.c2kernel.entity.agent.Job; import com.c2kernel.entity.agent.JobArrayList; import com.c2kernel.lifecycle.instance.CompositeActivity; import com.c2kernel.lifecycle.instance.Workflow; import com.c2kernel.lookup.InvalidItemPathException; import com.c2kernel.lookup.ItemPath; import com.c2kernel.lookup.Path; import com.c2kernel.persistency.ClusterStorage; import com.c2kernel.persistency.ClusterStorageException; import com.c2kernel.persistency.outcome.Viewpoint; import com.c2kernel.process.Gateway; import com.c2kernel.property.Property; import com.c2kernel.property.PropertyArrayList; import com.c2kernel.utils.CastorXMLUtility; import com.c2kernel.utils.Logger; /****************************************************************************** * It is a wrapper for the connection and communication with Item * It caches data loaded from the Item to reduce communication * * @version $Revision: 1.25 $ $Date: 2005/05/10 11:40:09 $ * @author $Author: abranson $ ******************************************************************************/ public class ItemProxy { protected Item mItem = null; protected org.omg.CORBA.Object mIOR; protected int mSystemKey; protected Path mPath; private final HashMap, ProxyObserver> mSubscriptions; /************************************************************************** * **************************************************************************/ protected ItemProxy( org.omg.CORBA.Object ior, int systemKey) throws ObjectNotFoundException { Logger.msg(8, "ItemProxy::initialise() - Initialising entity " +systemKey); mIOR = ior; mSystemKey = systemKey; mSubscriptions = new HashMap, ProxyObserver>(); try { mPath = new ItemPath(systemKey); } catch (InvalidItemPathException e) { throw new ObjectNotFoundException(); } } public int getSystemKey() { return mSystemKey; } public Path getPath() { return mPath; } protected Item getItem() throws ObjectNotFoundException { if (mItem == null) mItem = narrow(); return mItem; } public Item narrow() throws ObjectNotFoundException { try { return ItemHelper.narrow(mIOR); } catch (org.omg.CORBA.BAD_PARAM ex) { } throw new ObjectNotFoundException("CORBA Object was not an Item, or the server is down."); } /** * @throws MappingException * @throws IOException * @throws ValidationException * @throws MarshalException ************************************************************************ * * **************************************************************************/ public void initialise( int agentId, PropertyArrayList itemProps, CompositeActivity workflow, CollectionArrayList colls ) throws AccessRightsException, InvalidDataException, PersistencyException, ObjectNotFoundException, MarshalException, ValidationException, IOException, MappingException { Logger.msg(7, "ItemProxy::initialise - started"); CastorXMLUtility xml = Gateway.getMarshaller(); if (itemProps == null) throw new InvalidDataException("No initial properties supplied"); String propString = xml.marshall(itemProps); String wfString = ""; if (workflow != null) wfString = xml.marshall(workflow); String collString = ""; if (colls != null) collString = xml.marshall(colls); getItem().initialise( agentId, propString, wfString, collString); } public void setProperty(AgentProxy agent, String name, String value) throws AccessRightsException, PersistencyException, InvalidDataException { String[] params = new String[2]; params[0] = name; params[1] = value; try { agent.execute(this, "WriteProperty", params); } catch (AccessRightsException e) { throw (e); } catch (PersistencyException e) { throw (e); } catch (InvalidDataException e) { throw (e); } catch (Exception e) { Logger.error(e); throw new PersistencyException("Could not store property", ""); } } /************************************************************************** * **************************************************************************/ public String requestAction( Job thisJob ) throws AccessRightsException, InvalidTransitionException, ObjectNotFoundException, InvalidDataException, PersistencyException, ObjectAlreadyExistsException { String outcome = thisJob.getOutcomeString(); // check fields that should have been filled in if (outcome==null) if (thisJob.isOutcomeRequired()) throw new InvalidDataException("Outcome is required.", ""); else outcome=""; if (thisJob.getAgentId() == -1) throw new InvalidDataException("No Agent specified.", ""); Logger.msg(7, "ItemProxy - executing "+thisJob.getStepPath()+" for "+thisJob.getAgentName()); return getItem().requestAction (thisJob.getAgentId(), thisJob.getStepPath(), thisJob.getTransition().getId(), outcome); } /************************************************************************** * **************************************************************************/ private ArrayList getJobList(int agentId, boolean filter) throws AccessRightsException, ObjectNotFoundException, PersistencyException { JobArrayList thisJobList; try { String jobs = getItem().queryLifeCycle(agentId, filter); thisJobList = (JobArrayList)Gateway.getMarshaller().unmarshall(jobs); } catch (Exception e) { Logger.error(e); throw new PersistencyException("Exception::ItemProxy::getJobList() - Cannot unmarshall the jobs", null); } return thisJobList.list; } public ArrayList getJobList(AgentProxy agent) throws AccessRightsException, ObjectNotFoundException, PersistencyException { return getJobList(agent.getSystemKey()); } private ArrayList getJobList(int agentId) throws AccessRightsException, ObjectNotFoundException, PersistencyException { return getJobList(agentId, true); } private Job getJobByName(String actName, int agentId) throws AccessRightsException, ObjectNotFoundException, PersistencyException { ArrayList jobList = getJobList(agentId); for (Job job : jobList) { if (job.getStepName().equals(actName) && job.hasOutcome()) return job; } return null; } public Collection getCollection(String collName) throws ObjectNotFoundException { return (Collection)getObject(ClusterStorage.COLLECTION+"/"+collName); } public Workflow getWorkflow() throws ObjectNotFoundException { return (Workflow)getObject(ClusterStorage.LIFECYCLE+"/workflow"); } public Viewpoint getViewpoint(String schemaName, String viewName) throws ObjectNotFoundException { return (Viewpoint)getObject(ClusterStorage.VIEWPOINT+"/"+schemaName+"/"+viewName); } public Job getJobByName(String actName, AgentProxy agent) throws AccessRightsException, ObjectNotFoundException, PersistencyException { return getJobByName(actName, agent.getSystemKey()); } /** * If this is reaped, clear out the cache for it too. */ @Override protected void finalize() throws Throwable { Logger.msg(7, "Proxy "+mSystemKey+" reaped"); Gateway.getStorage().clearCache(mSystemKey, null); Gateway.getProxyManager().removeProxy(mSystemKey); super.finalize(); } /************************************************************************** * **************************************************************************/ public String queryData( String path ) throws ObjectNotFoundException { try { Logger.msg(7, "EntityProxy.queryData() - "+mSystemKey+"/"+path); if (path.endsWith("all")) { Logger.msg(7, "EntityProxy.queryData() - listing contents"); String[] result = Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length()-3)); StringBuffer retString = new StringBuffer(); for (int i = 0; i < result.length; i++) { retString.append(result[i]); if (i"+e.getMessage()+""; } } public String[] getContents( String path ) throws ObjectNotFoundException { try { return Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length())); } catch (ClusterStorageException e) { throw new ObjectNotFoundException(e.toString()); } } /************************************************************************** * **************************************************************************/ public C2KLocalObject getObject( String xpath ) throws ObjectNotFoundException { // load from storage, falling back to proxy loader if not found in others try { return Gateway.getStorage().get( mSystemKey, xpath , null); } catch( ClusterStorageException ex ) { Logger.msg(4, "Exception loading object :"+mSystemKey+"/"+xpath); throw new ObjectNotFoundException( ex.toString() ); } } public String getProperty( String name ) throws ObjectNotFoundException { Logger.msg(5, "Get property "+name+" from syskey/"+mSystemKey); Property prop = (Property)getObject("Property/"+name); try { return prop.getValue(); } catch (NullPointerException ex) { throw new ObjectNotFoundException(); } } public String getName() { try { return getProperty("Name"); } catch (ObjectNotFoundException ex) { return null; } } /************************************************************************** * Subscription methods **************************************************************************/ public void subscribe(MemberSubscription newSub) { newSub.setSubject(this); synchronized (this){ mSubscriptions.put( newSub, newSub.getObserver() ); } new Thread(newSub).start(); Logger.msg(7, "Subscribed "+newSub.getObserver().getClass().getName()+" for "+newSub.interest); } public void unsubscribe(ProxyObserver observer) { synchronized (this){ for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { MemberSubscription thisSub = e.next(); if (mSubscriptions.get( thisSub ) == observer) { e.remove(); Logger.msg(7, "Unsubscribed "+observer.getClass().getName()); } } } } public void dumpSubscriptions(int logLevel) { if (mSubscriptions.size() == 0) return; Logger.msg(logLevel, "Subscriptions to proxy "+mSystemKey+":"); synchronized(this) { for (MemberSubscription element : mSubscriptions.keySet()) { ProxyObserver obs = element.getObserver(); if (obs != null) Logger.msg(logLevel, " "+element.getObserver().getClass().getName()+" subscribed to "+element.interest); else Logger.msg(logLevel, " Phantom subscription to "+element.interest); } } } public void notify(ProxyMessage message) { Logger.msg(4, "EntityProxy.notify() - Received change notification for "+message.getPath()+" on "+mSystemKey); synchronized (this){ if (Gateway.getProxyServer()== null || !message.getServer().equals(Gateway.getProxyServer().getServerName())) Gateway.getStorage().clearCache(mSystemKey, message.getPath()); for (Iterator> e = mSubscriptions.keySet().iterator(); e.hasNext();) { MemberSubscription newSub = e.next(); if (newSub.getObserver() == null) { // phantom Logger.msg(4, "Removing phantom subscription to "+newSub.interest); e.remove(); } else newSub.update(message.getPath(), message.getState()); } } } }