diff --git a/maven-install/java7-module.jar b/maven-install/java7-module.jar
new file mode 100644
index 0000000..f2aaead
Binary files /dev/null and b/maven-install/java7-module.jar differ
diff --git a/maven-install/java7-module.pom b/maven-install/java7-module.pom
new file mode 100644
index 0000000..0a8dc92
--- /dev/null
+++ b/maven-install/java7-module.pom
@@ -0,0 +1,12 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>com.otcaix.haflow</groupId>
+  <artifactId>java7-module</artifactId>
+  <packaging>jar</packaging>
+  <name>java7</name>
+  <description>java7 module</description>
+  <url>http://www.baidu.com</url>
+  <version>1.0</version>
+</project>
\ No newline at end of file
diff --git a/maven-install/oozie-client-3.3.2.jar b/maven-install/oozie-client-3.3.2.jar
new file mode 100644
index 0000000..1c1b1a4
Binary files /dev/null and b/maven-install/oozie-client-3.3.2.jar differ
diff --git a/maven-install/oozie-client-3.3.2.pom b/maven-install/oozie-client-3.3.2.pom
new file mode 100644
index 0000000..b68b27e
--- /dev/null
+++ b/maven-install/oozie-client-3.3.2.pom
@@ -0,0 +1,12 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>com.yahoo.oozie</groupId>
+  <artifactId>oozie-client</artifactId>
+  <packaging>jar</packaging>
+  <name>oozie</name>
+  <description>oozie client 3.3.2</description>
+  <url>http://www.baidu.com</url>
+  <version>3.3.2</version>
+</project>
\ No newline at end of file
diff --git a/maven-install/如何增加maven中没有的jar包.pdf b/maven-install/如何增加maven中没有的jar包.pdf
new file mode 100644
index 0000000..aa22b47
Binary files /dev/null and b/maven-install/如何增加maven中没有的jar包.pdf differ
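Note: the two jars above are meant to be installed into the local Maven repository by hand, together with their .pom files; the bundled PDF (the filename translates to "How to add jar packages that are missing from Maven") documents the procedure. A minimal sketch of the install command, assuming it is run from the maven-install/ directory:

    mvn install:install-file -Dfile=oozie-client-3.3.2.jar -DpomFile=oozie-client-3.3.2.pom

and likewise for java7-module.jar with java7-module.pom.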
diff --git a/src/main/java/core-site.xml b/src/main/java/core-site.xml
new file mode 100644
index 0000000..6393781
--- /dev/null
+++ b/src/main/java/core-site.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+  <property>
+    <name>hadoop.tmp.dir</name>
+    <value>/opt/hadooptmp</value>
+    <description>A base for other temporary directories.</description>
+  </property>
+
+  <property>
+    <name>hadoop.log.dir</name>
+    <value>/opt/hadooplog</value>
+  </property>
+
+  <property>
+    <name>fs.default.name</name>
+    <value>hdfs://m150:9000</value>
+  </property>
+
+  <!-- Allow the proxy user "root" to impersonate other users. -->
+  <property>
+    <name>hadoop.proxyuser.root.hosts</name>
+    <value>*</value>
+  </property>
+
+  <property>
+    <name>hadoop.proxyuser.root.groups</name>
+    <value>*</value>
+  </property>
+</configuration>
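Note: Hadoop's Configuration loads core-site.xml from the classpath, which is presumably why the file sits next to the sources here; by default Maven only copies resources from src/main/resources, so this placement may rely on extra resource configuration. A minimal sketch of how the settings above are consumed (imports as in the HDFS helpers below; IOException handling omitted):

    Configuration conf = new Configuration();        // picks up core-site.xml from the classpath
    System.out.println(conf.get("fs.default.name")); // expected: hdfs://m150:9000
    FileSystem fs = FileSystem.get(conf);            // connects to the namenode configured above
    fs.close();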
diff --git a/src/main/java/haflow/hdfs/client/HdfsHelper.java b/src/main/java/haflow/hdfs/client/HdfsHelper.java
new file mode 100644
index 0000000..7fd9ee7
--- /dev/null
+++ b/src/main/java/haflow/hdfs/client/HdfsHelper.java
@@ -0,0 +1,122 @@
+package haflow.hdfs.client;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * Helper for reading and writing files on HDFS; main() prints the first
+ * lines of a file as a smoke test.
+ *
+ * @author ZhaoWei
+ */
+public class HdfsHelper {
+	public static final String filePath = "hdfs://m150:9000/ztest/split/part-r-00000";
+	public static final String newFile = "hdfs://m150:9000/ztest/test/part-r-00000";
+
+	private FileSystem hdfs;
+
+	public static void main(String[] args) throws IOException {
+		Configuration conf = new Configuration();
+		HdfsHelper reader = new HdfsHelper(conf);
+		reader.ReadFile(filePath);
+	}
+
+	public HdfsHelper(Configuration conf) {
+		try {
+			hdfs = FileSystem.get(conf);
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+	}
+
+	/** Print the first 15 lines of the given file. */
+	public void ReadFile(String filePath) {
+		try {
+			FSDataInputStream stream = hdfs.open(new Path(filePath));
+			BufferedReader br = new BufferedReader(new InputStreamReader(stream, "UTF-8"));
+			String line = br.readLine();
+			for (int i = 0; i < 15 && line != null; i++) {
+				System.out.println(line);
+				line = br.readLine();
+			}
+			br.close();
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+	}
+
+	/** Create a file and return its output stream; the caller must close it. */
+	public FSDataOutputStream CreateFile(String fileName) {
+		try {
+			Path path = new Path(fileName);
+			return hdfs.create(path);
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+		return null;
+	}
+
+	/** Return the datanode hosts of each block of the given file. */
+	public List<String[]> GetFileBlockHost(String fileName) {
+		try {
+			List<String[]> list = new ArrayList<String[]>();
+			Path path = new Path(fileName);
+			FileStatus fileStatus = hdfs.getFileStatus(path);
+			BlockLocation[] blkLocations = hdfs.getFileBlockLocations(
+					fileStatus, 0, fileStatus.getLen());
+			for (int i = 0; i < blkLocations.length; i++) {
+				list.add(blkLocations[i].getHosts());
+			}
+			return list;
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+		return null;
+	}
+
+	/** Copy a local file to HDFS. */
+	public void PutFile(String srcFile, String dstFile) {
+		try {
+			Path srcPath = new Path(srcFile);
+			Path dstPath = new Path(dstFile);
+			hdfs.copyFromLocalFile(srcPath, dstPath);
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+	}
+
+	/** Dump the whole file to stdout. */
+	public void ReadFile(Configuration conf, String fileName) {
+		try {
+			FileSystem fs = FileSystem.get(conf);
+			FSDataInputStream dis = fs.open(new Path(fileName));
+			IOUtils.copyBytes(dis, System.out, 4096, false);
+			dis.close();
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+	}
+
+	public void closeHdfs() {
+		try {
+			if (hdfs != null)
+				hdfs.close();
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+	}
+}
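Note: a minimal usage sketch of HdfsHelper; the paths are hypothetical, and closeHdfs() matters because the helper owns the FileSystem handle (imports and IOException handling as in the class above):

    Configuration conf = new Configuration();
    HdfsHelper helper = new HdfsHelper(conf);
    helper.PutFile("D:/data/input.txt", "hdfs://m150:9000/ztest/test/input.txt"); // hypothetical paths
    for (String[] hosts : helper.GetFileBlockHost("hdfs://m150:9000/ztest/test/input.txt")) {
        System.out.println(java.util.Arrays.toString(hosts)); // datanodes holding each block
    }
    helper.closeHdfs();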
diff --git a/src/main/java/haflow/hdfs/client/HdfsStaticHelper.java b/src/main/java/haflow/hdfs/client/HdfsStaticHelper.java
new file mode 100644
index 0000000..fb61db1
--- /dev/null
+++ b/src/main/java/haflow/hdfs/client/HdfsStaticHelper.java
@@ -0,0 +1,112 @@
+package haflow.hdfs.client;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Progressable;
+
+public class HdfsStaticHelper {
+
+	public static void main(String[] args) {
+		try {
+			uploadTohdfs();
+		} catch (FileNotFoundException e) {
+			e.printStackTrace();
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+	}
+
+	/** Upload a local file to HDFS. */
+	public static void uploadTohdfs() throws FileNotFoundException, IOException {
+		String localSrc = "D://a.txt";
+		String dst = "hdfs://m150:9000/ztest/test/a.txt";
+		InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
+		Configuration conf = new Configuration();
+		FileSystem fs = FileSystem.get(URI.create(dst), conf);
+		OutputStream out = fs.create(new Path(dst), new Progressable() {
+			public void progress() {
+				System.out.println(".");
+			}
+		});
+		IOUtils.copyBytes(in, out, 4096, true);
+		System.out.println("Upload succeeded");
+	}
+
+	/** Read a file from HDFS into a local file. */
+	public static void readHdfs() throws FileNotFoundException, IOException {
+		String dst = "hdfs://192.168.1.11:9000/usr/yujing/test.txt";
+		Configuration conf = new Configuration();
+		FileSystem fs = FileSystem.get(URI.create(dst), conf);
+		FSDataInputStream hdfsInStream = fs.open(new Path(dst));
+
+		OutputStream out = new FileOutputStream("d:/qq-hdfs.txt");
+		byte[] ioBuffer = new byte[1024];
+		int readLen = hdfsInStream.read(ioBuffer);
+		while (-1 != readLen) {
+			out.write(ioBuffer, 0, readLen);
+			readLen = hdfsInStream.read(ioBuffer);
+		}
+		System.out.println("Read succeeded");
+		out.close();
+		hdfsInStream.close();
+		fs.close();
+	}
+
+	/**
+	 * Append content to the end of a file on HDFS. Note: appending requires
+	 * append support to be enabled in hdfs-site.xml.
+	 */
+	public static void appendToHdfs() throws FileNotFoundException, IOException {
+		String dst = "hdfs://192.168.1.11:9000/usr/yujing/test.txt";
+		Configuration conf = new Configuration();
+		FileSystem fs = FileSystem.get(URI.create(dst), conf);
+		FSDataOutputStream out = fs.append(new Path(dst));
+
+		byte[] bytes = "zhangzk add by hdfs java api".getBytes();
+		out.write(bytes, 0, bytes.length);
+		out.close();
+		fs.close();
+	}
+
+	/** Delete a file or directory from HDFS. */
+	public static void deleteFromHdfs() throws FileNotFoundException,
+			IOException {
+		String dst = "hdfs://192.168.1.11:9000/usr/yujing";
+		Configuration conf = new Configuration();
+		FileSystem fs = FileSystem.get(URI.create(dst), conf);
+		fs.deleteOnExit(new Path(dst));
+		fs.close();
+	}
+
+	/** List the files and directories under an HDFS path. */
+	public static void getDirectoryFromHdfs() throws FileNotFoundException,
+			IOException {
+		String dst = "hdfs://192.168.1.11:9000/usr/yujing";
+		Configuration conf = new Configuration();
+		FileSystem fs = FileSystem.get(URI.create(dst), conf);
+		FileStatus fileList[] = fs.listStatus(new Path(dst));
+		for (int i = 0; i < fileList.length; i++) {
+			System.out.println("name: " + fileList[i].getPath().getName()
+					+ "\t\tsize: " + fileList[i].getLen());
+		}
+		fs.close();
+	}
+}
diff --git a/src/main/java/haflow/hdfs/client/TestClassPath.java b/src/main/java/haflow/hdfs/client/TestClassPath.java
new file mode 100644
index 0000000..5ad5941
--- /dev/null
+++ b/src/main/java/haflow/hdfs/client/TestClassPath.java
@@ -0,0 +1,14 @@
+package haflow.hdfs.client;
+
+import haflow.module.general.JavaModule;
+
+import java.io.File;
+
+public class TestClassPath {
+	public static void main(String[] args) {
+		// Locate the jar (or classes directory) that JavaModule was loaded from.
+		String path = JavaModule.class.getProtectionDomain().getCodeSource().getLocation().getFile();
+		File jarFile = new File(path);
+		System.out.println(path + " exists: " + jarFile.exists());
+	}
+}
diff --git a/src/main/java/haflow/module/general/DemoJavaMain.java b/src/main/java/haflow/module/general/DemoJavaMain.java
deleted file mode 100644
index d906b84..0000000
--- a/src/main/java/haflow/module/general/DemoJavaMain.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package haflow.module.general;
-
-public class DemoJavaMain {
-	public static void main(String[] args) {
-		System.out.println("Demo Java Main");
-
-		System.out.println("# Arguments: " + args.length);
-		for (int i = 0; i < args.length; i++) {
-			System.out.println("Argument[" + i + "]: " + args[i]);
-		}
-	}
-}
diff --git a/src/main/java/haflow/module/general/JavaModule.java b/src/main/java/haflow/module/general/JavaModule.java
deleted file mode 100644
index e756b98..0000000
--- a/src/main/java/haflow/module/general/JavaModule.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package haflow.module.general;
-
-import haflow.module.Module;
-import haflow.module.ModuleConfiguration;
-import haflow.module.ModuleMetadata;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.Map;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-
-import org.w3c.dom.Document;
-import org.xml.sax.InputSource;
-import org.xml.sax.SAXException;
-
-@Module(id = "5da600a8-aa63-968a-ca46-9085e0e0bd2e", name = "Java", category = "General")
-@ModuleConfiguration(configurationKeys = { "eee" }, configurationDisplayNames = { "fff" })
-public class JavaModule implements ModuleMetadata {
-
-	public Document generate(Map<String, String> configurations) {
-		String name = configurations.get("name");
-		String eee = configurations.get("eee");
-		String ok = configurations.get("ok");
-
-		Class<?> moduleClass = this.getClass();
-		assert (moduleClass.isAnnotationPresent(haflow.module.Module.class));
-//		String moduleName = moduleClass.getAnnotation(haflow.module.Module.class).name();
-
-		String actionXML =
-				"<action name=\"" + name + "\">" + "\n" +
-				"<java>" + "\n" +
-				"<job-tracker>${jobTracker}</job-tracker>" + "\n" +
-				"<name-node>${nameNode}</name-node>" + "\n" +
-				"<configuration>" + "\n" +
-				"<property>" + "\n" +
-				"<name>mapred.job.queue.name</name>" + "\n" +
-				"<value>${queueName}</value>" + "\n" +
-				"</property>" + "\n" +
-				"</configuration>" + "\n" +
-				"<main-class>" + DemoJavaMain.class.getName() + "</main-class>" + "\n" +//TODO
-				"<arg>" + "-eee " + eee + "</arg>" + "\n" +
-				"</java>" + "\n" +
-				"<ok to=\"" + ok + "\"/>" + "\n" +
-				"<error to=\"fail\"/>" + "\n" +
-				"</action>";
-
-		DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
-
-		try {
-			DocumentBuilder db = dbf.newDocumentBuilder();
-			StringReader sr = new StringReader(actionXML);
-			InputSource is = new InputSource(sr);
-			Document doc = db.parse(is);
-			return doc;
-		} catch (SAXException e) {
-			e.printStackTrace();
-		} catch (IOException e) {
-			e.printStackTrace();
-		} catch (ParserConfigurationException e) {
-			e.printStackTrace();
-		}
-
-		return null;
-	}
-
-}
diff --git a/src/main/java/haflow/service/FlowDeployService.java b/src/main/java/haflow/service/FlowDeployService.java
new file mode 100644
index 0000000..77b5887
--- /dev/null
+++ b/src/main/java/haflow/service/FlowDeployService.java
@@ -0,0 +1,43 @@
+package haflow.service;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+
+import org.springframework.stereotype.Component;
+
+@Component
+public class FlowDeployService {
+
+	/** Copy srcFile to dstPath/dstName; returns the number of bytes copied, or 0 on failure. */
+	public long copyJarFile(File srcFile, File dstPath, String dstName) {
+		File dst = new File(dstPath, dstName);
+		if (!srcFile.exists()) {
+			System.out.println(srcFile.getAbsolutePath() + " does not exist!");
+		} else if (!dstPath.exists()) {
+			System.out.println(dstPath.getAbsolutePath() + " does not exist!");
+		} else if (dst.exists()) {
+			System.out.println(dst.getAbsolutePath() + " already exists!");
+		} else {
+			// Copy the file through NIO channels.
+			try {
+				@SuppressWarnings("resource")
+				FileChannel ifc = new FileInputStream(srcFile).getChannel();
+				@SuppressWarnings("resource")
+				FileChannel ofc = new FileOutputStream(dst).getChannel();
+				long inSize = ifc.size();
+				ifc.transferTo(0, inSize, ofc);
+				ifc.close();
+				ofc.close();
+				return inSize;
+			} catch (FileNotFoundException e) {
+				e.printStackTrace();
+			} catch (IOException e) {
+				e.printStackTrace();
+			}
+		}
+		return 0;
+	}
+}
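Note: a minimal usage sketch of FlowDeployService (paths hypothetical); copyJarFile deliberately refuses to overwrite an existing destination and expects the target directory to exist:

    FlowDeployService deployService = new FlowDeployService();
    File jar = new File("D:/modules/java7-module.jar");   // hypothetical source jar
    File flowDir = new File("D:/haflow/flows/demo-flow"); // must already exist
    long copied = deployService.copyJarFile(jar, flowDir, jar.getName());
    System.out.println(copied + " bytes copied");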
diff --git a/src/main/java/haflow/service/FlowExecuteService.java b/src/main/java/haflow/service/FlowExecuteService.java
index 94406b7..cc8f071 100644
--- a/src/main/java/haflow/service/FlowExecuteService.java
+++ b/src/main/java/haflow/service/FlowExecuteService.java
@@ -7,12 +7,14 @@ import haflow.flow.entity.Topological;
 import haflow.module.ModuleMetadata;
 import haflow.module.basic.EndModule;
 import haflow.module.basic.StartModule;
+import haflow.module.general.JavaModule;
 import haflow.profile.NodeConfigurationProfile;
 import haflow.ui.model.RunFlowResultModel;
 import haflow.utility.ModuleLoader;
 import haflow.utility.SessionHelper;
 import haflow.utility.XmlHelper;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -74,6 +76,17 @@ public class FlowExecuteService {
 		this.xmlHelper = xmlHelper;
 	}
 
+	private FlowDeployService flowDeployService;
+
+	public FlowDeployService getFlowDeployService() {
+		return flowDeployService;
+	}
+
+	@Autowired
+	public void setFlowDeployService(FlowDeployService flowDeployService) {
+		this.flowDeployService = flowDeployService;
+	}
+
 	public RunFlowResultModel runFlow(UUID flowId) {
 		RunFlowResultModel model = new RunFlowResultModel();
 		model.setFlowId(flowId);
@@ -168,6 +181,21 @@ public class FlowExecuteService {
 			sb.append("</workflow-app>" + "\n");
 			System.out.println(sb.toString());
 			messageBuilder.append("Generated xml : \n" + sb.toString());
+
+			// Deploy the workflow: copy the jar the module code was loaded from
+			// into the flow's deployment directory.
+			String flowName = flow.getName();
+			for (Node node : nodes) {
+				Class<?> module = moduleClasses.get(node.getModuleId()
+						.toString());
+				String path = JavaModule.class.getProtectionDomain().getCodeSource().getLocation().getFile();
+				if (path.endsWith(".jar")) {
+					File jarFile = new File(path);
+					File dstPath = new File("D:/haflow/flows/" + flowName);
+					System.out.println(path);
+					this.flowDeployService.copyJarFile(jarFile, dstPath, jarFile.getName());
+				}
+			}
 		}
 
 		session.close();
diff --git a/src/main/java/hdfs-site.xml b/src/main/java/hdfs-site.xml
new file mode 100644
index 0000000..da3b88d
--- /dev/null
+++ b/src/main/java/hdfs-site.xml
@@ -0,0 +1,17 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+  <property>
+    <name>dfs.replication</name>
+    <value>3</value>
+    <description>Default is 3; cannot be greater than the number of datanodes.</description>
+  </property>
+
+  <property>
+    <name>dfs.data.dir</name>
+    <value>${hadoop.tmp.dir}/dfs/data</value>
+  </property>
+</configuration>
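Note: HdfsStaticHelper.appendToHdfs() only works if the cluster has append support enabled; assuming this deployment runs Hadoop 1.x, where the switch is dfs.support.append, a property along these lines would need to be added to the hdfs-site.xml above:

    <property>
      <name>dfs.support.append</name>
      <value>true</value>
    </property>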