diff --git a/src/main/java/haflow/engine/oozie/OozieEngine.java b/src/main/java/haflow/engine/oozie/OozieEngine.java index 12fb43f..33a34b0 100644 --- a/src/main/java/haflow/engine/oozie/OozieEngine.java +++ b/src/main/java/haflow/engine/oozie/OozieEngine.java @@ -8,7 +8,9 @@ import haflow.engine.AbstractEngine; import haflow.engine.Engine; import haflow.engine.RunFlowResult; import haflow.engine.ValidateFlowResult; +import haflow.engine.model.AdjMatrixNode; import haflow.engine.model.DirectedGraph; +import haflow.module.DataType; import haflow.module.Module; import haflow.module.ModuleConfiguration; import haflow.module.ModuleEndpoint; @@ -17,6 +19,7 @@ import haflow.service.HdfsService; import haflow.service.NodeConfigurationService; import haflow.util.ClusterConfiguration; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -49,8 +52,9 @@ public class OozieEngine extends AbstractEngine { Map> moduleClasses = this.moduleUtil .searchForModuleClasses(); + DirectedGraph originalGraph = new DirectedGraph(flow.getNodes(), flow.getEdges()); messageBuilder.append("Start validating flow inputs and outputs ..."); - boolean isValidGraph = true;//validateGraph(graph, moduleClasses, messageBuilder);//TODO + boolean isValidGraph = validateGraph(originalGraph, moduleClasses, messageBuilder);//TODO if (isValidGraph) { messageBuilder.append(" done" + "
"); @@ -63,7 +67,7 @@ public class OozieEngine extends AbstractEngine { String flowName = flow.getName(); String workflowXml = null; try { - workflowXml = this.oozieFlowXmlGenerator.genWorkflowXml(flow, moduleClasses, messageBuilder); + workflowXml = this.oozieFlowXmlGenerator.genWorkflowXml(flow, originalGraph, moduleClasses, messageBuilder); } catch (Exception e) { messageBuilder.append(e.getMessage()); } @@ -146,10 +150,89 @@ public class OozieEngine extends AbstractEngine { return jarPaths; } + /** + * validate graph + * @param graph + * @param moduleClasses + * @param messageBuilder + * @return + */ private boolean validateGraph(DirectedGraph graph, Map> moduleClasses, StringBuilder messageBuilder) { + boolean idtm = isDataTypeMatch(graph, moduleClasses, messageBuilder); + if(idtm == false) + return false; + boolean veio = hasNoEmptyInputOutput(graph, moduleClasses, messageBuilder); + if( veio == false) + return false; + return true; + + } + + private boolean isDataTypeMatch(DirectedGraph graph, + Map> moduleClasses, StringBuilder messageBuilder){ List edges = graph.getEdgeList(); + Map sourceDataTypeMap = new HashMap(); + Map targetDataTypeMap = new HashMap(); + + for( Edge edge : edges ){ + Node sourceNode = edge.getSourceNode(); + Class sourceNodeModuleClass = moduleClasses.get(sourceNode.getModuleId()); + Module sourceNodeModule = sourceNodeModuleClass.getAnnotation(Module.class); + ModuleEndpoint[] outputs = sourceNodeModule.outputs(); + DataType sourceEndPointDataType = null; + for( ModuleEndpoint output : outputs){ + if( output.name().equals(edge.getSourceEndpoint()) ){ + sourceEndPointDataType = output.dataType(); + } + } + + Node targetNode = edge.getTargetNode(); + Class targetNodeModuleClass = moduleClasses.get(targetNode.getModuleId()); + Module targetNodeModule = targetNodeModuleClass.getAnnotation(Module.class); + ModuleEndpoint[] inputs = targetNodeModule.inputs(); + DataType targetEndPointDataType = null; + for( ModuleEndpoint input : inputs ){ + if( input.name().equals(edge.getTargetEndpoint())){ + targetEndPointDataType = input.dataType(); + } + } + + if( !sourceDataTypeMap.containsKey(edge)){ + sourceDataTypeMap.put(edge, sourceEndPointDataType); + } + if(targetEndPointDataType == DataType.AUTO){ + int nodeIndex = graph.getNodeIndex(targetNode); + List adjEdges = graph.getAdjacent(nodeIndex); + for(AdjMatrixNode adjEdge : adjEdges ){ + targetDataTypeMap.put(adjEdge.getPath(), sourceEndPointDataType); + } + }else{ + if( !targetDataTypeMap.containsKey(edge)){ + targetDataTypeMap.put(edge, targetEndPointDataType); + } + } + } + + //validate data type of each edge + for( Edge edge : edges ){ + DataType sourceEndPointDataType = sourceDataTypeMap.get(edge); + DataType targetEndPointDataType = targetDataTypeMap.get(edge); + + if( !DataType.matches(sourceEndPointDataType, targetEndPointDataType) ){ + messageBuilder.append("
" + "Data type mismatch: " + edge.getSourceNode().getName() + "." + + edge.getSourceEndpoint() + " to " + edge.getTargetNode().getName() + "." + edge.getTargetEndpoint()); + return false; + } + } + return true; + } + + private boolean hasNoEmptyInputOutput(DirectedGraph graph, + Map> moduleClasses, StringBuilder messageBuilder){ + List edges = graph.getEdgeList(); + //check the graph for (int i = 0; i < graph.getNodeCount(); i++) { Node node = graph.getNode(i); UUID moduleId = node.getModuleId(); @@ -158,6 +241,7 @@ public class OozieEngine extends AbstractEngine { ModuleEndpoint[] inputs = annotation.inputs(); ModuleEndpoint[] outputs = annotation.outputs(); + //check empty input for (ModuleEndpoint input : inputs) { String inputName = input.name(); boolean found = false; @@ -169,11 +253,12 @@ public class OozieEngine extends AbstractEngine { } } if( !found){ - messageBuilder.append("Empty input of node " + node.getName() + " input " + inputName); + messageBuilder.append("
" + "Empty input of node " + node.getName() + " input " + inputName); return false; } } + //check empty output for(ModuleEndpoint output : outputs){ String outputName = output.name(); boolean found = false; diff --git a/src/main/java/haflow/engine/oozie/OozieFlowXmlGenerator.java b/src/main/java/haflow/engine/oozie/OozieFlowXmlGenerator.java index 3290079..e620e60 100644 --- a/src/main/java/haflow/engine/oozie/OozieFlowXmlGenerator.java +++ b/src/main/java/haflow/engine/oozie/OozieFlowXmlGenerator.java @@ -24,11 +24,11 @@ import org.w3c.dom.Document; @Component public class OozieFlowXmlGenerator { - public String genWorkflowXml(Flow flow, Map> moduleClasses, StringBuilder messageBuilder) + public String genWorkflowXml(Flow flow, DirectedGraph originalGraph, Map> moduleClasses, StringBuilder messageBuilder) throws Exception { messageBuilder.append("Start sorting nodes ..."); - DirectedGraph originalGraph = new DirectedGraph(flow.getNodes(), flow.getEdges()); +// DirectedGraph originalGraph = new DirectedGraph(flow.getNodes(), flow.getEdges()); Map> inputConfigurations = new HashMap>(); DirectedGraph graph = this.graphTransformer.transform(flow, originalGraph, moduleClasses, inputConfigurations); List sorted = new TopologicalSort(graph).getOrder(); diff --git a/src/main/java/haflow/module/AbstractJavaModule.java b/src/main/java/haflow/module/AbstractJavaModule.java index 508f6fa..416bd88 100644 --- a/src/main/java/haflow/module/AbstractJavaModule.java +++ b/src/main/java/haflow/module/AbstractJavaModule.java @@ -17,31 +17,15 @@ public abstract class AbstractJavaModule extends AbstractModule { List result = new ArrayList(); for (ModuleEndpoint input : inputs) { - switch (input.dataType()) { - case PlainText: - String textValue = configurations.get(input.name()).trim(); - result.add("--" + input.name()); - result.add(textValue); - break; - case None: - default: - System.out.println("Invalid Parameters!"); - break; - } + String textValue = configurations.get(input.name()).trim(); + result.add("--" + input.name()); + result.add(textValue); } for (ModuleEndpoint output : outputs) { - switch (output.dataType()) { - case PlainText: - String textValue = configurations.get(output.name()).trim(); - result.add("--" + output.name()); - result.add(textValue); - break; - case None: - default: - System.out.println("Invalid Parameters!"); - break; - } + String textValue = configurations.get(output.name()).trim(); + result.add("--" + output.name()); + result.add(textValue); } for (String key : configurations.keySet()) { diff --git a/src/main/java/haflow/module/DataType.java b/src/main/java/haflow/module/DataType.java index 7775ee7..a0f6011 100644 --- a/src/main/java/haflow/module/DataType.java +++ b/src/main/java/haflow/module/DataType.java @@ -1,5 +1,35 @@ package haflow.module; public enum DataType { - None, PlainText + PlainText, Csv, Arff, Image, Jpg, Model, ALL, AUTO; + + public static boolean matches(DataType sourceDataType, DataType targetDataType){ + if( sourceDataType == DataType.AUTO || targetDataType == DataType.AUTO){ + System.out.println("Error: AUTO datatype"); + return false; + } + if( sourceDataType == targetDataType ) + return true; + + if( contains(targetDataType, sourceDataType)) + return true; + + return false; + } + + private static boolean contains(DataType targetDataType, DataType sourceDataType){ + if( targetDataType == DataType.ALL) + return true; + + if( targetDataType == PlainText){ + if( sourceDataType == Csv || sourceDataType == Arff) + return true; + } + + if( targetDataType == Image){ + if( sourceDataType == Jpg) + return true; + } + return false; + } } diff --git a/src/main/java/haflow/module/data/input/AllFileInpuModule.java b/src/main/java/haflow/module/data/input/AllFileInpuModule.java new file mode 100644 index 0000000..164a800 --- /dev/null +++ b/src/main/java/haflow/module/data/input/AllFileInpuModule.java @@ -0,0 +1,26 @@ +package haflow.module.data.input; + +import haflow.module.AbstractModule; +import haflow.module.DataType; +import haflow.module.Module; +import haflow.module.ModuleConfiguration; +import haflow.module.ModuleConfigurationType; +import haflow.module.ModuleEndpoint; +import haflow.module.ModuleType; + +import java.util.Map; + +@Module(id = "ada611a8-aa63-968a-ca46-4356a0e0bdab", name = "AllSourceFile", category = "Data-Source", type = ModuleType.SOURCE, + configurations = { @ModuleConfiguration(key = "path", displayName = "path: Data path", pattern = "^(.*)$", type = ModuleConfigurationType.PLAIN_TEXT)}, + inputs = { }, + outputs = { @ModuleEndpoint(name = "path", minNumber = 1, maxNumber = 1, dataType = DataType.ALL)}) +public class AllFileInpuModule extends AbstractModule { + + @Override + public boolean validate(Map configurations, + Map inputs, Map outputs) { + // TODO Auto-generated method stub + return false; + } + +} diff --git a/src/main/java/haflow/module/datamining/ArffFileModule.java b/src/main/java/haflow/module/data/input/ArffFileInpuModule.java similarity index 77% rename from src/main/java/haflow/module/datamining/ArffFileModule.java rename to src/main/java/haflow/module/data/input/ArffFileInpuModule.java index 1eeb65f..1e1fee3 100644 --- a/src/main/java/haflow/module/datamining/ArffFileModule.java +++ b/src/main/java/haflow/module/data/input/ArffFileInpuModule.java @@ -1,4 +1,4 @@ -package haflow.module.datamining; +package haflow.module.data.input; import haflow.module.AbstractModule; import haflow.module.DataType; @@ -10,11 +10,11 @@ import haflow.module.ModuleType; import java.util.Map; -@Module(id = "ada611a8-aa63-968a-ca46-4356a0e0bdab", name = "ArffSourceFile", category = "Data-Source", type = ModuleType.SOURCE, +@Module(id = "ada611a8-aa63-968a-1146-4356a0e0bdab", name = "ArffSourceFile", category = "Data-Source", type = ModuleType.SOURCE, configurations = { @ModuleConfiguration(key = "path", displayName = "path: Data path", pattern = "^(.*)$", type = ModuleConfigurationType.PLAIN_TEXT)}, inputs = { }, - outputs = { @ModuleEndpoint(name = "path", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText)}) -public class ArffFileModule extends AbstractModule { + outputs = { @ModuleEndpoint(name = "path", minNumber = 1, maxNumber = 1, dataType = DataType.Arff)}) +public class ArffFileInpuModule extends AbstractModule { @Override public boolean validate(Map configurations, @@ -23,5 +23,4 @@ public class ArffFileModule extends AbstractModule { return false; } - } diff --git a/src/main/java/haflow/module/data/input/PlainTextInputFileModule.java b/src/main/java/haflow/module/data/input/PlainTextInputFileModule.java new file mode 100644 index 0000000..45fed32 --- /dev/null +++ b/src/main/java/haflow/module/data/input/PlainTextInputFileModule.java @@ -0,0 +1,26 @@ +package haflow.module.data.input; + +import haflow.module.AbstractModule; +import haflow.module.DataType; +import haflow.module.Module; +import haflow.module.ModuleConfiguration; +import haflow.module.ModuleConfigurationType; +import haflow.module.ModuleEndpoint; +import haflow.module.ModuleType; + +import java.util.Map; + +@Module(id = "34a611a8-aa63-968a-ca46-4356a0e0bdab", name = "TextSourceFile", category = "Data-Source", type = ModuleType.SOURCE, + configurations = { @ModuleConfiguration(key = "path", displayName = "path: Data path", pattern = "^(.*)$", type = ModuleConfigurationType.PLAIN_TEXT)}, + inputs = { }, + outputs = { @ModuleEndpoint(name = "path", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText)}) +public class PlainTextInputFileModule extends AbstractModule { + + @Override + public boolean validate(Map configurations, + Map inputs, Map outputs) { + // TODO Auto-generated method stub + return false; + } + +} diff --git a/src/main/java/haflow/module/data/output/AllOutputFileModule.java b/src/main/java/haflow/module/data/output/AllOutputFileModule.java new file mode 100644 index 0000000..fa2274f --- /dev/null +++ b/src/main/java/haflow/module/data/output/AllOutputFileModule.java @@ -0,0 +1,26 @@ +package haflow.module.data.output; + +import java.util.Map; + +import haflow.module.AbstractModule; +import haflow.module.DataType; +import haflow.module.Module; +import haflow.module.ModuleConfiguration; +import haflow.module.ModuleConfigurationType; +import haflow.module.ModuleEndpoint; +import haflow.module.ModuleType; + +@Module(id = "12a611a8-aa63-118a-ca46-4356a0e0bdab", name = "AllTargetFile", category = "Data-Target", type = ModuleType.DEST, + configurations = { @ModuleConfiguration(key = "path", displayName = "path: Data path", pattern = "^(.*)$", type = ModuleConfigurationType.PLAIN_TEXT)}, + inputs = { @ModuleEndpoint(name = "path", minNumber = 1, maxNumber = 1, dataType = DataType.ALL) }, + outputs = {}) +public class AllOutputFileModule extends AbstractModule { + + @Override + public boolean validate(Map configurations, + Map inputs, Map outputs) { + // TODO Auto-generated method stub + return false; + } + +} diff --git a/src/main/java/haflow/module/data/output/ArffOutputFileModule.java b/src/main/java/haflow/module/data/output/ArffOutputFileModule.java new file mode 100644 index 0000000..75044ec --- /dev/null +++ b/src/main/java/haflow/module/data/output/ArffOutputFileModule.java @@ -0,0 +1,26 @@ +package haflow.module.data.output; + +import java.util.Map; + +import haflow.module.AbstractModule; +import haflow.module.DataType; +import haflow.module.Module; +import haflow.module.ModuleConfiguration; +import haflow.module.ModuleConfigurationType; +import haflow.module.ModuleEndpoint; +import haflow.module.ModuleType; + +@Module(id = "21a611a8-aa63-118a-ca46-4356a0e0bdab", name = "ArffTargetFile", category = "Data-Target", type = ModuleType.DEST, + configurations = { @ModuleConfiguration(key = "path", displayName = "path: Data path", pattern = "^(.*)$", type = ModuleConfigurationType.PLAIN_TEXT)}, + inputs = { @ModuleEndpoint(name = "path", minNumber = 1, maxNumber = 1, dataType = DataType.Arff) }, + outputs = {}) +public class ArffOutputFileModule extends AbstractModule { + + @Override + public boolean validate(Map configurations, + Map inputs, Map outputs) { + // TODO Auto-generated method stub + return false; + } + +} diff --git a/src/main/java/haflow/module/datamining/TxtOutputModule.java b/src/main/java/haflow/module/data/output/PlainTextFileOutputModule.java similarity index 74% rename from src/main/java/haflow/module/datamining/TxtOutputModule.java rename to src/main/java/haflow/module/data/output/PlainTextFileOutputModule.java index e84b10b..a073fc6 100644 --- a/src/main/java/haflow/module/datamining/TxtOutputModule.java +++ b/src/main/java/haflow/module/data/output/PlainTextFileOutputModule.java @@ -1,4 +1,4 @@ -package haflow.module.datamining; +package haflow.module.data.output; import java.util.Map; @@ -10,11 +10,11 @@ import haflow.module.ModuleConfigurationType; import haflow.module.ModuleEndpoint; import haflow.module.ModuleType; -@Module(id = "ada611a8-aa63-118a-ca46-4356a0e0bdab", name = "TextDestFile", category = "Data-Source", type = ModuleType.DEST, +@Module(id = "ada611a8-aa63-118a-ca46-4356a0e1cdab", name = "TextTargetFile", category = "Data-Target", type = ModuleType.DEST, configurations = { @ModuleConfiguration(key = "path", displayName = "path: Data path", pattern = "^(.*)$", type = ModuleConfigurationType.PLAIN_TEXT)}, inputs = { @ModuleEndpoint(name = "path", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) }, outputs = {}) -public class TxtOutputModule extends AbstractModule { +public class PlainTextFileOutputModule extends AbstractModule { @Override public boolean validate(Map configurations, diff --git a/src/main/java/haflow/module/datamining/DescribeModule.java b/src/main/java/haflow/module/datamining/DescribeModule.java index c69c2ca..bbec01f 100644 --- a/src/main/java/haflow/module/datamining/DescribeModule.java +++ b/src/main/java/haflow/module/datamining/DescribeModule.java @@ -15,7 +15,7 @@ import java.util.Map; @ModuleConfiguration(key = "descriptor", displayName = "descriptor: Data descriptor", pattern = "^(.*)$", type = ModuleConfigurationType.PLAIN_TEXT), @ModuleConfiguration(key = "regression", displayName = "regression: Regression Problem", pattern = "^(.*)$", type = ModuleConfigurationType.BOOLEAN) }, inputs = { - @ModuleEndpoint(name = "path", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) }, + @ModuleEndpoint(name = "path", minNumber = 1, maxNumber = 1, dataType = DataType.Arff) }, outputs = { @ModuleEndpoint(name = "file", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) }) public class DescribeModule extends AbstractJavaModule {