summaryrefslogtreecommitdiff
path: root/source/com/c2kernel/process/UserCodeProcess.java
diff options
context:
space:
mode:
Diffstat (limited to 'source/com/c2kernel/process/UserCodeProcess.java')
-rwxr-xr-xsource/com/c2kernel/process/UserCodeProcess.java229
1 files changed, 229 insertions, 0 deletions
diff --git a/source/com/c2kernel/process/UserCodeProcess.java b/source/com/c2kernel/process/UserCodeProcess.java
new file mode 100755
index 0000000..7eff37b
--- /dev/null
+++ b/source/com/c2kernel/process/UserCodeProcess.java
@@ -0,0 +1,229 @@
+package com.c2kernel.process;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import com.c2kernel.common.InvalidDataException;
+import com.c2kernel.common.InvalidTransitionException;
+import com.c2kernel.entity.C2KLocalObject;
+import com.c2kernel.entity.agent.Job;
+import com.c2kernel.entity.proxy.AgentProxy;
+import com.c2kernel.entity.proxy.EntityProxyObserver;
+import com.c2kernel.entity.proxy.MemberControl;
+import com.c2kernel.lifecycle.instance.stateMachine.Transitions;
+import com.c2kernel.persistency.ClusterStorage;
+import com.c2kernel.utils.Logger;
+
+/**************************************************************************
+ *
+ * $Revision: 1.31 $
+ * $Date: 2004/10/21 08:02:19 $
+ *
+ * Copyright (C) 2003 CERN - European Organization for Nuclear Research
+ * All rights reserved.
+ **************************************************************************/
+public class UserCodeProcess extends StandardClient implements EntityProxyObserver, Runnable {
+ protected AgentProxy agent;
+ static boolean active = true;
+ ArrayList ignoredPaths = new ArrayList();
+ HashMap jobs;
+
+ public UserCodeProcess(String agentName, String agentPass) {
+ // login - try for a while in case server hasn't imported our user yet
+ for (int i=1;i<6;i++) {
+ try {
+ Logger.msg("Login attempt "+i+" of 5");
+ agent = Gateway.connect(agentName, agentPass);
+ break;
+ } catch (InvalidDataException ex) {
+ Logger.error("Could not log in.");
+ Logger.error(ex);
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException ex2) { }
+ }
+ }
+ System.out.println(getDesc()+" initialised for " + agentName);
+ }
+
+ public void run() {
+ Thread.currentThread().setName("Usercode Process");
+ jobs = new HashMap();
+ // subscribe to job list
+ agent.subscribe(this, ClusterStorage.JOB, true);
+ while (active) {
+ Job thisJob = null;
+ synchronized (jobs) {
+ if (jobs.size() > 0) {
+ thisJob = getJob(jobs, Transitions.COMPLETE);
+ if (thisJob == null)
+ thisJob = getJob(jobs, Transitions.START);
+ if (thisJob == null)
+ thisJob = getJob(jobs, Transitions.SUSPEND);
+ if (thisJob == null)
+ thisJob = getJob(jobs, Transitions.RESUME);
+
+ if (thisJob == null) {
+ Logger.error("No supported jobs, but joblist is not empty! Discarding remaining jobs");
+ jobs.clear();
+ }
+ else
+ jobs.remove(ClusterStorage.getPath(thisJob));
+ }
+ }
+
+ if (thisJob != null) {
+ String jobKey = thisJob.getItemSysKey()+":"+thisJob.getStepPath();
+ try {
+ if (thisJob.getPossibleTransition()==Transitions.START) {
+ Logger.msg(5, "Testing start conditions");
+ boolean start = assessStartConditions(thisJob);
+ if (start) {
+ Logger.msg(5, "Attempting to start");
+ agent.execute(thisJob);
+ }
+ else {
+ Logger.msg(5, "Start conditions failed "+thisJob.getStepName()+" in "+thisJob.getItemSysKey());
+ }
+ }
+ else if (thisJob.getPossibleTransition()==Transitions.COMPLETE) {
+ Logger.msg(5, "Executing logic");
+ runUCLogic(thisJob);
+ if (ignoredPaths.contains(jobKey))
+ ignoredPaths.remove(jobKey);
+ }
+ else if (thisJob.getPossibleTransition()==Transitions.SUSPEND) {
+ if (ignoredPaths.contains(jobKey))
+ agent.execute(thisJob);
+ }
+ else if (thisJob.getPossibleTransition()==Transitions.RESUME) {
+ if (!ignoredPaths.contains(jobKey))
+ agent.execute(thisJob);
+ }
+ } catch (InvalidTransitionException ex) {
+ // must have already been done by someone else - ignore
+ } catch (Throwable ex) {
+ Logger.error("Error executing "+Transitions.getTransitionName(thisJob.getPossibleTransition())+" job:");
+ Logger.error(ex);
+ ignoredPaths.add(jobKey);
+ }
+ }
+ try {
+ synchronized (jobs) {
+ if (jobs.size() == 0) {
+ Logger.msg("Sleeping");
+ while (active && jobs.size() == 0)
+ jobs.wait(2000);
+ }
+ }
+ } catch (InterruptedException ex) { }
+ }
+
+ // shut down
+ try
+ {
+ standardTearDown();
+ }
+ catch( Exception ex )
+ {
+ Logger.error(ex);
+ }
+ }
+
+ private Job getJob(HashMap jobs, int transition) {
+ for (Iterator list = jobs.values().iterator();list.hasNext();) {
+ Job thisJob = (Job)list.next();
+ if (thisJob.getPossibleTransition() == transition) {
+ Logger.msg(1,"=================================================================");
+ Logger.msg(1, "Got "+Transitions.getTransitionName(transition)+" job for "+thisJob.getStepName()+" in "+thisJob.getItemSysKey());
+ return thisJob;
+ }
+ }
+ return null;
+ }
+
+ public boolean assessStartConditions(Job job) {
+ // default implementation - has no start conditions.
+ return true;
+ }
+
+ public void runUCLogic(Job job) throws Exception {
+ // default implementation - the agent will execute any scripts defined when we execute
+ agent.execute(job);
+ }
+
+
+ /**
+ * Receives job from the AgentProxy. Reactivates thread if sleeping.
+ */
+ public void add(C2KLocalObject contents) {
+ if (contents instanceof MemberControl) {
+ MemberControl ctrl = (MemberControl)contents;
+ Logger.msg(5,ctrl.toString());
+ }
+ else
+ synchronized(jobs) {
+ Logger.msg(7, "Adding "+ClusterStorage.getPath(contents));
+ jobs.put(ClusterStorage.getPath(contents), contents);
+ jobs.notify();
+ }
+
+ }
+
+ /**
+ * Removes job removal notification from the AgentProxy.
+ */
+ public void remove(String id) {
+ synchronized(jobs) {
+ Logger.msg(7, "Deleting "+id);
+ jobs.remove(id);
+ }
+ }
+
+ public static UserCodeProcess getInstance() throws UnknownHostException {
+ return new UserCodeProcess(InetAddress.getLocalHost().getHostName(), "uc");
+ }
+
+ static public void main(String[] args)
+ {
+ int status = 0;
+
+ try
+ {
+ standardSetUp(args);
+ UserCodeProcess proc = getInstance();
+ new Thread(proc).start();
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ public void run() {
+ shutdown();
+ }
+ }));
+ }
+ catch( Exception ex )
+ {
+ Logger.error(ex);
+
+ try
+ {
+ standardTearDown();
+ }
+ catch(Exception ex1)
+ {
+ Logger.error(ex1);
+ }
+ status = 1;
+ System.exit(status);
+ }
+ }
+
+ public String getDesc() {
+ return("Usercode Process");
+ }
+
+ public static void shutdown() {
+ active = false;
+ }
+}