增加工作流正确性验证:输入输出不能为空,每个节点的required参数不能为空,必须有起始节点,不能有环。

This commit is contained in:
zhaowei8188127 2013-09-11 10:35:13 +08:00
parent 9f3befbd08
commit fbc1552636
5 changed files with 236 additions and 83 deletions

View File

@ -98,22 +98,22 @@ public class DirectedGraph {
// return ret;
// }
// private List<Node> getNodeList() {
// return nodeList;
// }
//
// private void setNodeList(List<Node> nodeList) {
// this.nodeList = nodeList;
// }
//
// private List<Edge> getEdgeList() {
// return edgeList;
// }
//
// private void setEdgeList(List<Edge> edgeList) {
// this.edgeList = edgeList;
// }
//
public List<Node> getNodeList() {
return nodeList;
}
public void setNodeList(List<Node> nodeList) {
this.nodeList = nodeList;
}
public List<Edge> getEdgeList() {
return edgeList;
}
public void setEdgeList(List<Edge> edgeList) {
this.edgeList = edgeList;
}
// private Node getStartNode() {
// return startNode;
// }

View File

@ -15,6 +15,8 @@ import haflow.engine.model.TopologicalSort;
import haflow.module.AbstractHiveModule;
import haflow.module.AbstractJavaModule;
import haflow.module.Module;
import haflow.module.ModuleConfiguration;
import haflow.module.ModuleEndpoint;
import haflow.module.basic.EndModule;
import haflow.module.basic.StartModule;
import haflow.module.util.ModuleUtil;
@ -141,79 +143,132 @@ public class OozieEngine extends AbstractEngine {
.searchForModuleClasses();
Set<Node> nodes = flow.getNodes();
messageBuilder.append("Start parsing flow ..." + "<br>");
messageBuilder.append("Finding start node ...");
List<Node> startNodes = findStartNodes(nodes, moduleClasses);
if (startNodes.size() != 1) {
messageBuilder.append("Error: Wrong start node number "
+ startNodes.size());
} else {
if (startNodes.size() == 1) {
messageBuilder.append(" done" + "<br>");
DirectedGraph graph = new DirectedGraph(flow.getNodes(),
flow.getEdges(), startNodes.get(0));
List<Integer> sorted = new TopologicalSort(graph).getOrder();
messageBuilder.append("Start validating flow inputs and outputs ...");
boolean isValidGraph = validateGraph(graph, moduleClasses, messageBuilder);//TODO
if (isValidGraph) {
messageBuilder.append(" done" + "<br>");
messageBuilder.append("Start sorting nodes ...");
List<Integer> sorted = new TopologicalSort(graph)
.getOrder();
if (sorted == null) {
messageBuilder.append("Error: Flow has Circles!");
} else {
String flowName = flow.getName();
String workflowXml = genWorkflowXml(flowName, sorted,
moduleClasses, graph);
messageBuilder.append("Parsing flow ... Finised" + "<br>");
messageBuilder.append("Start deploying flow ..." + "<br>");
String localDeployPath = this.getClusterConfiguration()
.getProperty(ClusterConfiguration.WORKSPACE_LOCAL)
+ flowName;
boolean deloyedLocally = this.getFlowDeployService()
.deployFlowLocal(localDeployPath, workflowXml,
getJarPaths(nodes, moduleClasses));
if (deloyedLocally) {
messageBuilder.append(flowName
+ " has been deployed locally!" + "<br>");
String hdfsDeployPath = this.getClusterConfiguration()
.getProperty(
ClusterConfiguration.WORKSPACE_HDFS)
+ flowName;
boolean deleted = this.getHdfsService()
.deleteDirectory(hdfsDeployPath);
if (deleted) {
messageBuilder.append("Old folder deleted: "
+ hdfsDeployPath + "<br>");
}
boolean deployedToHdfs = this.getHdfsService()
.uploadFile(localDeployPath, hdfsDeployPath);
if (deployedToHdfs) {
messageBuilder.append(flowName
+ " has been uploaded to hdfs!" + "<br>");
String jobId = this.getOozieService().runJob(
flowName);
if (jobId == null) {
messageBuilder.append("Failed to commit job: "
+ flowName + "<br>");
} else {
messageBuilder.append("Job commited! Job id : "
+ jobId + "<br>");
model.setCommitted(true);
model.setJobId(jobId);
if (sorted != null) {
messageBuilder.append(" done" + "<br>");
messageBuilder.append("Start validating configurations of each node ..."
+ "<br>");
boolean isValide = validateEachNode(flow, moduleClasses, messageBuilder);// TODO
if (isValide) {
messageBuilder.append(" done" + "<br>");
messageBuilder.append("Start generating Oozie xml ...");
String flowName = flow.getName();
String workflowXml = null;
try {
workflowXml = genWorkflowXml(flowName, sorted,
moduleClasses, graph);
} catch (Exception e) {
messageBuilder.append(e.getMessage());
}
} else {
messageBuilder.append(flowName
+ " failed to be uploaded to hdfs!" + "<br>");
}
} else {
messageBuilder.append(flowName
+ " failed to be deployed locally!" + "<br>");
}
if (workflowXml != null){
messageBuilder.append(" done" + "<br>");
messageBuilder
.append("Start deploying flow locally ...");
String localDeployPath = this
.getClusterConfiguration()
.getProperty(
ClusterConfiguration.WORKSPACE_LOCAL)
+ flowName;
boolean deloyedLocally = this
.getFlowDeployService()
.deployFlowLocal(
localDeployPath,
workflowXml,
getJarPaths(nodes,
moduleClasses));
if (deloyedLocally) {
messageBuilder.append(" done" + "<br>");
String hdfsDeployPath = this
.getClusterConfiguration()
.getProperty(
ClusterConfiguration.WORKSPACE_HDFS)
+ flowName;
boolean deleted = this.getHdfsService()
.deleteDirectory(hdfsDeployPath);
if (deleted) {
messageBuilder
.append( "<br>" + "Old folder deleted: "
+ hdfsDeployPath
+ "<br>");
}
messageBuilder
.append("Start deploying flow to HDFS ...");
boolean deployedToHdfs = this
.getHdfsService().uploadFile(
localDeployPath,
hdfsDeployPath);
if (deployedToHdfs) {
messageBuilder.append(" done" + "<br>");
messageBuilder
.append("Start committing job to Oozie ...");
String jobId = this.getOozieService()
.runJob(flowName);
if (jobId != null) {
messageBuilder.append(" done" + "<br>");
messageBuilder
.append("Job commited! Job id : "
+ jobId + "<br>");
model.setCommitted(true);
model.setJobId(jobId);
} else {
messageBuilder
.append("Error :" + "Failed to commit job: "
+ flowName + "<br>");
}
} else {
messageBuilder
.append("Error :" + flowName
+ " failed to be uploaded to hdfs!"
+ "<br>");
}
}else {
messageBuilder.append("Error :" + flowName
+ " failed to be deployed locally!"
+ "<br>");
}
}else {
messageBuilder
.append("Error : Failed to generate Oozie xml!");
}
}else {
messageBuilder.append("Error : Invalid flow!");
}
}else {
messageBuilder.append("Error: Flow has Circles!");
}
}
} else {
messageBuilder.append("Error: Wrong start node number "
+ startNodes.size());
}
// System.out.println(messageBuilder.toString());
model.setMessage(messageBuilder.toString());
// System.out.println(messageBuilder.toString());
logger.info(messageBuilder.toString());
return model;
} catch (Exception e) {
@ -264,6 +319,87 @@ public class OozieEngine extends AbstractEngine {
}
}
private boolean validateGraph(DirectedGraph graph,
Map<UUID, Class<?>> moduleClasses, StringBuilder messageBuilder) {
List<Edge> edges = graph.getEdgeList();
for (int i = 0; i < graph.getNodeCount(); i++) {
Node node = graph.getNode(i);
UUID moduleId = node.getModuleId();
Class<?> moduleClass = moduleClasses.get(moduleId);
Module annotation = moduleClass.getAnnotation(Module.class);
ModuleEndpoint[] inputs = annotation.inputs();
ModuleEndpoint[] outputs = annotation.outputs();
for (ModuleEndpoint input : inputs) {
String inputName = input.name();
boolean found = false;
for (Edge edge : edges) {
if (edge.getTargetNode().getId().equals(node.getId())
&& inputName.equals(edge.getTargetEndpoint())){
found = true;
break;
}
}
if( !found){
messageBuilder.append("Empty input of node " + node.getName() + " input " + inputName);
return false;
}
}
for(ModuleEndpoint output : outputs){
String outputName = output.name();
boolean found = false;
for( Edge edge : edges) {
if( edge.getSourceNode().getId().equals(node.getId())
&& outputName.equals(edge.getSourceEndpoint())){
found = true;
break;
}
}
if ( !found){
messageBuilder.append("Empty output of node " + node.getName() + " output " + outputName + "<br>");
return false;
}
}
}
return true;// TODO
}
/**
* Check if each node of the flow has all the required configurations.
* @param flow
* @param moduleClasses
* @param messageBuilder
* @return
*/
private boolean validateEachNode(Flow flow, Map<UUID, Class<?>>
moduleClasses, StringBuilder messageBuilder){
Set<Node> nodes = flow.getNodes();
for( Node node : nodes){
Class<?> moduleClass = moduleClasses.get(node.getModuleId());
Module anno = moduleClass.getAnnotation(Module.class);
ModuleConfiguration[] moduleConfs = anno.configurations();
List<NodeConfiguration> nodeConfs = this.getNodeConfigurationService().getNodeConfiguration(node.getId());
for(ModuleConfiguration moduleConf : moduleConfs){
if( moduleConf.required()){
boolean found = false;
for( NodeConfiguration nodeConf : nodeConfs){
if( nodeConf.getKey().equals(moduleConf.key()) && nodeConf.getValue() != null & nodeConf.getValue().trim() != ""){
found = true;
break;
}
}
if( !found){
messageBuilder.append("No value for " + moduleConf.key() + " of node " + node.getName() + "<br>");
return false;
}
}
}
}
return true;
}
private String genWorkflowXml(String flowName, List<Integer> sorted,
Map<UUID, Class<?>> moduleClasses, DirectedGraph graph)
throws InstantiationException, IllegalAccessException {
@ -293,8 +429,8 @@ public class OozieEngine extends AbstractEngine {
Edge path = v.getPath();
outputs.put(path.getSourceEndpoint(), // "ok"
path.getTargetNode());
System.out.println(path.getSourceEndpoint());
System.out.println(path.getTargetNode().getName());
System.out.println(path.getSourceEndpoint()
+ " to " + path.getTargetNode().getName());
}
}

View File

@ -15,4 +15,6 @@ public @interface ModuleConfiguration {
String pattern();
ModuleConfigurationType type();
boolean required() default false;
}

View File

@ -6,6 +6,8 @@ import haflow.ui.model.FlowRunHistoryListModel;
import haflow.ui.model.FlowRunHistoryModel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@ -39,6 +41,19 @@ public class RunHistoryHelper {
frhm.setTimestamp(feh.getTimestamp());
flowHistorys.add(frhm);
}
Collections.sort(flowHistorys, new Comparator<FlowRunHistoryModel>(){
public int compare(FlowRunHistoryModel o1, FlowRunHistoryModel o2) {
if( o1 == o2) return 0;
FlowRunHistoryModel f1 = (FlowRunHistoryModel)o1;
FlowRunHistoryModel f2 = (FlowRunHistoryModel)o2;
if( f1.getTimestamp().before(f2.getTimestamp())){
return 1;
}else if(f1.getTimestamp().after(f2.getTimestamp()) ){
return -1;
}
return 0;
}
});
FlowRunHistoryListModel frhlm = new FlowRunHistoryListModel();
frhlm.setFlowHistorys(flowHistorys);
return frhlm;

View File

@ -180,7 +180,7 @@ HAFlow.Main.prototype.runFlow = function(flowId) {
data : JSON.stringify({}),
success : function(data, status) {
HAFlow.showDialog("Run Flow", " Commited: " + data.commited
+ "\n" + "Result: " + data.message);
+ "<br>" + "Message:<br> " + data.message);
},
error : function(request, status, error) {
HAFlow.showDialog("Error",