add static_configurations annotation to modules.

This commit is contained in:
zhaowei8188127 2013-06-23 16:06:45 +08:00
parent 67ad8ea980
commit c34b24e239
15 changed files with 294 additions and 157 deletions

View File

@ -0,0 +1,43 @@
package haflow.engine.model;
import haflow.utility.ClusterConfiguration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.springframework.beans.factory.annotation.Autowired;
public class GlobalConfiguration {
private Properties properties;
public static final String JOB_TRACKER = "job-tracker";
public static final String NAME_NODE = "name-node";
public static final String MAPRED_JOB_QUEUE_NAME = "mapred.job.queue.name";
public static final String OOZIE_USER_SYSTEM_LIBPATH = "oozie.use.system.libpath";
private Map<String, String> map;
@Autowired
public GlobalConfiguration(ClusterConfiguration cc){
this.map = new HashMap<String, String>();
map.put(JOB_TRACKER, cc.getProperty(JOB_TRACKER));
map.put(NAME_NODE, cc.getProperty(NAME_NODE));
map.put(MAPRED_JOB_QUEUE_NAME, cc.getProperty(MAPRED_JOB_QUEUE_NAME));
map.put(OOZIE_USER_SYSTEM_LIBPATH, cc.getProperty(OOZIE_USER_SYSTEM_LIBPATH));
properties = new Properties();
properties.put(JOB_TRACKER, cc.getProperty(JOB_TRACKER));
properties.put(NAME_NODE, cc.getProperty(NAME_NODE));
properties.put(MAPRED_JOB_QUEUE_NAME, cc.getProperty(MAPRED_JOB_QUEUE_NAME));
properties.put(OOZIE_USER_SYSTEM_LIBPATH, cc.getProperty(OOZIE_USER_SYSTEM_LIBPATH));
}
public String getConf(String key){
return this.properties.getProperty(key);
}
public Map<String, String> getProperties(){
return this.map;
}
}

View File

