Merge pull request #8 from justinliucs/master

Merge from base project.
This commit is contained in:
Zhen Tang 2013-06-07 20:39:23 -07:00
commit 1efad10641
15 changed files with 142 additions and 231 deletions

Binary file not shown.

View File

@ -1,33 +0,0 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/hadooptmp</value>
<description>A base for other temporary directories.</description>
</property>
<property>
<name>hadoop.log.dir</name>
<value>/opt/hadooplog</value>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://m150:9000</value>
</property>
<!-- OOZIE -->
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
</configuration>

View File

@ -0,0 +1,63 @@
package haflow.configuration.helper;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Properties;
import org.springframework.stereotype.Component;
@Component
public class ConfigurationHelper {
public static final String WORKSPACE_HDFS= "workspace_hdfs";
public static final String WORKSPACE_LOCAL = "workspace_local";
public static final String FS_DEFAULT_NAME = "fs.default.name";
private Properties properties;
public ConfigurationHelper() {
this.loadClusterConf();
}
private void loadClusterConf() {
ClassLoader loader = ConfigurationHelper.class.getClassLoader();
URL url = loader.getResource("cluster.properties");
properties = new Properties();
try {
FileInputStream inputFile = new FileInputStream(url.getFile());
properties.load(inputFile);
inputFile.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
public String getProperty(String key){
if( properties != null){
return properties.getProperty(key);
}
return null;
}
public static void main(String[] args) {
// ConfigurationHelper h = new ConfigurationHelper();
// Properties prp = h.loadClusterConf();
// for( Object key : prp.keySet()){
// System.out.println(key.toString() + " = " + prp.getProperty(key.toString()).toString());
// }
URLClassLoader classLoader = (URLClassLoader) Thread
.currentThread().getContextClassLoader();
URL[] urls = classLoader.getURLs();
for (URL url : urls) {
System.out.println(url.getPath());
}
}
}

View File

@ -1,5 +1,7 @@
package haflow.hdfs.client;
import haflow.configuration.helper.ConfigurationHelper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
@ -13,23 +15,38 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class HdfsHelper {
private FileSystem hdfs;
private ConfigurationHelper clusterConfHelper;
public ConfigurationHelper getClusterConfHelper() {
return clusterConfHelper;
}
public HdfsHelper() {
public void setClusterConfHelper(ConfigurationHelper clusterConfHelper) {
this.clusterConfHelper = clusterConfHelper;
}
@Autowired
public HdfsHelper(ConfigurationHelper clusterConfHelper) {
this.clusterConfHelper = clusterConfHelper;
try {
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://m150:9000");
conf.set(ConfigurationHelper.FS_DEFAULT_NAME,
this.clusterConfHelper
.getProperty(ConfigurationHelper.FS_DEFAULT_NAME));
// conf.set("fs.default.name", "hdfs://m150:9000");
hdfs = FileSystem.get(conf);
} catch (IOException e) {
e.printStackTrace();
}
}
public boolean putFile(String srcFile, String dstFile) {
try {
Path srcPath = new Path(srcFile);
@ -110,10 +127,10 @@ public class HdfsHelper {
}
public static void main(String[] args) throws IOException {
final String filePath = "hdfs://m150:9000/ztest/split/part-r-00000";
// final String filePath = "hdfs://m150:9000/ztest/split/part-r-00000";
// final String newFile = "hdfs://m150:9000/ztest/test/part-r-00000";
HdfsHelper reader = new HdfsHelper();
reader.readFile(filePath);
//
// HdfsHelper reader = new HdfsHelper();
// reader.readFile(filePath);
}
}

View File

@ -1,131 +0,0 @@
package haflow.hdfs.client;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
public class HdfsStaticHelper {
public static void main(String[] args) {
// try {
// uploadTohdfs();
// } catch (FileNotFoundException e) {
// e.printStackTrace();
// } catch (IOException e) {
// e.printStackTrace();
// };
PutFile("D:/haflow/flows/NewFlow", "hdfs://m150:9000/ztest/test1");
}
public static void uploadTohdfs() throws FileNotFoundException, IOException {
String localSrc = "D:/haflow/flows/NewFlow/workflow.xml";
String dst = "hdfs://m150:9000/ztest/test/newflow";
InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dst), conf);
OutputStream out = fs.create(new Path(dst), new Progressable() {
public void progress() {
System.out.println(".");
}
});
System.out.println("finised uploading!");
IOUtils.copyBytes(in, out, 4096, true);
}
public static void PutFile( String srcFile, String dstFile) {
FileSystem hdfs;
Configuration conf = new Configuration();
try {
hdfs = FileSystem.get(conf);
Path srcPath = new Path(srcFile);
Path dstPath = new Path(dstFile);
hdfs.copyFromLocalFile(srcPath, dstPath);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void readHdfs() throws FileNotFoundException, IOException {
String dst = "hdfs://192.168.1.11:9000/usr/yujing/test.txt";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dst), conf);
FSDataInputStream hdfsInStream = fs.open(new Path(dst));
OutputStream out = new FileOutputStream("d:/qq-hdfs.txt");
byte[] ioBuffer = new byte[1024];
int readLen = hdfsInStream.read(ioBuffer);
while (-1 != readLen) {
out.write(ioBuffer, 0, readLen);
readLen = hdfsInStream.read(ioBuffer);
}
System.out.println("finished reading!");
out.close();
hdfsInStream.close();
fs.close();
}
/**
* 以append方式将内容添加到HDFS上文件的末尾;注意文件更新需要在hdfs-site.xml中添
* <property>
* <name>dfs.append.support</name>
* <value>true</value>
* </property>
*/
public static void appendToHdfs() throws FileNotFoundException, IOException {
String dst = "hdfs://192.168.1.11:9000/usr/yujing/test.txt";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dst), conf);
FSDataOutputStream out = fs.append(new Path(dst));
int readLen = "zhangzk add by hdfs java api".getBytes().length;
while (-1 != readLen) {
out.write("zhangzk add by hdfs java api".getBytes(), 0, readLen);
}
out.close();
fs.close();
}
public static void deleteFromHdfs() throws FileNotFoundException,
IOException {
String dst = "hdfs://192.168.1.11:9000/usr/yujing";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dst), conf);
fs.deleteOnExit(new Path(dst));
fs.close();
}
/** 遍历HDFS上的文件和目录 */
public static void getDirectoryFromHdfs() throws FileNotFoundException,
IOException {
String dst = "hdfs://192.168.1.11:9000/usr/yujing";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dst), conf);
FileStatus fileList[] = fs.listStatus(new Path(dst));
int size = fileList.length;
for (int i = 0; i < size; i++) {
System.out.println("文件名name:" + fileList[i].getPath().getName()
+ "文件大小/t/tsize:" + fileList[i].getLen());
}
fs.close();
}
}

