diff --git a/src/main/java/haflow/engine/model/DirectedGraph.java b/src/main/java/haflow/engine/model/DirectedGraph.java index ca1bbcc..2e2319e 100644 --- a/src/main/java/haflow/engine/model/DirectedGraph.java +++ b/src/main/java/haflow/engine/model/DirectedGraph.java @@ -98,22 +98,22 @@ public class DirectedGraph { // return ret; // } -// private List getNodeList() { -// return nodeList; -// } -// -// private void setNodeList(List nodeList) { -// this.nodeList = nodeList; -// } -// -// private List getEdgeList() { -// return edgeList; -// } -// -// private void setEdgeList(List edgeList) { -// this.edgeList = edgeList; -// } -// + public List getNodeList() { + return nodeList; + } + + public void setNodeList(List nodeList) { + this.nodeList = nodeList; + } + + public List getEdgeList() { + return edgeList; + } + + public void setEdgeList(List edgeList) { + this.edgeList = edgeList; + } + // private Node getStartNode() { // return startNode; // } diff --git a/src/main/java/haflow/engine/oozie/OozieEngine.java b/src/main/java/haflow/engine/oozie/OozieEngine.java index 23f5bde..fb12750 100644 --- a/src/main/java/haflow/engine/oozie/OozieEngine.java +++ b/src/main/java/haflow/engine/oozie/OozieEngine.java @@ -15,6 +15,8 @@ import haflow.engine.model.TopologicalSort; import haflow.module.AbstractHiveModule; import haflow.module.AbstractJavaModule; import haflow.module.Module; +import haflow.module.ModuleConfiguration; +import haflow.module.ModuleEndpoint; import haflow.module.basic.EndModule; import haflow.module.basic.StartModule; import haflow.module.util.ModuleUtil; @@ -141,79 +143,132 @@ public class OozieEngine extends AbstractEngine { .searchForModuleClasses(); Set nodes = flow.getNodes(); - - messageBuilder.append("Start parsing flow ..." + "
"); + + messageBuilder.append("Finding start node ..."); List startNodes = findStartNodes(nodes, moduleClasses); - if (startNodes.size() != 1) { - messageBuilder.append("Error: Wrong start node number " - + startNodes.size()); - } else { + if (startNodes.size() == 1) { + messageBuilder.append(" done" + "
"); DirectedGraph graph = new DirectedGraph(flow.getNodes(), flow.getEdges(), startNodes.get(0)); - List sorted = new TopologicalSort(graph).getOrder(); + + messageBuilder.append("Start validating flow inputs and outputs ..."); + boolean isValidGraph = validateGraph(graph, moduleClasses, messageBuilder);//TODO + if (isValidGraph) { + messageBuilder.append(" done" + "
"); + + messageBuilder.append("Start sorting nodes ..."); + List sorted = new TopologicalSort(graph) + .getOrder(); - if (sorted == null) { - messageBuilder.append("Error: Flow has Circles!"); - } else { - String flowName = flow.getName(); - String workflowXml = genWorkflowXml(flowName, sorted, - moduleClasses, graph); - - messageBuilder.append("Parsing flow ... Finised" + "
"); - messageBuilder.append("Start deploying flow ..." + "
"); - - String localDeployPath = this.getClusterConfiguration() - .getProperty(ClusterConfiguration.WORKSPACE_LOCAL) - + flowName; - boolean deloyedLocally = this.getFlowDeployService() - .deployFlowLocal(localDeployPath, workflowXml, - getJarPaths(nodes, moduleClasses)); - if (deloyedLocally) { - messageBuilder.append(flowName - + " has been deployed locally!" + "
"); - - String hdfsDeployPath = this.getClusterConfiguration() - .getProperty( - ClusterConfiguration.WORKSPACE_HDFS) - + flowName; - boolean deleted = this.getHdfsService() - .deleteDirectory(hdfsDeployPath); - if (deleted) { - messageBuilder.append("Old folder deleted: " - + hdfsDeployPath + "
"); - } - - boolean deployedToHdfs = this.getHdfsService() - .uploadFile(localDeployPath, hdfsDeployPath); - if (deployedToHdfs) { - messageBuilder.append(flowName - + " has been uploaded to hdfs!" + "
"); - - String jobId = this.getOozieService().runJob( - flowName); - if (jobId == null) { - messageBuilder.append("Failed to commit job: " - + flowName + "
"); - } else { - messageBuilder.append("Job commited! Job id : " - + jobId + "
"); - model.setCommitted(true); - model.setJobId(jobId); + if (sorted != null) { + messageBuilder.append(" done" + "
"); + + messageBuilder.append("Start validating configurations of each node ..." + + "
");
+					boolean isValid = validateEachNode(flow, moduleClasses, messageBuilder);// TODO
+					if (isValid) {
+						messageBuilder.append(" done" + "
"); + + messageBuilder.append("Start generating Oozie xml ..."); + String flowName = flow.getName(); + String workflowXml = null; + try { + workflowXml = genWorkflowXml(flowName, sorted, + moduleClasses, graph); + } catch (Exception e) { + messageBuilder.append(e.getMessage()); } - } else { - messageBuilder.append(flowName - + " failed to be uploaded to hdfs!" + "
"); - } - } else { - messageBuilder.append(flowName - + " failed to be deployed locally!" + "
"); - } + + if (workflowXml != null){ + messageBuilder.append(" done" + "
");
+							
+							messageBuilder
+									.append("Start deploying flow locally ...");
+							String localDeployPath = this
+									.getClusterConfiguration()
+									.getProperty(
+											ClusterConfiguration.WORKSPACE_LOCAL)
+									+ flowName;
+							boolean deployedLocally = this
+									.getFlowDeployService()
+									.deployFlowLocal(
+											localDeployPath,
+											workflowXml,
+											getJarPaths(nodes,
+													moduleClasses));
+							if (deployedLocally) {
+								messageBuilder.append(" done" + "
"); + + String hdfsDeployPath = this + .getClusterConfiguration() + .getProperty( + ClusterConfiguration.WORKSPACE_HDFS) + + flowName; + boolean deleted = this.getHdfsService() + .deleteDirectory(hdfsDeployPath); + if (deleted) { + messageBuilder + .append( "
" + "Old folder deleted: " + + hdfsDeployPath + + "
"); + } + + messageBuilder + .append("Start deploying flow to HDFS ..."); + boolean deployedToHdfs = this + .getHdfsService().uploadFile( + localDeployPath, + hdfsDeployPath); + if (deployedToHdfs) { + messageBuilder.append(" done" + "
"); + + messageBuilder + .append("Start committing job to Oozie ..."); + + String jobId = this.getOozieService() + .runJob(flowName); + if (jobId != null) { + messageBuilder.append(" done" + "
"); + + messageBuilder + .append("Job commited! Job id : " + + jobId + "
"); + model.setCommitted(true); + model.setJobId(jobId); + } else { + + messageBuilder + .append("Error :" + "Failed to commit job: " + + flowName + "
"); + } + } else { + messageBuilder + .append("Error :" + flowName + + " failed to be uploaded to hdfs!" + + "
"); + } + }else { + messageBuilder.append("Error :" + flowName + + " failed to be deployed locally!" + + "
"); + } + }else { + messageBuilder + .append("Error : Failed to generate Oozie xml!"); + } + }else { + messageBuilder.append("Error : Invalid flow!"); + } + }else { + messageBuilder.append("Error: Flow has Circles!"); + } } + } else { + messageBuilder.append("Error: Wrong start node number " + + startNodes.size()); } -// System.out.println(messageBuilder.toString()); model.setMessage(messageBuilder.toString()); -// System.out.println(messageBuilder.toString()); logger.info(messageBuilder.toString()); return model; } catch (Exception e) { @@ -264,6 +319,87 @@ public class OozieEngine extends AbstractEngine { } } + private boolean validateGraph(DirectedGraph graph, + Map> moduleClasses, StringBuilder messageBuilder) { + List edges = graph.getEdgeList(); + + for (int i = 0; i < graph.getNodeCount(); i++) { + Node node = graph.getNode(i); + UUID moduleId = node.getModuleId(); + Class moduleClass = moduleClasses.get(moduleId); + Module annotation = moduleClass.getAnnotation(Module.class); + ModuleEndpoint[] inputs = annotation.inputs(); + ModuleEndpoint[] outputs = annotation.outputs(); + + for (ModuleEndpoint input : inputs) { + String inputName = input.name(); + boolean found = false; + for (Edge edge : edges) { + if (edge.getTargetNode().getId().equals(node.getId()) + && inputName.equals(edge.getTargetEndpoint())){ + found = true; + break; + } + } + if( !found){ + messageBuilder.append("Empty input of node " + node.getName() + " input " + inputName); + return false; + } + } + + for(ModuleEndpoint output : outputs){ + String outputName = output.name(); + boolean found = false; + for( Edge edge : edges) { + if( edge.getSourceNode().getId().equals(node.getId()) + && outputName.equals(edge.getSourceEndpoint())){ + found = true; + break; + } + } + if ( !found){ + messageBuilder.append("Empty output of node " + node.getName() + " output " + outputName + "
");
+					return false;
+				}
+			}
+		}
+		return true;// TODO
+	}
+	
+	/**
+	 * Check if each node of the flow has all the required configurations.
+	 * @param flow
+	 * @param moduleClasses
+	 * @param messageBuilder
+	 * @return
+	 */
+	private boolean validateEachNode(Flow flow, Map>
+		moduleClasses, StringBuilder messageBuilder){
+		Set nodes = flow.getNodes();
+		for( Node node : nodes){
+			Class moduleClass = moduleClasses.get(node.getModuleId());
+			Module anno = moduleClass.getAnnotation(Module.class);
+			ModuleConfiguration[] moduleConfs = anno.configurations();
+			List nodeConfs = this.getNodeConfigurationService().getNodeConfiguration(node.getId());
+			for(ModuleConfiguration moduleConf : moduleConfs){
+				if( moduleConf.required()){
+					boolean found = false;
+					for( NodeConfiguration nodeConf : nodeConfs){
+						if( nodeConf.getKey().equals(moduleConf.key()) && nodeConf.getValue() != null && !nodeConf.getValue().trim().isEmpty()){
+							found = true;
+							break;
+						}
+					}
+					if( !found){
+						messageBuilder.append("No value for " + moduleConf.key() + " of node " + node.getName() + "
"); + return false; + } + } + } + } + return true; + } + private String genWorkflowXml(String flowName, List sorted, Map> moduleClasses, DirectedGraph graph) throws InstantiationException, IllegalAccessException { @@ -293,8 +429,8 @@ public class OozieEngine extends AbstractEngine { Edge path = v.getPath(); outputs.put(path.getSourceEndpoint(), // "ok" path.getTargetNode()); - System.out.println(path.getSourceEndpoint()); - System.out.println(path.getTargetNode().getName()); + System.out.println(path.getSourceEndpoint() + + " to " + path.getTargetNode().getName()); } } diff --git a/src/main/java/haflow/module/ModuleConfiguration.java b/src/main/java/haflow/module/ModuleConfiguration.java index 31a7fd3..1c58351 100644 --- a/src/main/java/haflow/module/ModuleConfiguration.java +++ b/src/main/java/haflow/module/ModuleConfiguration.java @@ -15,4 +15,6 @@ public @interface ModuleConfiguration { String pattern(); ModuleConfigurationType type(); + + boolean required() default false; } diff --git a/src/main/java/haflow/ui/helper/RunHistoryHelper.java b/src/main/java/haflow/ui/helper/RunHistoryHelper.java index 13bb079..e99a3c3 100644 --- a/src/main/java/haflow/ui/helper/RunHistoryHelper.java +++ b/src/main/java/haflow/ui/helper/RunHistoryHelper.java @@ -6,6 +6,8 @@ import haflow.ui.model.FlowRunHistoryListModel; import haflow.ui.model.FlowRunHistoryModel; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.UUID; @@ -39,6 +41,19 @@ public class RunHistoryHelper { frhm.setTimestamp(feh.getTimestamp()); flowHistorys.add(frhm); } + Collections.sort(flowHistorys, new Comparator(){ + public int compare(FlowRunHistoryModel o1, FlowRunHistoryModel o2) { + if( o1 == o2) return 0; + FlowRunHistoryModel f1 = (FlowRunHistoryModel)o1; + FlowRunHistoryModel f2 = (FlowRunHistoryModel)o2; + if( f1.getTimestamp().before(f2.getTimestamp())){ + return 1; + }else 
if(f1.getTimestamp().after(f2.getTimestamp()) ){ + return -1; + } + return 0; + } + }); FlowRunHistoryListModel frhlm = new FlowRunHistoryListModel(); frhlm.setFlowHistorys(flowHistorys); return frhlm; diff --git a/src/main/webapp/script/haflow.main.js b/src/main/webapp/script/haflow.main.js index c71eb09..6b00ad6 100644 --- a/src/main/webapp/script/haflow.main.js +++ b/src/main/webapp/script/haflow.main.js @@ -180,7 +180,7 @@ HAFlow.Main.prototype.runFlow = function(flowId) { data : JSON.stringify({}), success : function(data, status) { HAFlow.showDialog("Run Flow", " Commited: " + data.commited - + "\n" + "Result: " + data.message); + + "
" + "Message:
" + data.message); }, error : function(request, status, error) { HAFlow.showDialog("Error",