Merge pull request #6 from justinliucs/master

Merge from base project.
This commit is contained in:
Zhen Tang 2013-06-06 02:35:37 -07:00
commit 7c924058c4
14 changed files with 197 additions and 79 deletions

Binary file not shown.

Binary file not shown.

View File

@ -3,10 +3,9 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>1.0</modelVersion>
<groupId>com.otcaix.haflow</groupId>
<artifactId>java7-module</artifactId>
<artifactId>haflow-demo-module</artifactId>
<packaging>jar</packaging>
<name>java7</name>
<description>java7 module</description>
<url>http://www.baidu.com</url>
<name>demo</name>
<description>haflow-demo-module</description>
<version>1.0</version>
</project>

Binary file not shown.

29
pom.xml
View File

@ -43,7 +43,7 @@
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-core</artifactId>
<version>4.3.0.Beta2</version>
<version>4.3.0.Beta3</version>
</dependency>
<dependency>
@ -95,7 +95,7 @@
</dependency>
<dependency>
<groupId>com.otcaix.haflow</groupId>
<artifactId>java7-module</artifactId>
<artifactId>haflow-demo-module</artifactId>
<version>1.0</version>
</dependency>
<dependency>
@ -105,18 +105,23 @@
</dependency>
</dependencies>
<build>
<finalName>haflow</finalName>
<plugins>
<plugin>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-maven-plugin</artifactId>
<version>9.0.3.v20130506</version>
</plugin>
</plugins>
</build>
<profiles>
</profiles>
<build>
<plugins>
<plugin>
<groupId>org.mortbay.jetty</groupId>
<artifactId>maven-jetty-plugin</artifactId>
<version>8.1.11.v20130520</version>
</plugin>
<plugin>
<groupId>org.kohsuke.jetty</groupId>
<artifactId>jetty-maven-plugin</artifactId>
<version>7.0.0pre1</version>
</plugin>
</plugins>
</build>
</project>

View File

