From 254ee6f47eebfc00462c10756a92066e82cc1a96 Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Tue, 21 Jun 2011 15:46:02 +0200 Subject: Initial commit --- source/com/c2kernel/entity/proxy/EntityProxy.java | 246 ++++++++++++++++++++++ 1 file changed, 246 insertions(+) create mode 100755 source/com/c2kernel/entity/proxy/EntityProxy.java (limited to 'source/com/c2kernel/entity/proxy/EntityProxy.java') diff --git a/source/com/c2kernel/entity/proxy/EntityProxy.java b/source/com/c2kernel/entity/proxy/EntityProxy.java new file mode 100755 index 0000000..a5b6822 --- /dev/null +++ b/source/com/c2kernel/entity/proxy/EntityProxy.java @@ -0,0 +1,246 @@ +/************************************************************************** + * EntityProxy.java + * + * $Revision: 1.35 $ + * $Date: 2005/05/10 11:40:09 $ + * + * Copyright (C) 2001 CERN - European Organization for Nuclear Research + * All rights reserved. + **************************************************************************/ + +package com.c2kernel.entity.proxy; + +import java.util.*; + +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.entity.*; +import com.c2kernel.persistency.*; +import com.c2kernel.process.Gateway; +import com.c2kernel.property.Property; +import com.c2kernel.utils.*; + + +/****************************************************************************** +* It is a wrapper for the connection and communication with Entities. +* It can cache data loaded from the Entity to reduce communication with it. +* This cache is syncronised with corresponding Entity through an event mechanism. +* +* @version $Revision: 1.35 $ $Date: 2005/05/10 11:40:09 $ +* @author $Author: abranson $ +******************************************************************************/ + +abstract public class EntityProxy implements ManageableEntity +{ + + protected ManageableEntity mEntity = null; + protected org.omg.CORBA.Object mIOR; + protected int mSystemKey; + private HashMap mSubscriptions; + + /************************************************************************** + * + **************************************************************************/ + protected EntityProxy( org.omg.CORBA.Object ior, + int systemKey) + throws ObjectNotFoundException + { + Logger.msg(8,"EntityProxy::EntityProxy() - Initialising '" +systemKey+ "' entity"); + + initialise( ior, systemKey); + } + + /************************************************************************** + * + **************************************************************************/ + private void initialise( org.omg.CORBA.Object ior, + int systemKey) + throws ObjectNotFoundException + { + Logger.msg(8, "EntityProxy::initialise() - Initialising '" +systemKey+ "' entity"); + + mIOR = ior; + mSystemKey = systemKey; + mSubscriptions = new HashMap(); + } + + + /************************************************************************** + * + **************************************************************************/ + public ManageableEntity getEntity() throws ObjectNotFoundException + { + if (mEntity == null) { + mEntity = narrow(); + } + return mEntity; + } + + abstract public ManageableEntity narrow() throws ObjectNotFoundException; + + /************************************************************************** + * + **************************************************************************/ + //check who is using.. and if toString() is sufficient + public int getSystemKey() + { + return mSystemKey; + } + + + /************************************************************************** + * + **************************************************************************/ + public String queryData( String path ) + throws ObjectNotFoundException + { + + try { + Logger.msg(7, "EntityProxy.queryData() - "+mSystemKey+"/"+path); + if (path.endsWith("all")) { + Logger.msg(7, "EntityProxy.queryData() - listing contents"); + String[] result = Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length()-3)); + StringBuffer retString = new StringBuffer(); + for (int i = 0; i < result.length; i++) { + retString.append(result[i]); + if (i"+e.getMessage()+""; + } + } + + public String[] getContents( String path ) throws ObjectNotFoundException { + try { + return Gateway.getStorage().getClusterContents(mSystemKey, path.substring(0, path.length())); + } catch (ClusterStorageException e) { + throw new ObjectNotFoundException(e.toString()); + } + } + + + /************************************************************************** + * + **************************************************************************/ + public C2KLocalObject getObject( String xpath ) + throws ObjectNotFoundException + { + // load from storage, falling back to proxy loader if not found in others + try + { + return Gateway.getStorage().get( mSystemKey, xpath , this); + } + catch( ClusterStorageException ex ) + { + Logger.msg(4, "Exception loading object :"+mSystemKey+"/"+xpath); + throw new ObjectNotFoundException( ex.toString() ); + } + } + + + + public String getProperty( String name ) + throws ObjectNotFoundException + { + Logger.msg(5, "Get property "+name+" from syskey/"+mSystemKey); + Property prop = (Property)getObject("Property/"+name); + try + { + return prop.getValue(); + } + catch (NullPointerException ex) + { + throw new ObjectNotFoundException(); + } + } + + public String getName() + { + try { + return getProperty("Name"); + } catch (ObjectNotFoundException ex) { + return null; + } + } + + + /************************************************************************** + * Subscription methods + **************************************************************************/ + + public void subscribe (EntityProxyObserver observer, + String interest, + boolean preload) + { + MemberSubscription newSub = new MemberSubscription(this, interest, observer, preload); + synchronized (this){ + mSubscriptions.put( newSub, observer ); + } + new Thread(newSub).start(); + Logger.msg(7, "Subscribed "+observer.getClass().getName()+" for "+interest); + } + + + public void unsubscribe(EntityProxyObserver observer) + { + synchronized (this){ + for (Iterator e = mSubscriptions.keySet().iterator(); e.hasNext();) { + MemberSubscription thisSub = (MemberSubscription)e.next(); + if (mSubscriptions.get( thisSub ) == observer) { + e.remove(); + Logger.msg(7, "Unsubscribed "+observer.getClass().getName()); + } + } + } + } + + public void dumpSubscriptions(int logLevel) { + if (mSubscriptions.size() == 0) return; + Logger.msg(logLevel, "Subscriptions to proxy "+mSystemKey+":"); + synchronized(this) { + for (Iterator iter = mSubscriptions.keySet().iterator(); iter.hasNext();) { + MemberSubscription element = (MemberSubscription)iter.next(); + EntityProxyObserver obs = element.getObserver(); + if (obs != null) + Logger.msg(logLevel, " "+element.getObserver().getClass().getName()+" subscribed to "+element.interest); + else + Logger.msg(logLevel, " Phantom subscription to "+element.interest); + } + } + } + + public void notify(ProxyMessage message) { + Logger.msg(4, "EntityProxy.notify() - Received change notification for "+message.getPath()+" on "+mSystemKey); + synchronized (this){ + if (!message.getServer().equals(EntityProxyManager.serverName)) + Gateway.getStorage().clearCache(mSystemKey, message.getPath()); + for (Iterator e = mSubscriptions.keySet().iterator(); e.hasNext();) { + MemberSubscription newSub = (MemberSubscription)e.next(); + if (newSub.getObserver() == null) { // phantom + Logger.msg(4, "Removing phantom subscription to "+newSub.interest); + e.remove(); + } + else + newSub.update(message.getPath(), message.getState()); + } + } + } + + /** + * If this is reaped, clear out the cache for it too. + */ + protected void finalize() throws Throwable { + Logger.msg(7, "Proxy "+mSystemKey+" reaped"); + Gateway.getStorage().clearCache(mSystemKey, null); + Gateway.getProxyManager().removeProxy(mSystemKey); + super.finalize(); + } + +} -- cgit v1.2.3