@ -1,10 +1,11 @@
package haflow.engine.model;
import haflow.dto.entity.Edge;
import haflow.dto.entity.Node;
public class Path {
// private Edge edge;
private Edge edge;
private String sourceEndPoint;
@ -21,6 +22,7 @@ public class Path {
// }
public Path(Edge edge){
this.edge = edge;
this.sourceEndPoint = edge.getSourceEndpoint();
this.targetEndPoint = edge.getTargetEndpoint();
}
@ -45,4 +47,11 @@ public class Path {
// return targetAction;
// }
// public Edge getEdge(){
// return this.edge;
// }
//
public Node getTargetNode(){
return this.edge.getTargetNode();
}
}

View File

@ -11,27 +11,47 @@ import org.w3c.dom.Document;
public class JavaModuleGenerator extends OozieXmlGenerator {
public static final String JOB_TRACKER = "job-tracker";
public static final String NAME_NODE = "name-node";
// public static final String PREPARE = "prepare";
// public static final String JOB_XML = "job_xml";
public static final String MAIN_CLASS = "main_class";
public static final String JAVA_OPT = "java_opt";
public static final String ARG = "arg";
public static final String MAPRED_JOB_QUEUE_NAME = "mapred.job.queue.name";
public static final String OOZIE_USER_SYSTEM_LIBPATH = "oozie.use.system.libpath";
@Override
public Document generate(Map<String, String> configurations,
Map<String, Node> inputs, Map<String, Node> outputs) {
try {
String name = configurations.get("name");
String arg = configurations.get("arg");
String ok = configurations.get("ok");
String mainClass = configurations.get("mainClass");
String job_tracker = configurations.get(JOB_TRACKER);
String name_node = configurations.get(NAME_NODE);
String queue_name = configurations.get(MAPRED_JOB_QUEUE_NAME);
String main_class = configurations.get(MAIN_CLASS);
// String java_opt = configurations.get(JAVA_OPT);
String argument = configurations.get(ARG);
String ok = outputs.get("ok").getName();
String error = outputs.get("error").getName();
String xml = "<action name=\"" + name + "\">" + "\n" + "<java>"
+ "\n" + "<job-tracker>${jobTracker}</job-tracker>" + "\n"
+ "<name-node>${nameNode}</name-node>" + "\n"
+ "\n" + "<job-tracker>" + job_tracker + "</job-tracker>" + "\n"
+ "<name-node>" + name_node + "</name-node>" + "\n"
+ "<configuration>" + "\n" + "<property>" + "\n"
+ "<name>mapred.job.queue.name</name>" + "\n"
+ "<value>${queueName}</value>" + "\n" + "</property>"
+ "<value>" + queue_name + "</value>" + "\n" + "</property>"
+ "\n" + "</configuration>" + "\n" + "<main-class>"
+ mainClass + "</main-class>"
+ "\n"
+ // TODO
"<arg>" + "-eee " + arg + "</arg>" + "\n" + "</java>"
+ "\n" + "<ok to=\"" + ok + "\"/>" + "\n"
+ "<error to=\"fail\"/>" + "\n" + "</action>";
+ main_class + "</main-class>" + "\n"
+ "<arg>" + argument + "</arg>" + "\n" + "</java>"
+ "\n" + "<ok to=\"" + ok + "\"/>" + "\n"//ok
+ "<error to=\"" +error + "\"/>" + "\n" + "</action>";
return DocumentBuilderFactory.newInstance().newDocumentBuilder()
.parse(new StringInputStream(xml));
} catch (Exception e) {

View File

@ -10,6 +10,8 @@ import haflow.engine.ValidateFlowResult;
import haflow.engine.model.Action;
import haflow.engine.model.AdjMatrixNode;
import haflow.engine.model.DirectedGraph;
import haflow.engine.model.GlobalConfiguration;
import haflow.engine.model.Path;
import haflow.engine.model.TopologicalSort;
import haflow.module.Module;
import haflow.module.basic.EndModule;
@ -41,6 +43,8 @@ public class OozieEngine extends AbstractEngine {
private HdfsService hdfsService;
private OozieService oozieService;
private FlowDeployService flowDeployService;
private GlobalConfiguration globalConfiguration;
private ModuleLoader getModuleLoader() {
return moduleLoader;
@ -98,6 +102,15 @@ public class OozieEngine extends AbstractEngine {
this.flowDeployService = flowDeployService;
}
public GlobalConfiguration getGlobalConfiguration() {
return globalConfiguration;
}
@Autowired
public void setGlobalConfiguration(GlobalConfiguration globalConfiguration) {
this.globalConfiguration = globalConfiguration;
}
@Override
public ValidateFlowResult validateFlow(Flow flow) {
// TODO Auto-generated method stub
@ -253,6 +266,14 @@ public class OozieEngine extends AbstractEngine {
Action node = graph.getNode(w);
Map<String, String> configurations = new HashMap<String, String>();
//global consistent configurations
configurations.putAll(this.globalConfiguration.getProperties());
//module configurations
//run time configurations
configurations.put("name", node.getNodeName());
List<NodeConfiguration> ncps = this.getNodeConfigurationService()
.getNodeConfiguration(node.getNodeId());
@ -266,9 +287,10 @@ public class OozieEngine extends AbstractEngine {
List<AdjMatrixNode> adj = graph.getAdjacent(w);
for (AdjMatrixNode v : adj) {
if (sorted.contains(v)) {
Action action = graph.getNode(v.getIndex());
outputs.put(v.getPath().getSourceEndPoint(), // "ok"
action.getNode());
// Action action = graph.getNode(v.getIndex());
Path path = v.getPath();
outputs.put(path.getSourceEndPoint(), // "ok"
path.getTargetNode());
break;
}
}

View File

@ -1,24 +1,26 @@
package haflow.module;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Module {
String id();
String name();
String category();
ModuleType type();
ModuleConfiguration[] configurations();
ModuleEndpoint[] inputs();
ModuleEndpoint[] outputs();
}
package haflow.module;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Module {
String id();
String name();
String category();
ModuleType type();
ModuleConfiguration[] configurations();
ModuleStaticConfiguration[] static_configurations();
ModuleEndpoint[] inputs();
ModuleEndpoint[] outputs();
}

View File

@ -0,0 +1,14 @@
package haflow.module;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ModuleStaticConfiguration {
String key();
String value();
}

View File

@ -1,39 +1,50 @@
package haflow.module.basic;
import haflow.dto.entity.Node;
import haflow.module.DataType;
import haflow.module.Module;
import haflow.module.ModuleConfiguration;
import haflow.module.ModuleEndpoint;
import haflow.module.AbstractModule;
import haflow.module.ModuleType;
import java.util.Map;
@Module(id = "ada600a8-aa63-968a-ca46-9085e0e0bd2e", name = "Java", category = "Basic", type = ModuleType.JAVA,
configurations = {
@ModuleConfiguration(key = "job-tracker", displayName = "Job Tracker"),
@ModuleConfiguration(key = "name-node", displayName = "Name Node"),
@ModuleConfiguration(key = "prepare.mkdir", displayName = "Prepare: Make Directory"),
@ModuleConfiguration(key = "prepare.delete", displayName = "Prepare: Delete"),
@ModuleConfiguration(key = "job-xml", displayName = "Job Xml"),
@ModuleConfiguration(key = "configuration", displayName = "Configuration"),
@ModuleConfiguration(key = "main-class", displayName = "Main Class"),
@ModuleConfiguration(key = "java-opts", displayName = "Java Options"),
@ModuleConfiguration(key = "arg", displayName = "Arguments"),
@ModuleConfiguration(key = "file", displayName = "File"),
@ModuleConfiguration(key = "archive", displayName = "Archive"),
@ModuleConfiguration(key = "capture-output", displayName = "Capture Output") },
inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) },
outputs = { @ModuleEndpoint(name = "to", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText),
@ModuleEndpoint(name = "error", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) })
public class JavaModule extends AbstractModule {
@Override
public boolean validate(Map<String, String> configurations,
Map<String, Node> inputs, Map<String, Node> outputs) {
// TODO Auto-generated method stub
return false;
}
}
package haflow.module.basic;
import haflow.dto.entity.Node;
import haflow.module.DataType;
import haflow.module.Module;
import haflow.module.ModuleConfiguration;
import haflow.module.ModuleEndpoint;
import haflow.module.AbstractModule;
import haflow.module.ModuleStaticConfiguration;
import haflow.module.ModuleType;
import java.util.Map;
@Module(id = "ada600a8-aa63-968a-ca46-9085e0e0bd2e", name = "Java", category = "Basic", type = ModuleType.JAVA,
configurations = {
@ModuleConfiguration(key = "job-tracker", displayName = "Job Tracker"),
@ModuleConfiguration(key = "name-node", displayName = "Name Node"),
@ModuleConfiguration(key = "prepare.mkdir", displayName = "Prepare: Make Directory"),
@ModuleConfiguration(key = "prepare.delete", displayName = "Prepare: Delete"),
@ModuleConfiguration(key = "job-xml", displayName = "Job Xml"),
@ModuleConfiguration(key = "configuration", displayName = "Configuration"),
@ModuleConfiguration(key = "main-class", displayName = "Main Class"),
@ModuleConfiguration(key = "java-opts", displayName = "Java Options"),
@ModuleConfiguration(key = "arg", displayName = "Arguments"),
@ModuleConfiguration(key = "file", displayName = "File"),
@ModuleConfiguration(key = "archive", displayName = "Archive"),
@ModuleConfiguration(key = "capture-output", displayName = "Capture Output") },
static_configurations={@ModuleStaticConfiguration(key="main_class", value="haflow.module.basic.JavaModule")},
inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) },
outputs = { @ModuleEndpoint(name = "to", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText),
@ModuleEndpoint(name = "error", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) })
public class DemoJavaModule extends AbstractModule {
@Override
public boolean validate(Map<String, String> configurations,
Map<String, Node> inputs, Map<String, Node> outputs) {
// TODO Auto-generated method stub
return false;
}
public static void main(String[] args) {
System.out.println("Demo Java Main");
System.out.println("# Arguments: " + args.length);
for (int i = 0; i < args.length; i++) {
System.out.println("Argument[" + i + "]: " + args[i]);
}
}
}

View File

@ -1,24 +1,26 @@
package haflow.module.basic;
import java.util.Map;
import haflow.dto.entity.Node;
import haflow.module.DataType;
import haflow.module.Module;
import haflow.module.ModuleConfiguration;
import haflow.module.ModuleEndpoint;
import haflow.module.AbstractModule;
import haflow.module.ModuleType;
@Module(id = "a0d027c3-a4bd-61b5-5063-134ff71f8122", name = "End", category = "Basic", type = ModuleType.END,
configurations = { @ModuleConfiguration(key = "name", displayName = "Name") }, inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) }, outputs = {})
public final class EndModule extends AbstractModule {
@Override
public boolean validate(Map<String, String> configurations,
Map<String, Node> inputs, Map<String, Node> outputs) {
// TODO Auto-generated method stub
return false;
}
}
package haflow.module.basic;
import java.util.Map;
import haflow.dto.entity.Node;
import haflow.module.DataType;
import haflow.module.Module;
import haflow.module.ModuleConfiguration;
import haflow.module.ModuleEndpoint;
import haflow.module.AbstractModule;
import haflow.module.ModuleType;
@Module(id = "a0d027c3-a4bd-61b5-5063-134ff71f8122", name = "End", category = "Basic", type = ModuleType.END,
configurations = { @ModuleConfiguration(key = "name", displayName = "Name") },
static_configurations={},
inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) }, outputs = {})
public final class EndModule extends AbstractModule {
@Override
public boolean validate(Map<String, String> configurations,
Map<String, Node> inputs, Map<String, Node> outputs) {
// TODO Auto-generated method stub
return false;
}
}

View File

@ -15,7 +15,9 @@ import haflow.module.ModuleType;
@ModuleConfiguration(key = "delete", displayName = "Delete"),
@ModuleConfiguration(key = "mkdir", displayName = "Make Directory"),
@ModuleConfiguration(key = "move", displayName = "Move"),
@ModuleConfiguration(key = "chmod", displayName = "Change Mode") }, inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) }, outputs = {
@ModuleConfiguration(key = "chmod", displayName = "Change Mode") },
static_configurations={},
inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) }, outputs = {
@ModuleEndpoint(name = "to", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText),
@ModuleEndpoint(name = "error", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) })
public class FileSystemModule extends AbstractModule {

View File

@ -1,26 +1,28 @@
package haflow.module.basic;
import java.util.Map;
import haflow.dto.entity.Node;
import haflow.module.DataType;
import haflow.module.Module;
import haflow.module.ModuleConfiguration;
import haflow.module.ModuleEndpoint;
import haflow.module.AbstractModule;
import haflow.module.ModuleType;
@Module(id = "b0d027c3-a4bd-61b5-5063-134ff71f8123", name = "Kill", category = "Basic", type = ModuleType.KILL,
configurations = {
@ModuleConfiguration(key = "name", displayName = "Name"),
@ModuleConfiguration(key = "message", displayName = "Message") }, inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) }, outputs = {})
public final class KillModule extends AbstractModule {
@Override
public boolean validate(Map<String, String> configurations,
Map<String, Node> inputs, Map<String, Node> outputs) {
// TODO Auto-generated method stub
return false;
}
}
package haflow.module.basic;
import java.util.Map;
import haflow.dto.entity.Node;
import haflow.module.DataType;
import haflow.module.Module;
import haflow.module.ModuleConfiguration;
import haflow.module.ModuleEndpoint;
import haflow.module.AbstractModule;
import haflow.module.ModuleType;
@Module(id = "b0d027c3-a4bd-61b5-5063-134ff71f8123", name = "Kill", category = "Basic", type = ModuleType.KILL,
configurations = {
@ModuleConfiguration(key = "name", displayName = "Name"),
@ModuleConfiguration(key = "message", displayName = "Message") },
static_configurations={},
inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) }, outputs = {})
public final class KillModule extends AbstractModule {
@Override
public boolean validate(Map<String, String> configurations,
Map<String, Node> inputs, Map<String, Node> outputs) {
// TODO Auto-generated method stub
return false;
}
}