@ -14,6 +14,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.springframework.stereotype.Component;
/**
* Read top 10 lines, from files on hadoop
@ -21,6 +22,7 @@ import org.apache.hadoop.io.IOUtils;
* @author ZhaoWei
*
*/
@Component
public class HdfsHelper {
public static final String filePath = "hdfs://m150:9000/ztest/split/part-r-00000";
public static final String newFile = "hdfs://m150:9000/ztest/test/part-r-00000";
@ -29,14 +31,13 @@ public class HdfsHelper {
private FileSystem hdfs;
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
HdfsHelper reader = new HdfsHelper(conf);
HdfsHelper reader = new HdfsHelper();
reader.ReadFile(filePath);
}
public HdfsHelper(Configuration conf){
public HdfsHelper(){
try {
Configuration conf = new Configuration();
hdfs = FileSystem.get(conf);
} catch (IOException e) {
e.printStackTrace();
@ -94,7 +95,7 @@ public class HdfsHelper {
try {
Path srcPath = new Path(srcFile);
Path dstPath = new Path(dstFile);
hdfs.copyFromLocalFile(srcPath, dstPath);
hdfs.copyFromLocalFile(false, true, srcPath, dstPath);
} catch (IOException e) {
e.printStackTrace();
}

View File

@ -21,18 +21,20 @@ import org.apache.hadoop.util.Progressable;
public class HdfsStaticHelper {
public static void main(String[] args) {
try {
uploadTohdfs();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
};
// try {
// uploadTohdfs();
// } catch (FileNotFoundException e) {
// e.printStackTrace();
// } catch (IOException e) {
// e.printStackTrace();
// };
PutFile("D:/haflow/flows/NewFlow", "hdfs://m150:9000/ztest/test1");
}
public static void uploadTohdfs() throws FileNotFoundException, IOException {
String localSrc = "D://a.txt";
String dst = "hdfs://m150:9000/ztest/test/a.txt";
String localSrc = "D:/haflow/flows/NewFlow/workflow.xml";
String dst = "hdfs://m150:9000/ztest/test/newflow";
InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dst), conf);
@ -41,11 +43,26 @@ public class HdfsStaticHelper {
System.out.println(".");
}
});
System.out.println("上传文件成功");
System.out.println("finised uploading!");
IOUtils.copyBytes(in, out, 4096, true);
}
/** 从HDFS上读取文件 */
public static void PutFile( String srcFile, String dstFile) {
FileSystem hdfs;
Configuration conf = new Configuration();
try {
hdfs = FileSystem.get(conf);
Path srcPath = new Path(srcFile);
Path dstPath = new Path(dstFile);
hdfs.copyFromLocalFile(srcPath, dstPath);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void readHdfs() throws FileNotFoundException, IOException {
String dst = "hdfs://192.168.1.11:9000/usr/yujing/test.txt";
Configuration conf = new Configuration();
@ -60,15 +77,18 @@ public class HdfsStaticHelper {
out.write(ioBuffer, 0, readLen);
readLen = hdfsInStream.read(ioBuffer);
}
System.out.println("读文件成功");
System.out.println("finished reading!");
out.close();
hdfsInStream.close();
fs.close();
}
/**
* 以append方式将内容添加到HDFS上文件的末尾;注意文件更新需要在hdfs-site.xml中添<property><name>dfs.
* append.support</name><value>true</value></property>
* 以append方式将内容添加到HDFS上文件的末尾;注意文件更新需要在hdfs-site.xml中添
* <property>
* <name>dfs.append.support</name>
* <value>true</value>
* </property>
*/
public static void appendToHdfs() throws FileNotFoundException, IOException {
String dst = "hdfs://192.168.1.11:9000/usr/yujing/test.txt";
@ -85,7 +105,6 @@ public class HdfsStaticHelper {
fs.close();
}
/** 从HDFS上删除文件 */
public static void deleteFromHdfs() throws FileNotFoundException,
IOException {
String dst = "hdfs://192.168.1.11:9000/usr/yujing";

View File

@ -1,14 +1,12 @@
package haflow.hdfs.client;
import haflow.module.general.JavaModule;
import java.io.File;
public class TestClassPath {
public static void main(String[] args) {
String path = JavaModule.class.getProtectionDomain().getCodeSource().getLocation().getFile();
File jarFile = new File(path);
System.out.println(path);
// String path = JavaModule.class.getProtectionDomain().getCodeSource().getLocation().getFile();
// File jarFile = new File(path);
// System.out.println(path);
}
}

View File

@ -6,7 +6,7 @@ import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob;
public class JavaClient {
public class OozieHelper {
public static void main(String[] args) throws OozieClientException,
InterruptedException {
@ -42,4 +42,19 @@ public class JavaClient {
System.out.println(wc.getJobInfo(jobId));
}
// get a OozieClient for local Oozie
OozieClient wc = new OozieClient("http://m150:11000/oozie/");
// create a workflow job configuration
Properties conf = wc.createConfiguration();
public OozieHelper(){
}
public String runJob(String flowName){
return null;
}
}

View File

@ -1,26 +1,67 @@
package haflow.service;
import haflow.hdfs.client.HdfsHelper;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.channels.FileChannel;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class FlowDeployService {
public long copyJarFile(File srcFile, File dstPath, String dstName){
private final String workspace = "D:/haflow/flows/";
private final String workspace_hdfs = "hdfs://m150:9000/user/root/examples/apps/";
private HdfsHelper hdfsHelper;
public HdfsHelper getHdfsHelper() {
return hdfsHelper;
}
@Autowired
public void setHdfsHelper(HdfsHelper hdfsHelper) {
this.hdfsHelper = hdfsHelper;
}
public boolean deployFlow(String flowName, String workflowXml, Set<String> jarPaths){
String deployPath = workspace + flowName;
this.rmr(new File(deployPath));
this.saveXmlToFlow(workflowXml, deployPath);
for (String jarPath : jarPaths) {
if(jarPath.endsWith(".jar")){
File jarFile = new File(jarPath);
File dstPath = new File(deployPath + "/lib");
System.out.println(jarPath);
this.copyJarFile(jarFile, dstPath, jarFile.getName());
}
}
this.hdfsHelper.PutFile(deployPath, workspace_hdfs + flowName);
return true;
}
private long copyJarFile(File srcFile, File dstPath, String dstName){
File dst = new File(dstPath, dstName);
if(!srcFile.exists()){
System.out.println(srcFile.getAbsolutePath() + " do not exist!");
}else if( !dstPath.exists()){
System.out.println(dstPath.getAbsolutePath() + " do not exist!");
}else if(dst.exists()){
System.out.println(dst.getAbsolutePath() + " already exists!");
}else{
if( !dstPath.exists()){
boolean made = mkdir(dstPath);
if( made )
System.out.println(dstPath.getAbsolutePath() + " created!");
}
//start copy file
try {
@SuppressWarnings("resource")
@ -40,4 +81,49 @@ public class FlowDeployService {
}
return 0;
}
private boolean saveXmlToFlow(String xml, String filePath){
File dir = new File(filePath);
mkdir(dir);
OutputStreamWriter fw;
try {
fw = new OutputStreamWriter(new FileOutputStream(new File(filePath, "workflow.xml")));
fw.write(xml);
fw.close();
return true;
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
private boolean rmr(File dir){
if( dir != null && dir.exists()){
if( dir.isDirectory()){
File[] subs = dir.listFiles();
for( File sub : subs){
rmr(sub);
}
return dir.delete();
}else if( dir.isFile()){
return dir.delete();
}
}
return true;
}
private boolean mkdir(File dir){
if( !dir.exists() ){
boolean made = dir.mkdirs();
if(made){
System.out.println("made dir " + dir.getAbsolutePath());
}else{
System.err.println("failed to make dir " + dir.getAbsolutePath());
return false;
}
}
return true;
}
}

View File

@ -7,16 +7,15 @@ import haflow.flow.entity.Topological;
import haflow.module.ModuleMetadata;
import haflow.module.basic.EndModule;
import haflow.module.basic.StartModule;
import haflow.module.general.JavaModule;
import haflow.profile.NodeConfigurationProfile;
import haflow.ui.model.RunFlowResultModel;
import haflow.utility.ModuleLoader;
import haflow.utility.SessionHelper;
import haflow.utility.XmlHelper;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -125,8 +124,8 @@ public class FlowExecuteService {
startNodes.get(0));
Topological topo = new Topological(graph);
List<Integer> sorted = topo.getOrder();
StringBuilder sb = new StringBuilder();
sb.append("<workflow-app xmlns=\"uri:oozie:workflow:0.2\" name=\""
StringBuilder workflowXml = new StringBuilder();
workflowXml.append("<workflow-app xmlns=\"uri:oozie:workflow:0.2\" name=\""
+ flow.getName() + "\">" + "\n");
if (sorted != null) {
for (int i = 0; i < sorted.size(); i++) {//move end node to the end
@ -142,7 +141,13 @@ public class FlowExecuteService {
break;
}
}
for (int i = 0; i < sorted.size(); i++) {
for (int i = 0; i < sorted.size(); i++) {//generate xml
if( i == sorted.size()-1 ){
workflowXml.append("<kill name=\"fail\">" +
"<message>Work flow failed, " +
"error message[${wf:errorMessage(wf:lastErrorNode())}]</message>" +
"</kill>" + "\n");
}
int w = sorted.get(i);
Node node = graph.getNode(w);
Class<?> moduleClass = moduleClasses.get(node
@ -169,32 +174,24 @@ public class FlowExecuteService {
}
Document doc = module.generate(configurations);
String part = this.xmlHelper.getXmlString(doc);
sb.append(part + "\n");
workflowXml.append(part + "\n");
}
}else{
messageBuilder.append("Error: Flow is has Circles!");
}
// sb.append("<kill name=\"fail\">" +
// "<message>Work flow failed, " +
// "error message[${wf:errorMessage(wf:lastErrorNode())}]</message>" +
// "</kill>" + "\n");
sb.append("</workflow-app>" + "\n");
System.out.println(sb.toString());
messageBuilder.append("Generated xml : \n" + sb.toString());
workflowXml.append("</workflow-app>" + "\n");
System.out.println(workflowXml.toString());
// messageBuilder.append("Generated xml : \n" + workflowXml.toString());
//deploy workflow
String flowName = flow.getName();
Set<String> jarPaths = new HashSet<String>();
for (Node node : nodes) {
Class<?> module = moduleClasses.get(node.getModuleId()
.toString());
String path = JavaModule.class.getProtectionDomain().getCodeSource().getLocation().getFile();
if(path.endsWith(".jar")){
File jarFile = new File(path);
File dstPath = new File("D:/haflow/flows/" + flowName);
System.out.println(path);
this.flowDeployService.copyJarFile(jarFile, dstPath, jarFile.getName());
}
String path = module.getProtectionDomain().getCodeSource().getLocation().getFile();
jarPaths.add(path);
}
// this.flowDeployService.deployFlow(flow.getName(), workflowXml.toString(), jarPaths);
}

View File

@ -1,10 +1,7 @@
package haflow.service;
import haflow.service.FlowExecuteService;
import java.util.UUID;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
@ -12,7 +9,6 @@ public class FlowExecuteServiceTest {
private ApplicationContext context;
@Test
public void test() {
context = new ClassPathXmlApplicationContext(new String[] {"servlet-context.xml"});
FlowExecuteService fes = context.getBean(FlowExecuteService.class);

View File

@ -1,15 +1,15 @@
package haflow.utility;
import haflow.entity.Module;
import haflow.entity.ModuleConfiguration;
import haflow.module.ModuleMetadata;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import haflow.entity.Module;
import haflow.entity.ModuleConfiguration;
import haflow.module.ModuleMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -31,6 +31,7 @@ public class ModuleLoader {
Map<Module, ModuleMetadata> modules = new HashMap<Module, ModuleMetadata>();
List<String> classNames = this.getClassHelper().getClassNames(
"haflow", true);
classNames.addAll(this.getClassHelper().getClassNames("hmodule", true));
for (String className : classNames) {
Class<?> moduleClass = Class.forName(className);
if (moduleClass.isAnnotationPresent(haflow.module.Module.class)) {
@ -78,6 +79,7 @@ public class ModuleLoader {
Map<String, Class<?>> moduleClasses = new HashMap<String, Class<?>>();
List<String> classNames = this.getClassHelper().getClassNames(
"haflow", true);
classNames.addAll(this.getClassHelper().getClassNames("hmodule", true));
for (String className : classNames) {
Class<?> moduleClass = Class.forName(className);
if (moduleClass.isAnnotationPresent(haflow.module.Module.class)) {

View File

@ -8,8 +8,8 @@
<property name="hibernate.connection.username">root </property>
<property name="hibernate.connection.password">123456 </property>
<property name="hibernate.connection.pool.size">20 </property>
<property name="hibernate.show_sql">true </property>
<property name="format_sql">true</property>
<property name="hibernate.show_sql">false </property>
<property name="format_sql">false</property>
<property name="Connection.useUnicode">true </property>
<property name="connection.characterEncoding">utf-8 </property>
<property name="hibernate.dialect">org.hibernate.dialect.MySQLDialect </property>