package com.c2kernel.entity.proxy; import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Iterator; import java.util.StringTokenizer; import com.c2kernel.common.ObjectNotFoundException; import com.c2kernel.entity.C2KLocalObject; import com.c2kernel.utils.Logger; public class MemberSubscription implements Runnable { EntityProxy subject; String interest; // keep the subscriber by weak reference, so it is not kept from the garbage collector if no longer used WeakReference observerReference; ArrayList contents = new ArrayList(); boolean preLoad; public MemberSubscription(EntityProxy subject, String interest, EntityProxyObserver observer, boolean preLoad) { setObserver(observer); this.interest = interest; this.subject = subject; this.preLoad = preLoad; } public void run() { Thread.currentThread().setName("Member Subscription: "+subject.getSystemKey()+":"+interest); if (preLoad) loadChildren(); } private void loadChildren() { C2KLocalObject newMember; EntityProxyObserver observer = getObserver(); if (observer == null) return; //reaped try { // fetch contents of path String children = subject.queryData(interest+"/all"); StringTokenizer tok = new StringTokenizer(children, ","); ArrayList newContents = new ArrayList(); while (tok.hasMoreTokens()) newContents.add(tok.nextToken()); // look to see what's new for (Iterator iter = newContents.iterator(); iter.hasNext();) { String newChild = (String)iter.next(); // load child object try { newMember = subject.getObject(interest+"/"+newChild); contents.remove(newChild); } catch (ObjectNotFoundException ex) { newMember = new MemberControl(MemberControl.ERROR, "Listed member "+newChild+" was not found."); } try { observer.add(newMember); } catch (Throwable ex) { Logger.error("Error publishing member to "+observer); Logger.error(ex); } } // report what's left in old contents as deleted for (Iterator iter = contents.iterator(); iter.hasNext();) { String oldChild = (String)iter.next(); observer.remove(interest+"/"+oldChild); } //replace contents arraylist contents = newContents; //report that we're done observer.add(MemberControl.theEND); } catch (Exception ex) { observer.add(new MemberControl(MemberControl.ERROR, "Query on "+interest+" failed with "+ex)); } } public boolean isRelevant(String path) { Logger.msg(7, "Checking relevance of "+path+" to "+interest); return (path.startsWith(interest)); } public void update(String path, boolean deleted) { EntityProxyObserver observer = getObserver(); if (observer == null) return; //reaped Logger.msg(7, "Processing proxy message path "+path +" for "+observer+". Interest: "+interest+" Was Deleted:"+deleted); if (!path.startsWith(interest)) // doesn't concern us return; if (path.equals(interest)) // refresh contents loadChildren(); else { String name = path.substring(interest.length()); if (deleted) { Logger.msg(4, "Removing "+path); contents.remove(name); observer.remove(name); } else { try { C2KLocalObject newMember = subject.getObject(path); Logger.msg(4, "Adding "+path); contents.add(name); observer.add(newMember); } catch (ObjectNotFoundException e) { Logger.error("Member Subscription: could not load "+path); Logger.error(e); } } } } public void setObserver(EntityProxyObserver observer) { observerReference = new WeakReference(observer); } public EntityProxyObserver getObserver() { EntityProxyObserver observer = (EntityProxyObserver)observerReference.get(); return observer; } }