Merge pull request #28 from lostcharlie/master
Workflow engine support added.
This commit is contained in:
commit
6a5b8bda39
|
@ -0,0 +1,9 @@
|
|||
package haflow.engine;
|
||||
|
||||
import haflow.entity.Flow;
|
||||
|
||||
public abstract class AbstractEngine {
|
||||
public abstract ValidateFlowResult validateFlow(Flow flow);
|
||||
|
||||
public abstract RunFlowResult runFlow(Flow flow);
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
package haflow.engine;
|
||||
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
|
||||
@Target(ElementType.TYPE)
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
public @interface Engine {
|
||||
String name();
|
||||
}
|
|
@ -0,0 +1,5 @@
|
|||
package haflow.engine;
|
||||
|
||||
public class RunFlowResult {
|
||||
|
||||
}
|
|
@ -0,0 +1,5 @@
|
|||
package haflow.engine;
|
||||
|
||||
public class ValidateFlowResult {
|
||||
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
package haflow.engine.oozie;
|
||||
|
||||
import haflow.entity.Node;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.w3c.dom.Document;
|
||||
|
||||
public class EndModuleGenerator extends OozieXmlGenerator {
|
||||
|
||||
@Override
|
||||
public Document generate(Map<String, String> configurations,
|
||||
Map<String, Node> inputs, Map<String, Node> outputs) {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
package haflow.engine.oozie;
|
||||
|
||||
import haflow.entity.Node;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.w3c.dom.Document;
|
||||
|
||||
public class KillModuleGenerator extends OozieXmlGenerator {
|
||||
|
||||
@Override
|
||||
public Document generate(Map<String, String> configurations,
|
||||
Map<String, Node> inputs, Map<String, Node> outputs) {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
package haflow.engine.oozie;
|
||||
|
||||
import haflow.engine.AbstractEngine;
|
||||
import haflow.engine.Engine;
|
||||
import haflow.engine.RunFlowResult;
|
||||
import haflow.engine.ValidateFlowResult;
|
||||
import haflow.entity.Flow;
|
||||
|
||||
@Engine(name = "Oozie")
|
||||
public class OozieEngine extends AbstractEngine {
|
||||
|
||||
@Override
|
||||
public ValidateFlowResult validateFlow(Flow flow) {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RunFlowResult runFlow(Flow flow) {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
package haflow.engine.oozie;
|
||||
|
||||
import haflow.entity.Node;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.w3c.dom.Document;
|
||||
|
||||
public abstract class OozieXmlGenerator {
|
||||
public abstract Document generate(Map<String, String> configurations,
|
||||
Map<String, Node> inputs, Map<String, Node> outputs);
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
package haflow.engine.oozie;
|
||||
|
||||
import haflow.entity.Node;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.w3c.dom.Document;
|
||||
|
||||
public class StartModuleGenerator extends OozieXmlGenerator {
|
||||
|
||||
@Override
|
||||
public Document generate(Map<String, String> configurations,
|
||||
Map<String, Node> inputs, Map<String, Node> outputs) {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
|
@ -4,7 +4,7 @@ import haflow.entity.Node;
|
|||
|
||||
import java.util.Map;
|
||||
|
||||
public interface ModuleMetadata {
|
||||
public String generate(Map<String, String> configurations,
|
||||
public abstract class AbstractModule {
|
||||
public abstract boolean validate(Map<String, String> configurations,
|
||||
Map<String, Node> inputs, Map<String, Node> outputs);
|
||||
}
|
|
@ -0,0 +1,5 @@
|
|||
package haflow.module;
|
||||
|
||||
public enum DataType {
|
||||
None, PlainText
|
||||
}
|
|
@ -13,4 +13,6 @@ public @interface ModuleEndpoint {
|
|||
int minNumber();
|
||||
|
||||
int maxNumber();
|
||||
|
||||
DataType dataType();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
package haflow.module.basic;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import haflow.entity.Node;
|
||||
import haflow.module.DataType;
|
||||
import haflow.module.Module;
|
||||
import haflow.module.ModuleConfiguration;
|
||||
import haflow.module.ModuleEndpoint;
|
||||
import haflow.module.AbstractModule;
|
||||
|
||||
@Module(id = "a0d027c3-a4bd-61b5-5063-134ff71f8122", name = "End", category = "Basic", configurations = { @ModuleConfiguration(key = "name", displayName = "Name") }, inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) }, outputs = {})
|
||||
public class EndModule extends AbstractModule {
|
||||
|
||||
@Override
|
||||
public boolean validate(Map<String, String> configurations,
|
||||
Map<String, Node> inputs, Map<String, Node> outputs) {
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,26 +1,28 @@
|
|||
package haflow.module.oozie;
|
||||
package haflow.module.basic;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import haflow.entity.Node;
|
||||
import haflow.module.DataType;
|
||||
import haflow.module.Module;
|
||||
import haflow.module.ModuleConfiguration;
|
||||
import haflow.module.ModuleEndpoint;
|
||||
import haflow.module.ModuleMetadata;
|
||||
import haflow.module.AbstractModule;
|
||||
|
||||
@Module(id = "a966ef60-f825-4ed9-146b-deef78805088", name = "FileSystem", category = "Oozie", configurations = {
|
||||
@Module(id = "a966ef60-f825-4ed9-146b-deef78805088", name = "FileSystem", category = "Basic", configurations = {
|
||||
@ModuleConfiguration(key = "delete", displayName = "Delete"),
|
||||
@ModuleConfiguration(key = "mkdir", displayName = "Make Directory"),
|
||||
@ModuleConfiguration(key = "move", displayName = "Move"),
|
||||
@ModuleConfiguration(key = "chmod", displayName = "Change Mode") }, inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1) }, outputs = {
|
||||
@ModuleEndpoint(name = "to", minNumber = 1, maxNumber = 1),
|
||||
@ModuleEndpoint(name = "error", minNumber = 1, maxNumber = 1) })
|
||||
public class FileSystem implements ModuleMetadata {
|
||||
@ModuleConfiguration(key = "chmod", displayName = "Change Mode") }, inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) }, outputs = {
|
||||
@ModuleEndpoint(name = "to", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText),
|
||||
@ModuleEndpoint(name = "error", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) })
|
||||
public class FileSystemModule extends AbstractModule {
|
||||
|
||||
public String generate(Map<String, String> configurations,
|
||||
@Override
|
||||
public boolean validate(Map<String, String> configurations,
|
||||
Map<String, Node> inputs, Map<String, Node> outputs) {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
package haflow.module.basic;
|
||||
|
||||
import haflow.entity.Node;
|
||||
import haflow.module.DataType;
|
||||
import haflow.module.Module;
|
||||
import haflow.module.ModuleConfiguration;
|
||||
import haflow.module.ModuleEndpoint;
|
||||
import haflow.module.AbstractModule;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@Module(id = "ada600a8-aa63-968a-ca46-9085e0e0bd2e", name = "Java", category = "Basic", configurations = {
|
||||
@ModuleConfiguration(key = "job-tracker", displayName = "Job Tracker"),
|
||||
@ModuleConfiguration(key = "name-node", displayName = "Name Node"),
|
||||
@ModuleConfiguration(key = "prepare.mkdir", displayName = "Prepare: Make Directory"),
|
||||
@ModuleConfiguration(key = "prepare.delete", displayName = "Prepare: Delete"),
|
||||
@ModuleConfiguration(key = "job-xml", displayName = "Job Xml"),
|
||||
@ModuleConfiguration(key = "configuration", displayName = "Configuration"),
|
||||
@ModuleConfiguration(key = "main-class", displayName = "Main Class"),
|
||||
@ModuleConfiguration(key = "java-opts", displayName = "Java Options"),
|
||||
@ModuleConfiguration(key = "arg", displayName = "Arguments"),
|
||||
@ModuleConfiguration(key = "file", displayName = "File"),
|
||||
@ModuleConfiguration(key = "archive", displayName = "Archive"),
|
||||
@ModuleConfiguration(key = "capture-output", displayName = "Capture Output") }, inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) }, outputs = {
|
||||
@ModuleEndpoint(name = "to", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText),
|
||||
@ModuleEndpoint(name = "error", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) })
|
||||
public class JavaModule extends AbstractModule {
|
||||
|
||||
@Override
|
||||
public boolean validate(Map<String, String> configurations,
|
||||
Map<String, Node> inputs, Map<String, Node> outputs) {
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,26 +1,24 @@
|
|||
package haflow.module.oozie;
|
||||
package haflow.module.basic;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import haflow.entity.Node;
|
||||
import haflow.module.DataType;
|
||||
import haflow.module.Module;
|
||||
import haflow.module.ModuleConfiguration;
|
||||
import haflow.module.ModuleEndpoint;
|
||||
import haflow.module.ModuleMetadata;
|
||||
import haflow.module.AbstractModule;
|
||||
|
||||
@Module(id = "b0d027c3-a4bd-61b5-5063-134ff71f8123", name = "Kill", category = "Oozie", configurations = {
|
||||
@Module(id = "b0d027c3-a4bd-61b5-5063-134ff71f8123", name = "Kill", category = "Basic", configurations = {
|
||||
@ModuleConfiguration(key = "name", displayName = "Name"),
|
||||
@ModuleConfiguration(key = "message", displayName = "Message") }, inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1) }, outputs = {})
|
||||
public class KillModule implements ModuleMetadata {
|
||||
@ModuleConfiguration(key = "message", displayName = "Message") }, inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) }, outputs = {})
|
||||
public class KillModule extends AbstractModule {
|
||||
|
||||
// TODO: Fix it
|
||||
public String generate(Map<String, String> configurations,
|
||||
@Override
|
||||
public boolean validate(Map<String, String> configurations,
|
||||
Map<String, Node> inputs, Map<String, Node> outputs) {
|
||||
String name = configurations.get("name");
|
||||
String xml = "<kill name=\""
|
||||
+ name
|
||||
+ "\"><message>Work flow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message></kill>";
|
||||
return xml;
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,14 +1,15 @@
|
|||
package haflow.module.oozie;
|
||||
package haflow.module.basic;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import haflow.entity.Node;
|
||||
import haflow.module.DataType;
|
||||
import haflow.module.Module;
|
||||
import haflow.module.ModuleConfiguration;
|
||||
import haflow.module.ModuleEndpoint;
|
||||
import haflow.module.ModuleMetadata;
|
||||
import haflow.module.AbstractModule;
|
||||
|
||||
@Module(id = "35267c79-5221-3a0e-d485-605fa8e4b191", name = "MapReduce", category = "Oozie", configurations = {
|
||||
@Module(id = "35267c79-5221-3a0e-d485-605fa8e4b191", name = "MapReduce", category = "Basic", configurations = {
|
||||
@ModuleConfiguration(key = "job-tracker", displayName = "Job Tracker"),
|
||||
@ModuleConfiguration(key = "name-node", displayName = "Name Node"),
|
||||
@ModuleConfiguration(key = "prepare.delete", displayName = "Prepare:Delete"),
|
||||
|
@ -17,15 +18,16 @@ import haflow.module.ModuleMetadata;
|
|||
@ModuleConfiguration(key = "streaming.reducer", displayName = "Streaming:Reducer"),
|
||||
@ModuleConfiguration(key = "streaming.record-reader", displayName = "Streaming:Record Reader"),
|
||||
@ModuleConfiguration(key = "streaming.record-reader-mapping", displayName = "Streaming:Record Reader Mapping"),
|
||||
@ModuleConfiguration(key = "streaming.env", displayName = "Streaming:Environment") }, inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1) }, outputs = {
|
||||
@ModuleEndpoint(name = "to", minNumber = 1, maxNumber = 1),
|
||||
@ModuleEndpoint(name = "error", minNumber = 1, maxNumber = 1) })
|
||||
public class MapReduceModule implements ModuleMetadata {
|
||||
@ModuleConfiguration(key = "streaming.env", displayName = "Streaming:Environment") }, inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) }, outputs = {
|
||||
@ModuleEndpoint(name = "to", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText),
|
||||
@ModuleEndpoint(name = "error", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) })
|
||||
public class MapReduceModule extends AbstractModule {
|
||||
|
||||
public String generate(Map<String, String> configurations,
|
||||
@Override
|
||||
public boolean validate(Map<String, String> configurations,
|
||||
Map<String, Node> inputs, Map<String, Node> outputs) {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,14 +1,15 @@
|
|||
package haflow.module.oozie;
|
||||
package haflow.module.basic;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import haflow.entity.Node;
|
||||
import haflow.module.DataType;
|
||||
import haflow.module.Module;
|
||||
import haflow.module.ModuleConfiguration;
|
||||
import haflow.module.ModuleEndpoint;
|
||||
import haflow.module.ModuleMetadata;
|
||||
import haflow.module.AbstractModule;
|
||||
|
||||
@Module(id = "1c32fa2b-a5ec-4db7-6f29-0bd4e969af67", name = "Pig", category = "Oozie", configurations = {
|
||||
@Module(id = "1c32fa2b-a5ec-4db7-6f29-0bd4e969af67", name = "Pig", category = "Basic", configurations = {
|
||||
@ModuleConfiguration(key = "job-tracker", displayName = "Job Tracker"),
|
||||
@ModuleConfiguration(key = "name-node", displayName = "Name Node"),
|
||||
@ModuleConfiguration(key = "prepare.mkdir", displayName = "Prepare: Make Directory"),
|
||||
|
@ -19,15 +20,16 @@ import haflow.module.ModuleMetadata;
|
|||
@ModuleConfiguration(key = "param", displayName = "Parameters"),
|
||||
@ModuleConfiguration(key = "argument", displayName = "Arguments"),
|
||||
@ModuleConfiguration(key = "file", displayName = "File"),
|
||||
@ModuleConfiguration(key = "archive", displayName = "Archive"), }, inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1) }, outputs = {
|
||||
@ModuleEndpoint(name = "to", minNumber = 1, maxNumber = 1),
|
||||
@ModuleEndpoint(name = "error", minNumber = 1, maxNumber = 1) })
|
||||
public class PigModule implements ModuleMetadata {
|
||||
@ModuleConfiguration(key = "archive", displayName = "Archive"), }, inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) }, outputs = {
|
||||
@ModuleEndpoint(name = "to", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText),
|
||||
@ModuleEndpoint(name = "error", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) })
|
||||
public class PigModule extends AbstractModule {
|
||||
|
||||
public String generate(Map<String, String> configurations,
|
||||
@Override
|
||||
public boolean validate(Map<String, String> configurations,
|
||||
Map<String, Node> inputs, Map<String, Node> outputs) {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
package haflow.module.basic;
|
||||
|
||||
import haflow.entity.Node;
|
||||
import haflow.module.DataType;
|
||||
import haflow.module.Module;
|
||||
import haflow.module.ModuleEndpoint;
|
||||
import haflow.module.AbstractModule;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@Module(id = "a208d7d2-a8ff-2493-64c2-36f50bc95752", name = "Start", category = "Basic", configurations = {}, inputs = {}, outputs = { @ModuleEndpoint(name = "to", maxNumber = 1, minNumber = 1, dataType = DataType.PlainText) })
|
||||
public class StartModule extends AbstractModule {
|
||||
|
||||
@Override
|
||||
public boolean validate(Map<String, String> configurations,
|
||||
Map<String, Node> inputs, Map<String, Node> outputs) {
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,21 +0,0 @@
|
|||
package haflow.module.oozie;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import haflow.entity.Node;
|
||||
import haflow.module.Module;
|
||||
import haflow.module.ModuleConfiguration;
|
||||
import haflow.module.ModuleEndpoint;
|
||||
import haflow.module.ModuleMetadata;
|
||||
|
||||
@Module(id = "a0d027c3-a4bd-61b5-5063-134ff71f8122", name = "End", category = "Oozie", configurations = { @ModuleConfiguration(key = "name", displayName = "Name") }, inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1) }, outputs = {})
|
||||
public class EndModule implements ModuleMetadata {
|
||||
|
||||
public String generate(Map<String, String> configurations,
|
||||
Map<String, Node> inputs, Map<String, Node> outputs) {
|
||||
String name = configurations.get("name");
|
||||
String xml = "<end name=\"" + name + "\"/>";
|
||||
return xml;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,56 +0,0 @@
|
|||
package haflow.module.oozie;
|
||||
|
||||
import haflow.entity.Node;
|
||||
import haflow.module.Module;
|
||||
import haflow.module.ModuleConfiguration;
|
||||
import haflow.module.ModuleEndpoint;
|
||||
import haflow.module.ModuleMetadata;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@Module(id = "ada600a8-aa63-968a-ca46-9085e0e0bd2e", name = "Java", category = "Oozie", configurations = {
|
||||
@ModuleConfiguration(key = "job-tracker", displayName = "Job Tracker"),
|
||||
@ModuleConfiguration(key = "name-node", displayName = "Name Node"),
|
||||
@ModuleConfiguration(key = "prepare.mkdir", displayName = "Prepare: Make Directory"),
|
||||
@ModuleConfiguration(key = "prepare.delete", displayName = "Prepare: Delete"),
|
||||
@ModuleConfiguration(key = "job-xml", displayName = "Job Xml"),
|
||||
@ModuleConfiguration(key = "configuration", displayName = "Configuration"),
|
||||
@ModuleConfiguration(key = "main-class", displayName = "Main Class"),
|
||||
@ModuleConfiguration(key = "java-opts", displayName = "Java Options"),
|
||||
@ModuleConfiguration(key = "arg", displayName = "Arguments"),
|
||||
@ModuleConfiguration(key = "file", displayName = "File"),
|
||||
@ModuleConfiguration(key = "archive", displayName = "Archive"),
|
||||
@ModuleConfiguration(key = "capture-output", displayName = "Capture Output") }, inputs = { @ModuleEndpoint(name = "from", minNumber = 1, maxNumber = 1) }, outputs = {
|
||||
@ModuleEndpoint(name = "to", minNumber = 1, maxNumber = 1),
|
||||
@ModuleEndpoint(name = "error", minNumber = 1, maxNumber = 1) })
|
||||
public class JavaModule implements ModuleMetadata {
|
||||
// TODO: Fix it
|
||||
public String generate(Map<String, String> configurations,
|
||||
Map<String, Node> inputs, Map<String, Node> outputs) {
|
||||
String name = configurations.get("name");
|
||||
String arg = configurations.get("arg");
|
||||
String ok = configurations.get("ok");
|
||||
String mainClass = configurations.get("mainClass");
|
||||
|
||||
Class<?> moduleClass = this.getClass();
|
||||
assert (moduleClass.isAnnotationPresent(haflow.module.Module.class));
|
||||
// String moduleName =
|
||||
// moduleClass.getAnnotation(haflow.module.Module.class).name();
|
||||
|
||||
String actionXML = "<action name=\"" + name + "\">" + "\n" + "<java>"
|
||||
+ "\n" + "<job-tracker>${jobTracker}</job-tracker>" + "\n"
|
||||
+ "<name-node>${nameNode}</name-node>" + "\n"
|
||||
+ "<configuration>" + "\n" + "<property>" + "\n"
|
||||
+ "<name>mapred.job.queue.name</name>" + "\n"
|
||||
+ "<value>${queueName}</value>" + "\n" + "</property>" + "\n"
|
||||
+ "</configuration>" + "\n" + "<main-class>" + mainClass
|
||||
+ "</main-class>"
|
||||
+ "\n"
|
||||
+ // TODO
|
||||
"<arg>" + "-eee " + arg + "</arg>" + "\n" + "</java>" + "\n"
|
||||
+ "<ok to=\"" + ok + "\"/>" + "\n" + "<error to=\"fail\"/>"
|
||||
+ "\n" + "</action>";
|
||||
return actionXML;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,19 +0,0 @@
|
|||
package haflow.module.oozie;
|
||||
|
||||
import haflow.entity.Node;
|
||||
import haflow.module.Module;
|
||||
import haflow.module.ModuleEndpoint;
|
||||
import haflow.module.ModuleMetadata;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@Module(id = "a208d7d2-a8ff-2493-64c2-36f50bc95752", name = "Start", category = "Oozie", configurations = {}, inputs = {}, outputs = { @ModuleEndpoint(name = "to", maxNumber = 1, minNumber = 1) })
|
||||
public class StartModule implements ModuleMetadata {
|
||||
|
||||
public String generate(Map<String, String> configurations,
|
||||
Map<String, Node> inputs, Map<String, Node> outputs) {
|
||||
String xml = "<start to=\"" + outputs.get("ok").getName() + "\"/>";
|
||||
return xml;
|
||||
}
|
||||
|
||||
}
|
|
@ -3,39 +3,20 @@ package haflow.module.zrace;
|
|||
import haflow.entity.Node;
|
||||
import haflow.module.Module;
|
||||
import haflow.module.ModuleConfiguration;
|
||||
import haflow.module.ModuleMetadata;
|
||||
import haflow.module.AbstractModule;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@Module(id = "6e744dc4-edc6-eca2-07d5-28ff55a75b2d", name = "Preprocess", category = "zrace", configurations = {
|
||||
@ModuleConfiguration(key = "input_path", displayName = "input path"),
|
||||
@ModuleConfiguration(key = "output_path", displayName = "output path") }, inputs = {}, outputs = {})
|
||||
public class ZracePreprocess implements ModuleMetadata {
|
||||
public class ZracePreprocess extends AbstractModule {
|
||||
|
||||
// TODO: Fix it
|
||||
public String generate(Map<String, String> configurations,
|
||||
@Override
|
||||
public boolean validate(Map<String, String> configurations,
|
||||
Map<String, Node> inputs, Map<String, Node> outputs) {
|
||||
String inputPath = configurations.get("input_path");
|
||||
String outputPath = configurations.get("output_path");
|
||||
|
||||
Class<?> moduleClass = this.getClass();
|
||||
assert (moduleClass.isAnnotationPresent(haflow.module.Module.class));
|
||||
String moduleName = moduleClass.getAnnotation(
|
||||
haflow.module.Module.class).name();
|
||||
|
||||
String actionXML = "<action name=\"" + moduleName + "\">" + "\n"
|
||||
+ "<java>" + "<job-tracker>${jobTracker}</job-tracker>" + "\n"
|
||||
+ "<name-node>${nameNode}</name-node>" + "\n"
|
||||
+ "<configuration>" + "\n" + "<property>" + "\n"
|
||||
+ "<name>mapred.job.queue.name</name>" + "\n"
|
||||
+ "<value>${queueName}</value>" + "\n" + "</property>" + "\n"
|
||||
+ "</configuration>" + "\n" + "<main-class>"
|
||||
+ this.getClass().getName() + "</main-class>" + "\n" + "<arg>"
|
||||
+ inputPath + "</arg>" + "\n" + "<arg>" + outputPath + "</arg>"
|
||||
+ "\n" + "</java>" + "\n" +
|
||||
// "<ok to=\"end\"/>" + "\n" + //TODO
|
||||
"<error to=\"fail\"/>" + "\n" + "</action>";
|
||||
return actionXML;
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -4,9 +4,8 @@ import haflow.entity.Flow;
|
|||
import haflow.entity.Node;
|
||||
import haflow.flow.DirectedGraph;
|
||||
import haflow.flow.TopologicalSort;
|
||||
import haflow.module.ModuleMetadata;
|
||||
import haflow.module.oozie.EndModule;
|
||||
import haflow.module.oozie.StartModule;
|
||||
import haflow.module.basic.EndModule;
|
||||
import haflow.module.basic.StartModule;
|
||||
import haflow.profile.NodeConfiguration;
|
||||
import haflow.ui.model.RunFlowResultModel;
|
||||
import haflow.utility.ClusterConfiguration;
|
||||
|
@ -259,8 +258,10 @@ public class FlowExecuteService {
|
|||
}
|
||||
int w = sorted.get(i);
|
||||
Node node = graph.getNode(w);
|
||||
Class<?> moduleClass = moduleClasses.get(node.getModuleId());
|
||||
ModuleMetadata module = (ModuleMetadata) moduleClass.newInstance();
|
||||
// TODO
|
||||
// Class<?> moduleClass = moduleClasses.get(node.getModuleId());
|
||||
// AbstractModule module = (AbstractModule)
|
||||
// moduleClass.newInstance();
|
||||
Map<String, String> configurations = new HashMap<String, String>();
|
||||
configurations.put("name", node.getName());
|
||||
List<NodeConfiguration> ncps = this.getNodeConfigurationService()
|
||||
|
@ -278,9 +279,10 @@ public class FlowExecuteService {
|
|||
break;// TODO
|
||||
}
|
||||
}
|
||||
String part = module.generate(configurations,
|
||||
graph.getInputs(node), graph.getOutputs(node));
|
||||
workflowXml.append(part + "\n");
|
||||
// TODO
|
||||
// String part = module.generate(configurations,
|
||||
// graph.getInputs(node), graph.getOutputs(node));
|
||||
// workflowXml.append(part + "\n");
|
||||
}
|
||||
workflowXml.append("</workflow-app>" + "\n");
|
||||
return workflowXml.toString();
|
||||
|
|
|
@ -55,6 +55,7 @@ public class ModuleHelper {
|
|||
model.setMaxNumber(input.maxNumber());
|
||||
model.setMinNumber(input.minNumber());
|
||||
model.setName(input.name());
|
||||
model.setDataType(input.dataType().toString());
|
||||
moduleBriefModel.getInputs().add(model);
|
||||
}
|
||||
for (ModuleEndpoint output : module.outputs()) {
|
||||
|
@ -62,6 +63,7 @@ public class ModuleHelper {
|
|||
model.setMaxNumber(output.maxNumber());
|
||||
model.setMinNumber(output.minNumber());
|
||||
model.setName(output.name());
|
||||
model.setDataType(output.dataType().toString());
|
||||
moduleBriefModel.getOutputs().add(model);
|
||||
}
|
||||
moduleListModel.getModules().add(moduleBriefModel);
|
||||
|
|
|
@ -8,6 +8,7 @@ public class ModuleEndpointModel {
|
|||
private String name;
|
||||
private int minNumber;
|
||||
private int maxNumber;
|
||||
private String dataType;
|
||||
|
||||
@XmlElement
|
||||
public String getName() {
|
||||
|
@ -35,4 +36,14 @@ public class ModuleEndpointModel {
|
|||
public void setMaxNumber(int maxNumber) {
|
||||
this.maxNumber = maxNumber;
|
||||
}
|
||||
|
||||
@XmlElement
|
||||
public String getDataType() {
|
||||
return dataType;
|
||||
}
|
||||
|
||||
public void setDataType(String dataType) {
|
||||
this.dataType = dataType;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package haflow.utility;
|
||||
|
||||
import haflow.module.Module;
|
||||
import haflow.module.ModuleMetadata;
|
||||
import haflow.module.AbstractModule;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -24,17 +24,17 @@ public class ModuleLoader {
|
|||
this.classHelper = classHelper;
|
||||
}
|
||||
|
||||
public Map<Module, ModuleMetadata> searchForModules(String packageName) {
|
||||
public Map<Module, AbstractModule> searchForModules(String packageName) {
|
||||
try {
|
||||
Map<Module, ModuleMetadata> modules = new HashMap<Module, ModuleMetadata>();
|
||||
Map<Module, AbstractModule> modules = new HashMap<Module, AbstractModule>();
|
||||
List<String> classNames = this.getClassHelper().getClassNames(
|
||||
packageName, true);
|
||||
for (String className : classNames) {
|
||||
Class<?> moduleClass = Class.forName(className);
|
||||
if (moduleClass.isAnnotationPresent(Module.class)) {
|
||||
Object obj = moduleClass.newInstance();
|
||||
if (obj instanceof ModuleMetadata) {
|
||||
ModuleMetadata metadata = (ModuleMetadata) obj;
|
||||
if (obj instanceof AbstractModule) {
|
||||
AbstractModule metadata = (AbstractModule) obj;
|
||||
Module module = moduleClass.getAnnotation(Module.class);
|
||||
modules.put(module, metadata);
|
||||
}
|
||||
|
@ -56,7 +56,7 @@ public class ModuleLoader {
|
|||
Class<?> moduleClass = Class.forName(className);
|
||||
if (moduleClass.isAnnotationPresent(Module.class)) {
|
||||
Object obj = moduleClass.newInstance();
|
||||
if (obj instanceof ModuleMetadata) {
|
||||
if (obj instanceof AbstractModule) {
|
||||
moduleClasses.put(
|
||||
UUID.fromString(moduleClass.getAnnotation(
|
||||
Module.class).id()), moduleClass);
|
||||
|
@ -71,7 +71,7 @@ public class ModuleLoader {
|
|||
}
|
||||
}
|
||||
|
||||
public Map<Module, ModuleMetadata> searchForModules() {
|
||||
public Map<Module, AbstractModule> searchForModules() {
|
||||
return this.searchForModules("haflow.module");
|
||||
}
|
||||
|
||||
|
|
|
@ -567,25 +567,25 @@ HAFlow.Main.prototype.onModuleAdded = function(instance, flowId, event, ui) {
|
|||
|
||||
HAFlow.Main.prototype.doAddModule = function(instance, flowId, moduleId, left,
|
||||
top) {
|
||||
instance.flows[flowId].currentNewNodeNumber = -1;
|
||||
var currentNewNodeNumber = -1;
|
||||
var i;
|
||||
var j;
|
||||
for (i = 0; i < instance.flows[flowId].nodes.length; i++) {
|
||||
var pattern = /^NewNode(\d+)$/;
|
||||
var matches = pattern.exec(instance.flows[flowId].nodes[i].name);
|
||||
for (j = 0; j < matches.length; j++) {
|
||||
if (parseInt(matches[j]) > instance.flows[flowId].currentNewNodeNumber) {
|
||||
instance.flows[flowId].currentNewNodeNumber = parseInt(matches[j]);
|
||||
if (parseInt(matches[j]) > currentNewNodeNumber) {
|
||||
currentNewNodeNumber = parseInt(matches[j]);
|
||||
}
|
||||
}
|
||||
}
|
||||
instance.flows[flowId].currentNewNodeNumber++;
|
||||
currentNewNodeNumber++;
|
||||
var newNode = {};
|
||||
var id = HAFlow.generateUUID();
|
||||
newNode["id"] = id;
|
||||
newNode["flowId"] = flowId;
|
||||
newNode["moduleId"] = moduleId;
|
||||
newNode["name"] = "NewNode" + instance.flows[flowId].currentNewNodeNumber;
|
||||
newNode["name"] = "NewNode" + currentNewNodeNumber;
|
||||
newNode["position"] = {};
|
||||
newNode.position["left"] = left;
|
||||
newNode.position["top"] = top;
|
||||
|
@ -724,47 +724,44 @@ HAFlow.Main.prototype.initNodes = function(flowId) {
|
|||
this.flows[flowId].nodes[i].id);
|
||||
var module = this.getModuleById(this, node.moduleId);
|
||||
|
||||
_addEndpoints = function(instance, flowId, nodeId, module) {
|
||||
var inputAnchors = new Array();
|
||||
var outputAnchors = new Array();
|
||||
inputAnchors = [ [ 0, 0.5, -1, 0 ], [ 0, 0.1, -1, 0 ],
|
||||
[ 0, 0.9, -1, 0 ] ];
|
||||
outputAnchors = [ [ 1, 0.5, 1, 0 ], [ 1, 0.1, 1, 0 ],
|
||||
[ 1, 0.9, 1, 0 ] ];
|
||||
this.addEndpoints(this, flowId, nodeId, module, sourceEndpoint,
|
||||
targetEndpoint);
|
||||
}
|
||||
};
|
||||
|
||||
var k = 0;
|
||||
for ( var i = 0; i < module.outputs.length; i++, k++) {
|
||||
var sourceId = nodeId + "_" + module.outputs[i].name;
|
||||
instance.jsPlumb[flowId].allSourceEndpoints
|
||||
.push(instance.jsPlumb[flowId].addEndpoint(nodeId,
|
||||
sourceEndpoint, {
|
||||
anchor : outputAnchors[k
|
||||
% outputAnchors.length],
|
||||
uuid : sourceId,
|
||||
overlays : [ [ "Label", {
|
||||
location : [ 0.5, -0.5 ],
|
||||
label : module.outputs[i].name
|
||||
} ] ]
|
||||
}));
|
||||
}
|
||||
k = 0;
|
||||
for ( var j = 0; j < module.inputs.length; j++, k++) {
|
||||
var targetId = nodeId + "_" + module.inputs[j].name;
|
||||
instance.jsPlumb[flowId].allTargetEndpoints
|
||||
.push(instance.jsPlumb[flowId].addEndpoint(nodeId,
|
||||
targetEndpoint, {
|
||||
anchor : inputAnchors[k
|
||||
% inputAnchors.length],
|
||||
uuid : targetId,
|
||||
overlays : [ [ "Label", {
|
||||
location : [ 0.5, -0.5 ],
|
||||
label : module.inputs[j].name
|
||||
} ] ]
|
||||
}));
|
||||
}
|
||||
};
|
||||
|
||||
_addEndpoints(this, flowId, nodeId, module);
|
||||
HAFlow.Main.prototype.addEndpoints = function(instance, flowId, nodeId, module,
|
||||
sourceEndpoint, targetEndpoint) {
|
||||
var k = 0;
|
||||
for ( var i = 0; i < module.outputs.length; i++, k++) {
|
||||
var sourceId = nodeId + "_" + module.outputs[i].name;
|
||||
instance.jsPlumb[flowId].allSourceEndpoints
|
||||
.push(instance.jsPlumb[flowId].addEndpoint(nodeId,
|
||||
sourceEndpoint, {
|
||||
anchor : [ 1,
|
||||
1 / (module.outputs.length + 1) * (i + 1),
|
||||
1, 0 ],
|
||||
uuid : sourceId,
|
||||
overlays : [ [ "Label", {
|
||||
location : [ 0.5, -0.5 ],
|
||||
label : module.outputs[i].name
|
||||
} ] ]
|
||||
}));
|
||||
}
|
||||
k = 0;
|
||||
for ( var j = 0; j < module.inputs.length; j++, k++) {
|
||||
var targetId = nodeId + "_" + module.inputs[j].name;
|
||||
instance.jsPlumb[flowId].allTargetEndpoints
|
||||
.push(instance.jsPlumb[flowId].addEndpoint(nodeId,
|
||||
targetEndpoint, {
|
||||
anchor : [ 0,
|
||||
1 / (module.inputs.length + 1) * (j + 1),
|
||||
-1, 0 ],
|
||||
uuid : targetId,
|
||||
overlays : [ [ "Label", {
|
||||
location : [ 0.5, -0.5 ],
|
||||
label : module.inputs[j].name
|
||||
} ] ]
|
||||
}));
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ import java.util.Map;
|
|||
import java.util.UUID;
|
||||
|
||||
import haflow.module.Module;
|
||||
import haflow.module.ModuleMetadata;
|
||||
import haflow.module.AbstractModule;
|
||||
import haflow.utility.ModuleLoader;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
@ -28,7 +28,7 @@ public class ModuleLoaderTest extends AbstractJUnit4SpringContextTests {
|
|||
|
||||
@Test
|
||||
public void testSearchForModule() {
|
||||
Map<Module, ModuleMetadata> map = this.getModuleLoader()
|
||||
Map<Module, AbstractModule> map = this.getModuleLoader()
|
||||
.searchForModules();
|
||||
Assert.assertNotNull(map);
|
||||
Assert.assertTrue(map.keySet().size() > 0);
|
||||
|
@ -44,12 +44,12 @@ public class ModuleLoaderTest extends AbstractJUnit4SpringContextTests {
|
|||
|
||||
@Test
|
||||
public void testSearchForModuleInAnotherPackage() {
|
||||
Map<Module, ModuleMetadata> map = this.getModuleLoader()
|
||||
Map<Module, AbstractModule> map = this.getModuleLoader()
|
||||
.searchForModules("haflow.test");
|
||||
Assert.assertNotNull(map);
|
||||
Assert.assertTrue(map.keySet().size() > 0);
|
||||
boolean contains = false;
|
||||
ModuleMetadata metadata = null;
|
||||
AbstractModule metadata = null;
|
||||
for (Module module : map.keySet()) {
|
||||
if (module.name().equals("TestModule")) {
|
||||
contains = true;
|
||||
|
@ -59,7 +59,6 @@ public class ModuleLoaderTest extends AbstractJUnit4SpringContextTests {
|
|||
}
|
||||
Assert.assertTrue(contains);
|
||||
Assert.assertNotNull(metadata);
|
||||
Assert.assertEquals("Test", metadata.generate(null, null, null));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -1,17 +1,22 @@
|
|||
package haflow.test;
|
||||
|
||||
import haflow.entity.Node;
|
||||
import haflow.module.DataType;
|
||||
import haflow.module.Module;
|
||||
import haflow.module.ModuleConfiguration;
|
||||
import haflow.module.ModuleEndpoint;
|
||||
import haflow.module.ModuleMetadata;
|
||||
import haflow.module.AbstractModule;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@Module(category = "Test", configurations = { @ModuleConfiguration(key = "test", displayName = "Test") }, id = "92c5e828-0d02-bc7f-8825-7bbb6f48f2f1", inputs = { @ModuleEndpoint(maxNumber = 1, minNumber = 1, name = "testInput") }, name = "TestModule", outputs = { @ModuleEndpoint(maxNumber = 1, minNumber = 1, name = "testOutput") })
|
||||
public class TestModule implements ModuleMetadata {
|
||||
public String generate(Map<String, String> configurations,
|
||||
@Module(category = "Test", configurations = { @ModuleConfiguration(key = "test", displayName = "Test") }, id = "92c5e828-0d02-bc7f-8825-7bbb6f48f2f1", inputs = { @ModuleEndpoint(maxNumber = 1, minNumber = 1, name = "testInput", dataType = DataType.PlainText) }, name = "TestModule", outputs = { @ModuleEndpoint(maxNumber = 1, minNumber = 1, name = "testOutput", dataType = DataType.PlainText) })
|
||||
public class TestModule extends AbstractModule {
|
||||
|
||||
@Override
|
||||
public boolean validate(Map<String, String> configurations,
|
||||
Map<String, Node> inputs, Map<String, Node> outputs) {
|
||||
return "Test";
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue