From 254ee6f47eebfc00462c10756a92066e82cc1a96 Mon Sep 17 00:00:00 2001 From: Andrew Branson Date: Tue, 21 Jun 2011 15:46:02 +0200 Subject: Initial commit --- .../c2kernel/entity/proxy/MemberSubscription.java | 121 +++++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100755 source/com/c2kernel/entity/proxy/MemberSubscription.java (limited to 'source/com/c2kernel/entity/proxy/MemberSubscription.java') diff --git a/source/com/c2kernel/entity/proxy/MemberSubscription.java b/source/com/c2kernel/entity/proxy/MemberSubscription.java new file mode 100755 index 0000000..fdd3e96 --- /dev/null +++ b/source/com/c2kernel/entity/proxy/MemberSubscription.java @@ -0,0 +1,121 @@ + +package com.c2kernel.entity.proxy; +import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.StringTokenizer; + +import com.c2kernel.common.ObjectNotFoundException; +import com.c2kernel.entity.C2KLocalObject; +import com.c2kernel.utils.Logger; + +public class MemberSubscription implements Runnable { + EntityProxy subject; + String interest; + // keep the subscriber by weak reference, so it is not kept from the garbage collector if no longer used + WeakReference observerReference; + ArrayList contents = new ArrayList(); + boolean preLoad; + + public MemberSubscription(EntityProxy subject, String interest, + EntityProxyObserver observer, boolean preLoad) { + setObserver(observer); + this.interest = interest; + this.subject = subject; + this.preLoad = preLoad; + } + + public void run() { + Thread.currentThread().setName("Member Subscription: "+subject.getSystemKey()+":"+interest); + if (preLoad) loadChildren(); + } + + private void loadChildren() { + C2KLocalObject newMember; + EntityProxyObserver observer = getObserver(); + if (observer == null) return; //reaped + try { + // fetch contents of path + String children = subject.queryData(interest+"/all"); + StringTokenizer tok = new StringTokenizer(children, ","); + ArrayList newContents = new ArrayList(); + while (tok.hasMoreTokens()) + newContents.add(tok.nextToken()); + + // look to see what's new + for (Iterator iter = newContents.iterator(); iter.hasNext();) { + String newChild = (String)iter.next(); + + // load child object + try { + newMember = subject.getObject(interest+"/"+newChild); + contents.remove(newChild); + } catch (ObjectNotFoundException ex) { + newMember = new MemberControl(MemberControl.ERROR, "Listed member "+newChild+" was not found."); + } + try { + observer.add(newMember); + } catch (Throwable ex) { + Logger.error("Error publishing member to "+observer); + Logger.error(ex); + } + } + // report what's left in old contents as deleted + for (Iterator iter = contents.iterator(); iter.hasNext();) { + String oldChild = (String)iter.next(); + observer.remove(interest+"/"+oldChild); + } + //replace contents arraylist + contents = newContents; + //report that we're done + observer.add(MemberControl.theEND); + } catch (Exception ex) { + observer.add(new MemberControl(MemberControl.ERROR, "Query on "+interest+" failed with "+ex)); + } + } + + public boolean isRelevant(String path) { + Logger.msg(7, "Checking relevance of "+path+" to "+interest); + return (path.startsWith(interest)); + } + + public void update(String path, boolean deleted) { + EntityProxyObserver observer = getObserver(); + if (observer == null) return; //reaped + Logger.msg(7, "Processing proxy message path "+path +" for "+observer+". Interest: "+interest+" Was Deleted:"+deleted); + if (!path.startsWith(interest)) // doesn't concern us + return; + + if (path.equals(interest)) // refresh contents + loadChildren(); + else { + String name = path.substring(interest.length()); + if (deleted) { + Logger.msg(4, "Removing "+path); + contents.remove(name); + observer.remove(name); + } + else { + try { + C2KLocalObject newMember = subject.getObject(path); + Logger.msg(4, "Adding "+path); + contents.add(name); + observer.add(newMember); + } catch (ObjectNotFoundException e) { + Logger.error("Member Subscription: could not load "+path); + Logger.error(e); + } + } + } + } + + public void setObserver(EntityProxyObserver observer) { + observerReference = new WeakReference(observer); + } + + public EntityProxyObserver getObserver() { + EntityProxyObserver observer = (EntityProxyObserver)observerReference.get(); + return observer; + } +} + -- cgit v1.2.3