View File

@ -16,7 +16,7 @@ import haflow.module.Module;
import haflow.module.ModuleConfiguration;
import haflow.module.ModuleMetadata;
@Module(id = "70d027c3-a4bd-61b5-5063-134ff71f8122", name = "End", category = "Basic")
@Module(id = "a0d027c3-a4bd-61b5-5063-134ff71f8122", name = "End", category = "Basic")
@ModuleConfiguration(configurationKeys = { "ccc" }, configurationDisplayNames = { "ddd" })
public class EndModule implements ModuleMetadata {

View File

@ -19,7 +19,7 @@ import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
@Module(id = "9208d7d2-a8ff-2493-64c2-36f50bc95752", name = "Start", category = "Basic")
@Module(id = "a208d7d2-a8ff-2493-64c2-36f50bc95752", name = "Start", category = "Basic")
@ModuleConfiguration(configurationKeys = { "aaa" }, configurationDisplayNames = { "bbb" })
public class StartModule implements ModuleMetadata {

View File

@ -25,7 +25,7 @@ import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
@Module(id = "9208d7d2-a8ff-2493-64c2-36f50bc95754", name = "Preprocess", category = "zrace")
@Module(id = "a208d7d2-a8ff-2493-64c2-36f50bc95754", name = "Preprocess", category = "zrace")
@ModuleConfiguration(configurationKeys = { "input_path", "output_path" }, configurationDisplayNames = { "input path", "output path" })
public class ZracePreprocess implements ModuleMetadata{
public static void main(String[] args) {

View File

@ -1,5 +1,6 @@
package haflow.service;
import haflow.configuration.helper.ConfigurationHelper;
import haflow.entity.Flow;
import haflow.entity.Node;
import haflow.flow.entity.Digraph;
@ -88,6 +89,17 @@ public class FlowExecuteService {
this.flowDeployService = flowDeployService;
}
private ConfigurationHelper clusterConfHelper;
public ConfigurationHelper getClusterConfHelper() {
return clusterConfHelper;
}
@Autowired
public void setClusterConfHelper(ConfigurationHelper clusterConfHelper) {
this.clusterConfHelper = clusterConfHelper;
}
private HdfsHelper hdfsHelper;
public HdfsHelper getHdfsHelper() {
@ -110,8 +122,9 @@ public class FlowExecuteService {
this.oozieHelper = oozieHelper;
}
private final String workspace_local = "D:/haflow/flows/";
private final String workspace_hdfs = "hdfs://m150:9000/user/root/examples/apps/";
// private final String workspace_local = "D:/haflow/flows/";
// private final String workspace_hdfs =
// "hdfs://m150:9000/user/root/examples/apps/";
public RunFlowResultModel runFlow(UUID flowId) {
RunFlowResultModel model = new RunFlowResultModel();
@ -132,13 +145,7 @@ public class FlowExecuteService {
try {
Map<String, Class<?>> moduleClasses = this.getModuleLoader()
.searchForModuleClasses();
// Map<String, Class<?>> moduleClasses = new HashMap<String, Class<?>>();
// moduleClasses.put("9208d7d2-a8ff-2493-64c2-36f50bc95752", StartModule.class);
// moduleClasses.put("b0d027c3-a4bd-61b5-5063-134ff71f8123", KillModule.class);
// moduleClasses.put("70d027c3-a4bd-61b5-5063-134ff71f8122", EndModule.class);
// moduleClasses.put("5da600a8-aa63-968a-ca46-9085e0e0bd2e", JavaModule.class);
Set<Node> nodes = flow.getNodes();
messageBuilder.append("Start parsing flow ..." + "\n");
@ -161,48 +168,54 @@ public class FlowExecuteService {
messageBuilder.append("Parsing flow ... Finised" + "\n");
messageBuilder.append("Start deploying flow ..." + "\n");
String localDeployPath = workspace_local + flowName;
String localDeployPath = this.clusterConfHelper
.getProperty(ConfigurationHelper.WORKSPACE_LOCAL)
+ flowName;
boolean deloyedLocally = this.flowDeployService
.deployFlowLocal(localDeployPath, workflowXml,
getJarPaths(nodes, moduleClasses));
if (deloyedLocally) {
messageBuilder.append(flowName
+ " has been deployed locally!");
String hdfsDeployPath = workspace_hdfs + flowName;
// System.out.println("hdfs deploy path " + hdfsDeployPath );
boolean deleted = this.hdfsHelper.deleteFolder(hdfsDeployPath);
if( deleted){
messageBuilder.append("Old folder deleted: " + hdfsDeployPath);
+ " has been deployed locally!" + "\n");
String hdfsDeployPath = this.clusterConfHelper
.getProperty(ConfigurationHelper.WORKSPACE_HDFS)
+ flowName;
boolean deleted = this.hdfsHelper
.deleteFolder(hdfsDeployPath);
if (deleted) {
messageBuilder.append("Old folder deleted: "
+ hdfsDeployPath + "\n");
}
boolean deployedToHdfs = this.hdfsHelper.putFile(
localDeployPath, hdfsDeployPath);
if (deployedToHdfs) {
messageBuilder.append(flowName
+ " has been uploaded to hdfs!");
+ " has been uploaded to hdfs!" + "\n");
String jobId = this.oozieHelper.runJob(flowName);
if (jobId == null) {
messageBuilder.append("Failed to commit job: "
+ flowName);
+ flowName + "\n");
} else {
messageBuilder
.append("Job commited! Job id : " + jobId);
messageBuilder.append("Job commited! Job id : "
+ jobId + "\n");
model.setCommited(true);
model.setJobId(jobId);
}
} else {
messageBuilder.append(flowName
+ " failed to be uploaded to hdfs!");
+ " failed to be uploaded to hdfs!" + "\n");
}
} else {
messageBuilder.append(flowName
+ " failed to be deployed locally!");
+ " failed to be deployed locally!" + "\n");
}
}
}
System.out.println(messageBuilder.toString());
session.close();
} catch (Exception e) {
e.printStackTrace();
@ -240,7 +253,8 @@ public class FlowExecuteService {
return startNodes;
}
private void replaceEndNode(List<Integer> sorted, Map<String, Class<?>> moduleClasses, Digraph graph){
private void replaceEndNode(List<Integer> sorted,
Map<String, Class<?>> moduleClasses, Digraph graph) {
for (int i = 0; i < sorted.size(); i++) {// move end node to the end
int w = sorted.get(i);
Node node = graph.getNode(w);
@ -256,7 +270,7 @@ public class FlowExecuteService {
}
}
}
private String genWorkflowXml(String flowName, List<Integer> sorted,
Map<String, Class<?>> moduleClasses, Digraph graph)
throws InstantiationException, IllegalAccessException {
@ -266,9 +280,9 @@ public class FlowExecuteService {
+ flowName + "\">" + "\n");
this.replaceEndNode(sorted, moduleClasses, graph);
for (int i = 0; i < sorted.size(); i++) {// generate xml
if (i == sorted.size() - 1) {//inject kill node
if (i == sorted.size() - 1) {// inject kill node
workflowXml
.append("<kill name=\"fail\">"
+ "<message>Work flow failed, "
@ -281,7 +295,7 @@ public class FlowExecuteService {
.toString());
ModuleMetadata module = (ModuleMetadata) moduleClass.newInstance();
Map<String, String> configurations = new HashMap<String, String>();
configurations.put("name", node.getId().toString());
configurations.put("name", node.getName());
List<NodeConfigurationProfile> ncps = this
.getNodeConfigurationProfileService()
.getNodeConfigurationProfile(node.getId());
@ -294,8 +308,7 @@ public class FlowExecuteService {
List<Integer> adj = graph.getAdj(w);
for (int v : adj) {
if (sorted.contains(v)) {
configurations.put("ok", graph.getNode(v).getId()
.toString());
configurations.put("ok", graph.getNode(v).getName());
break;// TODO
}
}
@ -307,9 +320,4 @@ public class FlowExecuteService {
return workflowXml.toString();
}
public static void main(String[] args) {
FlowExecuteService flowExecuteService = new FlowExecuteService();
flowExecuteService.runFlow(UUID
.fromString("67f1811e-c2b3-4a32-b70a-32f486a0a947"));
}
}

View File

@ -31,7 +31,7 @@ public class ModuleLoader {
Map<Module, ModuleMetadata> modules = new HashMap<Module, ModuleMetadata>();
List<String> classNames = this.getClassHelper().getClassNames(
"haflow", true);
// classNames.addAll(this.getClassHelper().getClassNames("hmodule", true));
classNames.addAll(this.getClassHelper().getClassNames("hmodule", true));
for (String className : classNames) {
Class<?> moduleClass = Class.forName(className);
if (moduleClass.isAnnotationPresent(haflow.module.Module.class)) {
@ -79,7 +79,7 @@ public class ModuleLoader {
Map<String, Class<?>> moduleClasses = new HashMap<String, Class<?>>();
List<String> classNames = this.getClassHelper().getClassNames(
"haflow", true);
// classNames.addAll(this.getClassHelper().getClassNames("hmodule", true));
classNames.addAll(this.getClassHelper().getClassNames("hmodule", true));
for (String className : classNames) {
Class<?> moduleClass = Class.forName(className);
if (moduleClass.isAnnotationPresent(haflow.module.Module.class)) {

View File

@ -1,17 +0,0 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>dfs.replication</name>
<value>3</value>
<description>default 3, can not be greater than the node count.</description>
</property>
<property>
<name>dfs.data.dir</name>
<value>${hadoop.tmp.dir}/dfs/data</value>
</property>
</configuration>

View File

@ -0,0 +1,4 @@
workspace_local = D:/haflow/flows/
workspace_hdfs = hdfs://m150:9000/user/root/examples/apps/
fs.default.name = hdfs://m150:9000

View File

@ -401,7 +401,7 @@ HAFlow.Main.prototype.doAddModule = function(instance, flowId, moduleId, left,
newNode["id"] = id;
newNode["flowId"] = flowId;
newNode["moduleId"] = moduleId;
newNode["name"] = "New Node " + id;
newNode["name"] = "NewNode-" + id;
newNode["position"] = {};
newNode.position["left"] = left;
newNode.position["top"] = top;