Add middle data type verification.

This commit is contained in:
zhaowei8188127 2013-11-26 10:53:22 +08:00
parent 221da6294b
commit d317573185
11 changed files with 239 additions and 37 deletions

View File

@ -8,7 +8,9 @@ import haflow.engine.AbstractEngine;
import haflow.engine.Engine;
import haflow.engine.RunFlowResult;
import haflow.engine.ValidateFlowResult;
import haflow.engine.model.AdjMatrixNode;
import haflow.engine.model.DirectedGraph;
import haflow.module.DataType;
import haflow.module.Module;
import haflow.module.ModuleConfiguration;
import haflow.module.ModuleEndpoint;
@ -17,6 +19,7 @@ import haflow.service.HdfsService;
import haflow.service.NodeConfigurationService;
import haflow.util.ClusterConfiguration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -49,8 +52,9 @@ public class OozieEngine extends AbstractEngine {
Map<UUID, Class<?>> moduleClasses = this.moduleUtil
.searchForModuleClasses();
DirectedGraph originalGraph = new DirectedGraph(flow.getNodes(), flow.getEdges());
messageBuilder.append("Start validating flow inputs and outputs ...");
boolean isValidGraph = true;//validateGraph(graph, moduleClasses, messageBuilder);//TODO
boolean isValidGraph = validateGraph(originalGraph, moduleClasses, messageBuilder);//TODO
if (isValidGraph) {
messageBuilder.append(" done" + "<br>");
@ -63,7 +67,7 @@ public class OozieEngine extends AbstractEngine {
String flowName = flow.getName();
String workflowXml = null;
try {
workflowXml = this.oozieFlowXmlGenerator.genWorkflowXml(flow, moduleClasses, messageBuilder);
workflowXml = this.oozieFlowXmlGenerator.genWorkflowXml(flow, originalGraph, moduleClasses, messageBuilder);
} catch (Exception e) {
messageBuilder.append(e.getMessage());
}
@ -146,10 +150,89 @@ public class OozieEngine extends AbstractEngine {
return jarPaths;
}
/**
* validate graph
* @param graph
* @param moduleClasses
* @param messageBuilder
* @return
*/
private boolean validateGraph(DirectedGraph graph,
Map<UUID, Class<?>> moduleClasses, StringBuilder messageBuilder) {
boolean idtm = isDataTypeMatch(graph, moduleClasses, messageBuilder);
if(idtm == false)
return false;
boolean veio = hasNoEmptyInputOutput(graph, moduleClasses, messageBuilder);
if( veio == false)
return false;
return true;
}
private boolean isDataTypeMatch(DirectedGraph graph,
Map<UUID, Class<?>> moduleClasses, StringBuilder messageBuilder){
List<Edge> edges = graph.getEdgeList();
Map<Edge, DataType> sourceDataTypeMap = new HashMap<Edge, DataType>();
Map<Edge, DataType> targetDataTypeMap = new HashMap<Edge, DataType>();
for( Edge edge : edges ){
Node sourceNode = edge.getSourceNode();
Class<?> sourceNodeModuleClass = moduleClasses.get(sourceNode.getModuleId());
Module sourceNodeModule = sourceNodeModuleClass.getAnnotation(Module.class);
ModuleEndpoint[] outputs = sourceNodeModule.outputs();
DataType sourceEndPointDataType = null;
for( ModuleEndpoint output : outputs){
if( output.name().equals(edge.getSourceEndpoint()) ){
sourceEndPointDataType = output.dataType();
}
}
Node targetNode = edge.getTargetNode();
Class<?> targetNodeModuleClass = moduleClasses.get(targetNode.getModuleId());
Module targetNodeModule = targetNodeModuleClass.getAnnotation(Module.class);
ModuleEndpoint[] inputs = targetNodeModule.inputs();
DataType targetEndPointDataType = null;
for( ModuleEndpoint input : inputs ){
if( input.name().equals(edge.getTargetEndpoint())){
targetEndPointDataType = input.dataType();
}
}
if( !sourceDataTypeMap.containsKey(edge)){
sourceDataTypeMap.put(edge, sourceEndPointDataType);
}
if(targetEndPointDataType == DataType.AUTO){
int nodeIndex = graph.getNodeIndex(targetNode);
List<AdjMatrixNode> adjEdges = graph.getAdjacent(nodeIndex);
for(AdjMatrixNode adjEdge : adjEdges ){
targetDataTypeMap.put(adjEdge.getPath(), sourceEndPointDataType);
}
}else{
if( !targetDataTypeMap.containsKey(edge)){
targetDataTypeMap.put(edge, targetEndPointDataType);
}
}
}
//validate data type of each edge
for( Edge edge : edges ){
DataType sourceEndPointDataType = sourceDataTypeMap.get(edge);
DataType targetEndPointDataType = targetDataTypeMap.get(edge);
if( !DataType.matches(sourceEndPointDataType, targetEndPointDataType) ){
messageBuilder.append("<br>" + "Data type mismatch: " + edge.getSourceNode().getName() + "." +
edge.getSourceEndpoint() + " to " + edge.getTargetNode().getName() + "." + edge.getTargetEndpoint());
return false;
}
}
return true;
}
private boolean hasNoEmptyInputOutput(DirectedGraph graph,
Map<UUID, Class<?>> moduleClasses, StringBuilder messageBuilder){
List<Edge> edges = graph.getEdgeList();
//check the graph
for (int i = 0; i < graph.getNodeCount(); i++) {
Node node = graph.getNode(i);
UUID moduleId = node.getModuleId();
@ -158,6 +241,7 @@ public class OozieEngine extends AbstractEngine {
ModuleEndpoint[] inputs = annotation.inputs();
ModuleEndpoint[] outputs = annotation.outputs();
//check empty input
for (ModuleEndpoint input : inputs) {
String inputName = input.name();
boolean found = false;
@ -169,11 +253,12 @@ public class OozieEngine extends AbstractEngine {
}
}
if( !found){
messageBuilder.append("Empty input of node " + node.getName() + " input " + inputName);
messageBuilder.append("<br>" + "Empty input of node " + node.getName() + " input " + inputName);
return false;
}
}
//check empty output
for(ModuleEndpoint output : outputs){
String outputName = output.name();
boolean found = false;

View File

@ -24,11 +24,11 @@ import org.w3c.dom.Document;
@Component
public class OozieFlowXmlGenerator {
public String genWorkflowXml(Flow flow, Map<UUID, Class<?>> moduleClasses, StringBuilder messageBuilder)
public String genWorkflowXml(Flow flow, DirectedGraph originalGraph, Map<UUID, Class<?>> moduleClasses, StringBuilder messageBuilder)
throws Exception {
messageBuilder.append("Start sorting nodes ...");
DirectedGraph originalGraph = new DirectedGraph(flow.getNodes(), flow.getEdges());
// DirectedGraph originalGraph = new DirectedGraph(flow.getNodes(), flow.getEdges());
Map<UUID, Map<String, String>> inputConfigurations = new HashMap<UUID, Map<String, String>>();
DirectedGraph graph = this.graphTransformer.transform(flow, originalGraph, moduleClasses, inputConfigurations);
List<Integer> sorted = new TopologicalSort(graph).getOrder();

View File

@ -17,31 +17,15 @@ public abstract class AbstractJavaModule extends AbstractModule {
List<String> result = new ArrayList<String>();
for (ModuleEndpoint input : inputs) {
switch (input.dataType()) {
case PlainText:
String textValue = configurations.get(input.name()).trim();
result.add("--" + input.name());
result.add(textValue);
break;
case None:
default:
System.out.println("Invalid Parameters!");
break;
}
String textValue = configurations.get(input.name()).trim();
result.add("--" + input.name());
result.add(textValue);
}
for (ModuleEndpoint output : outputs) {
switch (output.dataType()) {
case PlainText:
String textValue = configurations.get(output.name()).trim();
result.add("--" + output.name());
result.add(textValue);
break;
case None:
default:
System.out.println("Invalid Parameters!");
break;
}
String textValue = configurations.get(output.name()).trim();
result.add("--" + output.name());
result.add(textValue);
}
for (String key : configurations.keySet()) {

View File

@ -1,5 +1,35 @@
package haflow.module;
public enum DataType {
None, PlainText
PlainText, Csv, Arff, Image, Jpg, Model, ALL, AUTO;
public static boolean matches(DataType sourceDataType, DataType targetDataType){
if( sourceDataType == DataType.AUTO || targetDataType == DataType.AUTO){
System.out.println("Error: AUTO datatype");
return false;
}
if( sourceDataType == targetDataType )
return true;
if( contains(targetDataType, sourceDataType))
return true;
return false;
}
private static boolean contains(DataType targetDataType, DataType sourceDataType){
if( targetDataType == DataType.ALL)
return true;
if( targetDataType == PlainText){
if( sourceDataType == Csv || sourceDataType == Arff)
return true;
}
if( targetDataType == Image){
if( sourceDataType == Jpg)
return true;
}
return false;
}
}

View File

@ -0,0 +1,26 @@
package haflow.module.data.input;
import haflow.module.AbstractModule;
import haflow.module.DataType;
import haflow.module.Module;
import haflow.module.ModuleConfiguration;
import haflow.module.ModuleConfigurationType;
import haflow.module.ModuleEndpoint;
import haflow.module.ModuleType;
import java.util.Map;
@Module(id = "ada611a8-aa63-968a-ca46-4356a0e0bdab", name = "AllSourceFile", category = "Data-Source", type = ModuleType.SOURCE,
configurations = { @ModuleConfiguration(key = "path", displayName = "path: Data path", pattern = "^(.*)$", type = ModuleConfigurationType.PLAIN_TEXT)},
inputs = { },
outputs = { @ModuleEndpoint(name = "path", minNumber = 1, maxNumber = 1, dataType = DataType.ALL)})
public class AllFileInpuModule extends AbstractModule {
@Override
public boolean validate(Map<String, String> configurations,
Map<String, String> inputs, Map<String, String> outputs) {
// TODO Auto-generated method stub
return false;
}
}

View File

@ -1,4 +1,4 @@
package haflow.module.datamining;
package haflow.module.data.input;
import haflow.module.AbstractModule;
import haflow.module.DataType;
@ -10,11 +10,11 @@ import haflow.module.ModuleType;
import java.util.Map;
@Module(id = "ada611a8-aa63-968a-ca46-4356a0e0bdab", name = "ArffSourceFile", category = "Data-Source", type = ModuleType.SOURCE,
@Module(id = "ada611a8-aa63-968a-1146-4356a0e0bdab", name = "ArffSourceFile", category = "Data-Source", type = ModuleType.SOURCE,
configurations = { @ModuleConfiguration(key = "path", displayName = "path: Data path", pattern = "^(.*)$", type = ModuleConfigurationType.PLAIN_TEXT)},
inputs = { },
outputs = { @ModuleEndpoint(name = "path", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText)})
public class ArffFileModule extends AbstractModule {
outputs = { @ModuleEndpoint(name = "path", minNumber = 1, maxNumber = 1, dataType = DataType.Arff)})
public class ArffFileInpuModule extends AbstractModule {
@Override
public boolean validate(Map<String, String> configurations,
@ -23,5 +23,4 @@ public class ArffFileModule extends AbstractModule {
return false;
}
}

View File

@ -0,0 +1,26 @@
package haflow.module.data.input;
import haflow.module.AbstractModule;
import haflow.module.DataType;
import haflow.module.Module;
import haflow.module.ModuleConfiguration;
import haflow.module.ModuleConfigurationType;
import haflow.module.ModuleEndpoint;
import haflow.module.ModuleType;
import java.util.Map;
@Module(id = "34a611a8-aa63-968a-ca46-4356a0e0bdab", name = "TextSourceFile", category = "Data-Source", type = ModuleType.SOURCE,
configurations = { @ModuleConfiguration(key = "path", displayName = "path: Data path", pattern = "^(.*)$", type = ModuleConfigurationType.PLAIN_TEXT)},
inputs = { },
outputs = { @ModuleEndpoint(name = "path", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText)})
public class PlainTextInputFileModule extends AbstractModule {
@Override
public boolean validate(Map<String, String> configurations,
Map<String, String> inputs, Map<String, String> outputs) {
// TODO Auto-generated method stub
return false;
}
}

View File

@ -0,0 +1,26 @@
package haflow.module.data.output;
import java.util.Map;
import haflow.module.AbstractModule;
import haflow.module.DataType;
import haflow.module.Module;
import haflow.module.ModuleConfiguration;
import haflow.module.ModuleConfigurationType;
import haflow.module.ModuleEndpoint;
import haflow.module.ModuleType;
@Module(id = "12a611a8-aa63-118a-ca46-4356a0e0bdab", name = "AllTargetFile", category = "Data-Target", type = ModuleType.DEST,
configurations = { @ModuleConfiguration(key = "path", displayName = "path: Data path", pattern = "^(.*)$", type = ModuleConfigurationType.PLAIN_TEXT)},
inputs = { @ModuleEndpoint(name = "path", minNumber = 1, maxNumber = 1, dataType = DataType.ALL) },
outputs = {})
public class AllOutputFileModule extends AbstractModule {
@Override
public boolean validate(Map<String, String> configurations,
Map<String, String> inputs, Map<String, String> outputs) {
// TODO Auto-generated method stub
return false;
}
}

View File

@ -0,0 +1,26 @@
package haflow.module.data.output;
import java.util.Map;
import haflow.module.AbstractModule;
import haflow.module.DataType;
import haflow.module.Module;
import haflow.module.ModuleConfiguration;
import haflow.module.ModuleConfigurationType;
import haflow.module.ModuleEndpoint;
import haflow.module.ModuleType;
@Module(id = "21a611a8-aa63-118a-ca46-4356a0e0bdab", name = "ArffTargetFile", category = "Data-Target", type = ModuleType.DEST,
configurations = { @ModuleConfiguration(key = "path", displayName = "path: Data path", pattern = "^(.*)$", type = ModuleConfigurationType.PLAIN_TEXT)},
inputs = { @ModuleEndpoint(name = "path", minNumber = 1, maxNumber = 1, dataType = DataType.Arff) },
outputs = {})
public class ArffOutputFileModule extends AbstractModule {
@Override
public boolean validate(Map<String, String> configurations,
Map<String, String> inputs, Map<String, String> outputs) {
// TODO Auto-generated method stub
return false;
}
}

View File

@ -1,4 +1,4 @@
package haflow.module.datamining;
package haflow.module.data.output;
import java.util.Map;
@ -10,11 +10,11 @@ import haflow.module.ModuleConfigurationType;
import haflow.module.ModuleEndpoint;
import haflow.module.ModuleType;
@Module(id = "ada611a8-aa63-118a-ca46-4356a0e0bdab", name = "TextDestFile", category = "Data-Source", type = ModuleType.DEST,
@Module(id = "ada611a8-aa63-118a-ca46-4356a0e1cdab", name = "TextTargetFile", category = "Data-Target", type = ModuleType.DEST,
configurations = { @ModuleConfiguration(key = "path", displayName = "path: Data path", pattern = "^(.*)$", type = ModuleConfigurationType.PLAIN_TEXT)},
inputs = { @ModuleEndpoint(name = "path", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) },
outputs = {})
public class TxtOutputModule extends AbstractModule {
public class PlainTextFileOutputModule extends AbstractModule {
@Override
public boolean validate(Map<String, String> configurations,

View File

@ -15,7 +15,7 @@ import java.util.Map;
@ModuleConfiguration(key = "descriptor", displayName = "descriptor: Data descriptor", pattern = "^(.*)$", type = ModuleConfigurationType.PLAIN_TEXT),
@ModuleConfiguration(key = "regression", displayName = "regression: Regression Problem", pattern = "^(.*)$", type = ModuleConfigurationType.BOOLEAN) },
inputs = {
@ModuleEndpoint(name = "path", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) },
@ModuleEndpoint(name = "path", minNumber = 1, maxNumber = 1, dataType = DataType.Arff) },
outputs = {
@ModuleEndpoint(name = "file", minNumber = 1, maxNumber = 1, dataType = DataType.PlainText) })
public class DescribeModule extends AbstractJavaModule {