View File

@ -20,7 +20,9 @@ import haflow.module.ModuleType;
@ModuleConfiguration(key = "streaming.reducer", displayName = "Streaming:Reducer"),
@ModuleConfiguration(key = "streaming.record-reader", displayName = "Streaming:Record Reader"),
@ModuleConfiguration(key = "streaming.record-reader-mapping", displayName = "Streaming:Record Reader Mapping"),
@ModuleConfiguration(key = "streaming.env", displayName = "Streaming:Environment") }, inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) }, outputs = {
@ModuleConfiguration(key = "streaming.env", displayName = "Streaming:Environment") },
static_configurations = {},
inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) }, outputs = {
@ModuleEndpoint(name = "to", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText),
@ModuleEndpoint(name = "error", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) })
public class MapReduceModule extends AbstractModule {

View File

@ -22,7 +22,9 @@ import haflow.module.ModuleType;
@ModuleConfiguration(key = "param", displayName = "Parameters"),
@ModuleConfiguration(key = "argument", displayName = "Arguments"),
@ModuleConfiguration(key = "file", displayName = "File"),
@ModuleConfiguration(key = "archive", displayName = "Archive"), }, inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) }, outputs = {
@ModuleConfiguration(key = "archive", displayName = "Archive"), },
static_configurations = {},
inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) }, outputs = {
@ModuleEndpoint(name = "to", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText),
@ModuleEndpoint(name = "error", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) })
public class PigModule extends AbstractModule {

View File

@ -1,23 +1,25 @@
package haflow.module.basic;
import haflow.dto.entity.Node;
import haflow.module.DataType;
import haflow.module.Module;
import haflow.module.ModuleEndpoint;
import haflow.module.AbstractModule;
import haflow.module.ModuleType;
import java.util.Map;
@Module(id = "a208d7d2-a8ff-2493-64c2-36f50bc95752", name = "Start", category = "Basic", type = ModuleType.START,
configurations = {}, inputs = {}, outputs = { @ModuleEndpoint(name = "to", maxNumber = 1, minNumber = 1, dataType = DataType.PlainText) })
public final class StartModule extends AbstractModule {
@Override
public boolean validate(Map<String, String> configurations,
Map<String, Node> inputs, Map<String, Node> outputs) {
// TODO Auto-generated method stub
return false;
}
}
package haflow.module.basic;
import haflow.dto.entity.Node;
import haflow.module.DataType;
import haflow.module.Module;
import haflow.module.ModuleEndpoint;
import haflow.module.AbstractModule;
import haflow.module.ModuleType;
import java.util.Map;
@Module(id = "a208d7d2-a8ff-2493-64c2-36f50bc95752", name = "Start", category = "Basic", type = ModuleType.START,
configurations = {},
static_configurations = {},
inputs = {}, outputs = { @ModuleEndpoint(name = "to", maxNumber = 1, minNumber = 1, dataType = DataType.PlainText) })
public final class StartModule extends AbstractModule {
@Override
public boolean validate(Map<String, String> configurations,
Map<String, Node> inputs, Map<String, Node> outputs) {
// TODO Auto-generated method stub
return false;
}
}

View File

@ -11,7 +11,9 @@ import java.util.Map;
@Module(id = "6e744dc4-edc6-eca2-07d5-28ff55a75b2d", name = "Preprocess", category = "zrace", type = ModuleType.JAVA,
configurations = {
@ModuleConfiguration(key = "input_path", displayName = "input path"),
@ModuleConfiguration(key = "output_path", displayName = "output path") }, inputs = {}, outputs = {})
@ModuleConfiguration(key = "output_path", displayName = "output path") },
static_configurations = {},
inputs = {}, outputs = {})
public class ZracePreprocess extends AbstractModule {
@Override

View File

@ -11,7 +11,9 @@ import haflow.module.ModuleType;
import java.util.Map;
@Module(category = "Test", type=ModuleType.OTHER,
configurations = { @ModuleConfiguration(key = "test", displayName = "Test") }, id = "92c5e828-0d02-bc7f-8825-7bbb6f48f2f1", inputs = { @ModuleEndpoint(maxNumber = 1, minNumber = 1, name = "testInput", dataType = DataType.PlainText) }, name = "TestModule", outputs = { @ModuleEndpoint(maxNumber = 1, minNumber = 1, name = "testOutput", dataType = DataType.PlainText) })
configurations = { @ModuleConfiguration(key = "test", displayName = "Test") }, id = "92c5e828-0d02-bc7f-8825-7bbb6f48f2f1",
static_configurations = {},
inputs = { @ModuleEndpoint(maxNumber = 1, minNumber = 1, name = "testInput", dataType = DataType.PlainText) }, name = "TestModule", outputs = { @ModuleEndpoint(maxNumber = 1, minNumber = 1, name = "testOutput", dataType = DataType.PlainText) })
public class TestModule extends AbstractModule {
@Override