summaryrefslogtreecommitdiff
path: root/source/com/c2kernel/process/UserCodeProcess.java
diff options
context:
space:
mode:
Diffstat (limited to 'source/com/c2kernel/process/UserCodeProcess.java')
-rw-r--r--source/com/c2kernel/process/UserCodeProcess.java71
1 files changed, 38 insertions, 33 deletions
diff --git a/source/com/c2kernel/process/UserCodeProcess.java b/source/com/c2kernel/process/UserCodeProcess.java
index 18fed06..9c2deab 100644
--- a/source/com/c2kernel/process/UserCodeProcess.java
+++ b/source/com/c2kernel/process/UserCodeProcess.java
@@ -4,7 +4,6 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import com.c2kernel.common.InvalidDataException;
import com.c2kernel.common.InvalidTransitionException;
@@ -12,7 +11,7 @@ import com.c2kernel.entity.C2KLocalObject;
import com.c2kernel.entity.agent.Job;
import com.c2kernel.entity.proxy.AgentProxy;
import com.c2kernel.entity.proxy.EntityProxyObserver;
-import com.c2kernel.entity.proxy.MemberControl;
+import com.c2kernel.entity.proxy.MemberSubscription;
import com.c2kernel.lifecycle.instance.stateMachine.Transitions;
import com.c2kernel.persistency.ClusterStorage;
import com.c2kernel.utils.Logger;
@@ -25,12 +24,12 @@ import com.c2kernel.utils.Logger;
* Copyright (C) 2003 CERN - European Organization for Nuclear Research
* All rights reserved.
**************************************************************************/
-public class UserCodeProcess extends StandardClient implements EntityProxyObserver, Runnable {
+public class UserCodeProcess extends StandardClient implements EntityProxyObserver<Job>, Runnable {
protected AgentProxy agent;
static boolean active = true;
ArrayList<String> ignoredPaths = new ArrayList<String>();
HashMap<String, C2KLocalObject> jobs;
-
+
public UserCodeProcess(String agentName, String agentPass) {
// login - try for a while in case server hasn't imported our user yet
for (int i=1;i<6;i++) {
@@ -48,12 +47,13 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv
}
System.out.println(getDesc()+" initialised for " + agentName);
}
-
- public void run() {
+
+ @Override
+ public void run() {
Thread.currentThread().setName("Usercode Process");
jobs = new HashMap<String, C2KLocalObject>();
// subscribe to job list
- agent.subscribe(this, ClusterStorage.JOB, true);
+ agent.subscribe(new MemberSubscription<Job>(this, ClusterStorage.JOB, true));
while (active) {
Job thisJob = null;
synchronized (jobs) {
@@ -62,10 +62,10 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv
if (thisJob == null)
thisJob = getJob(jobs, Transitions.START);
if (thisJob == null)
- thisJob = getJob(jobs, Transitions.SUSPEND);
+ thisJob = getJob(jobs, Transitions.SUSPEND);
if (thisJob == null)
thisJob = getJob(jobs, Transitions.RESUME);
-
+
if (thisJob == null) {
Logger.error("No supported jobs, but joblist is not empty! Discarding remaining jobs");
jobs.clear();
@@ -74,7 +74,7 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv
jobs.remove(ClusterStorage.getPath(thisJob));
}
}
-
+
if (thisJob != null) {
String jobKey = thisJob.getItemSysKey()+":"+thisJob.getStepPath();
try {
@@ -82,8 +82,8 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv
Logger.msg(5, "Testing start conditions");
boolean start = assessStartConditions(thisJob);
if (start) {
- Logger.msg(5, "Attempting to start");
- agent.execute(thisJob);
+ Logger.msg(5, "Attempting to start");
+ agent.execute(thisJob);
}
else {
Logger.msg(5, "Start conditions failed "+thisJob.getStepName()+" in "+thisJob.getItemSysKey());
@@ -103,7 +103,7 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv
if (!ignoredPaths.contains(jobKey))
agent.execute(thisJob);
}
- } catch (InvalidTransitionException ex) {
+ } catch (InvalidTransitionException ex) {
// must have already been done by someone else - ignore
} catch (Throwable ex) {
Logger.error("Error executing "+Transitions.getTransitionName(thisJob.getPossibleTransition())+" job:");
@@ -121,7 +121,7 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv
}
} catch (InterruptedException ex) { }
}
-
+
// shut down
try
{
@@ -132,10 +132,10 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv
Logger.error(ex);
}
}
-
- private Job getJob(HashMap jobs, int transition) {
- for (Iterator list = jobs.values().iterator();list.hasNext();) {
- Job thisJob = (Job)list.next();
+
+ private static Job getJob(HashMap<String, C2KLocalObject> jobs, int transition) {
+ for (C2KLocalObject c2kLocalObject : jobs.values()) {
+ Job thisJob = (Job)c2kLocalObject;
if (thisJob.getPossibleTransition() == transition) {
Logger.msg(1,"=================================================================");
Logger.msg(1, "Got "+Transitions.getTransitionName(transition)+" job for "+thisJob.getStepName()+" in "+thisJob.getItemSysKey());
@@ -144,27 +144,23 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv
}
return null;
}
-
+
public boolean assessStartConditions(Job job) {
- // default implementation - has no start conditions.
+ // default implementation - has no start conditions.
return true;
}
-
+
public void runUCLogic(Job job) throws Exception {
// default implementation - the agent will execute any scripts defined when we execute
agent.execute(job);
}
-
-
+
+
/**
* Receives job from the AgentProxy. Reactivates thread if sleeping.
*/
- public void add(C2KLocalObject contents) {
- if (contents instanceof MemberControl) {
- MemberControl ctrl = (MemberControl)contents;
- Logger.msg(5,ctrl.toString());
- }
- else
+ @Override
+ public void add(Job contents) {
synchronized(jobs) {
Logger.msg(7, "Adding "+ClusterStorage.getPath(contents));
jobs.put(ClusterStorage.getPath(contents), contents);
@@ -172,11 +168,18 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv
}
}
+
+ @Override
+ public void control(String control, String msg) {
+ if (control == MemberSubscription.ERROR)
+ Logger.error("Error in job subscription: "+msg);
+ }
/**
* Removes job removal notification from the AgentProxy.
*/
- public void remove(String id) {
+ @Override
+ public void remove(String id) {
synchronized(jobs) {
Logger.msg(7, "Deleting "+id);
jobs.remove(id);
@@ -186,7 +189,7 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv
public static UserCodeProcess getInstance() throws UnknownHostException {
return new UserCodeProcess(InetAddress.getLocalHost().getHostName(), "uc");
}
-
+
static public void main(String[] args)
{
int status = 0;
@@ -197,7 +200,8 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv
UserCodeProcess proc = getInstance();
new Thread(proc).start();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- public void run() {
+ @Override
+ public void run() {
shutdown();
}
}));
@@ -218,7 +222,7 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv
System.exit(status);
}
}
-
+
public String getDesc() {
return("Usercode Process");
}
@@ -226,4 +230,5 @@ public class UserCodeProcess extends StandardClient implements EntityProxyObserv
public static void shutdown() {
active = false;
}
+
}