update gather_program

This commit is contained in:
ronger 2016-10-29 18:34:29 +08:00
parent 4095c893ca
commit 41f26c956e
1 changed file with 363 additions and 363 deletions

View File

@ -1,363 +1,363 @@
package org.ossean.gather.process; package org.ossean.gather.process;
import java.util.List; import java.util.List;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.ossean.gather.model.Configure; import org.ossean.gather.model.Configure;
import org.ossean.gather.model.GatherProject; import org.ossean.gather.model.GatherProject;
import org.ossean.gather.model.JobRequirement; import org.ossean.gather.model.JobRequirement;
import org.ossean.gather.model.PKControlPosts; import org.ossean.gather.model.PKControlPosts;
import org.ossean.gather.model.PKControlProjects; import org.ossean.gather.model.PKControlProjects;
import org.ossean.gather.model.RelativeMemo; import org.ossean.gather.model.RelativeMemo;
import org.ossean.gather.model.Taggings; import org.ossean.gather.model.Taggings;
import org.ossean.gather.sourceDao.GatherDao; import org.ossean.gather.sourceDao.GatherDao;
import org.ossean.gather.sourceDao.PKControlPostsDao; import org.ossean.gather.sourceDao.PKControlPostsDao;
import org.ossean.gather.sourceDao.PKControlProjectsDao; import org.ossean.gather.sourceDao.PKControlProjectsDao;
import org.ossean.gather.targetDao.PointerDao; import org.ossean.gather.targetDao.PointerDao;
import org.ossean.gather.targetDao.TargetDao; import org.ossean.gather.targetDao.TargetDao;
import org.springframework.context.annotation.Scope; import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component("gatherThread") @Component("gatherThread")
@Scope("prototype") @Scope("prototype")
public class GatherThread implements Runnable { public class GatherThread implements Runnable {
private static Logger logger = Logger.getLogger(GatherThread.class); private static Logger logger = Logger.getLogger(GatherThread.class);
private Configure conf; private Configure conf;
@Resource @Resource
private GatherDao gatherDao; private GatherDao gatherDao;
@Resource @Resource
private PointerDao pointerDao; private PointerDao pointerDao;
@Resource @Resource
private PKControlPostsDao pkControlPostsDao; private PKControlPostsDao pkControlPostsDao;
@Resource @Resource
private TargetDao targetDao; private TargetDao targetDao;
@Resource @Resource
private PKControlProjectsDao pkControlProjectsDao; private PKControlProjectsDao pkControlProjectsDao;
private int idsBegin; // 转移开始Id值 private int idsBegin; // 转移开始Id值
private int idsEnd; // 转移结束Id值 private int idsEnd; // 转移结束Id值
private int idsIncrement;// 每次转移的Id量 private int idsIncrement;// 每次转移的Id量
private int beginId; private int beginId;
private int endId; private int endId;
private String sourceTableName; private String sourceTableName;
private String pkControlPostsTableName = "pk_control_posts"; private String pkControlPostsTableName = "pk_control_posts";
private String pkControlProjectsTableName = "pk_control_projects"; private String pkControlProjectsTableName = "pk_control_projects";
private String taggingsTableName = "taggings"; private String taggingsTableName = "taggings";
private String tagsTableName = "tags"; private String tagsTableName = "tags";
private String gatherPostsTableName = "relative_memos"; private String gatherPostsTableName = "relative_memos";
private int maxId; private int maxId;
public void setParameters(Configure conf, String sourceTableName) { public void setParameters(Configure conf, String sourceTableName) {
this.conf = conf; this.conf = conf;
this.sourceTableName = sourceTableName; this.sourceTableName = sourceTableName;
} }
// 读指针 // 读指针
public int readPointer(String table, String source, String target) { public int readPointer(String table, String source, String target) {
int pointer = 1; int pointer = 1;
try { try {
pointer = pointerDao.getPointer(table, source, target); pointer = pointerDao.getPointer(table, source, target);
} catch (Exception e) { } catch (Exception e) {
// 表示表中没有数据 // 表示表中没有数据
logger.info("No such pointer! Create one"); logger.info("No such pointer! Create one");
pointerDao.insertPointer(table, source, target, 1); pointerDao.insertPointer(table, source, target, 1);
} }
return pointer; return pointer;
} }
@Override @Override
public void run() { public void run() {
// long start = System.currentTimeMillis(); // long start = System.currentTimeMillis();
Thread.currentThread().setName(sourceTableName); Thread.currentThread().setName(sourceTableName);
idsIncrement = conf.getIdsIncrement(); idsIncrement = conf.getIdsIncrement();
idsBegin = readPointer(conf.getPointerTableName(), sourceTableName, idsBegin = readPointer(conf.getPointerTableName(), sourceTableName,
conf.getTargetTableName()); conf.getTargetTableName());
idsEnd = maxId = gatherDao.getMaxId(sourceTableName); idsEnd = maxId = gatherDao.getMaxId(sourceTableName);
while (idsBegin < idsEnd) { while (idsBegin < idsEnd) {
beginId = idsBegin; beginId = idsBegin;
endId = beginId + idsIncrement - 1; // 取数据时两边都取等号 endId = beginId + idsIncrement - 1; // 取数据时两边都取等号
if (endId <= maxId) { if (endId <= maxId) {
handleBatchData(beginId, endId, conf); handleBatchData(beginId, endId, conf);
idsBegin = idsBegin + idsIncrement; idsBegin = idsBegin + idsIncrement;
} else { } else {
endId = maxId; // endId应小于maxId endId = maxId; // endId应小于maxId
handleBatchData(beginId, endId, conf); handleBatchData(beginId, endId, conf);
break; break;
} }
} }
GatherProcess.gatherState.put(sourceTableName, false); GatherProcess.gatherState.put(sourceTableName, false);
// long end = System.currentTimeMillis(); // long end = System.currentTimeMillis();
// logger.info((end - start) / 6000); // logger.info((end - start) / 6000);
} }
public void handleBatchData(int beginId, int endId, Configure conf) { public void handleBatchData(int beginId, int endId, Configure conf) {
logger.info("BeginId#" + sourceTableName + ":" + beginId); logger.info("BeginId#" + sourceTableName + ":" + beginId);
// 表示任务没有完成 // 表示任务没有完成
int maxId = gatherDao.getMaxId(sourceTableName); int maxId = gatherDao.getMaxId(sourceTableName);
// 防止转移超过当前最大值的Id数据 // 防止转移超过当前最大值的Id数据
if (beginId >= 0 && endId > 0 && maxId >= endId) { if (beginId >= 0 && endId > 0 && maxId >= endId) {
// 更新执行开始时间 // 更新执行开始时间
logger.info("begin gathering..."); logger.info("begin gathering...");
// 插入Id段数据忽略重复值 // 插入Id段数据忽略重复值
try { try {
String[] sourceFields = conf.getSourceFields().split(","); String[] sourceFields = conf.getSourceFields().split(",");
String[] targetFields = conf.getTargetFields().split(","); String[] targetFields = conf.getTargetFields().split(",");
String selectItems = ""; String selectItems = "";
for (int i = 0; i < sourceFields.length; i++) { for (int i = 0; i < sourceFields.length; i++) {
String str_source = sourceFields[i]; String str_source = sourceFields[i];
String str_target = targetFields[i]; String str_target = targetFields[i];
selectItems += str_source + " as " + str_target + ","; selectItems += str_source + " as " + str_target + ",";
} }
selectItems = selectItems selectItems = selectItems
.substring(0, selectItems.length() - 1) + " "; .substring(0, selectItems.length() - 1) + " ";
if (conf.getTargetTableName().equals("relative_memos")) { if (conf.getTargetTableName().equals("relative_memos")) {
List<RelativeMemo> dataGet = gatherDao.getPostGatherData( List<RelativeMemo> dataGet = gatherDao.getPostGatherData(
sourceTableName, selectItems, beginId, endId, sourceTableName, selectItems, beginId, endId,
conf.getAndWhere()); conf.getAndWhere());
for (int i = 0; i < dataGet.size(); i++) { for (int i = 0; i < dataGet.size(); i++) {
RelativeMemo model = dataGet.get(i); RelativeMemo model = dataGet.get(i);
String urlMD5 = model.getUrl_md5();// 通过urlMD5判断是不是已经存在该帖子 String urlMD5 = model.getUrl_md5();// 通过urlMD5判断是不是已经存在该帖子
// 是否更新 // 是否更新
int postId = 0; int postId = 0;
if(GatherProcess.urlMd5Set.contains(urlMD5)){ if(GatherProcess.urlMd5Set.contains(urlMD5)){
//urlmd5存在则更新 //urlmd5存在则更新
RelativeMemo samePost = targetDao.findPostByUrlMD5( RelativeMemo samePost = targetDao.findPostByUrlMD5(
conf.getTargetTableName(), urlMD5); conf.getTargetTableName(), urlMD5);
// update gather_projects表中对应的记录在维持待更新表 // update gather_projects表中对应的记录在维持待更新表
postId = samePost.getId(); postId = samePost.getId();
model.setId(postId); model.setId(postId);
handleUpdateGatherPosts(samePost.getId(), model); handleUpdateGatherPosts(samePost.getId(), model);
}else{ }else{
// 不存在 插入 // 不存在 插入
PKControlPosts pkControlModel = pkControlPostsDao PKControlPosts pkControlModel = pkControlPostsDao
.selectItemByUrlMD5( .selectItemByUrlMD5(
pkControlPostsTableName, urlMD5);// 查看有没有固定的id pkControlPostsTableName, urlMD5);// 查看有没有固定的id
if (pkControlModel != null) if (pkControlModel != null)
model.setId(pkControlModel.getId()); model.setId(pkControlModel.getId());
else { else {
// 在pk_control_posts表中生成当前项目对应的id // 在pk_control_posts表中生成当前项目对应的id
pkControlPostsDao.insertOneItem( pkControlPostsDao.insertOneItem(
pkControlPostsTableName, urlMD5); pkControlPostsTableName, urlMD5);
// 查看刚刚插入信息的id // 查看刚刚插入信息的id
PKControlPosts controlItem = pkControlPostsDao PKControlPosts controlItem = pkControlPostsDao
.selectItemByUrlMD5( .selectItemByUrlMD5(
pkControlPostsTableName, urlMD5); pkControlPostsTableName, urlMD5);
// 用id构造model对应的固定不变的id // 用id构造model对应的固定不变的id
model.setId(controlItem.getId()); model.setId(controlItem.getId());
postId = model.getId(); postId = model.getId();
} }
handleInsertGatherPosts(model, conf); handleInsertGatherPosts(model, conf);
GatherProcess.urlMd5Set.add(urlMD5); GatherProcess.urlMd5Set.add(urlMD5);
} }
// 将tag和项目的关系存入表item_tag_relation 并分离tag // 将tag和项目的关系存入表item_tag_relation 并分离tag
String tags = model.getTags(); String tags = model.getTags();
if (tags == null) { if (tags == null) {
// 表示该项目没有标签 // 表示该项目没有标签
continue; continue;
} }
List<String> tagList = DataHandler List<String> tagList = DataHandler
.tagsSegmentation(tags); .tagsSegmentation(tags);
for (String tag : tagList) { for (String tag : tagList) {
targetDao.insertTag(tagsTableName, tag);// ignore方式插入该项目的标签 targetDao.insertTag(tagsTableName, tag);// ignore方式插入该项目的标签
int tag_id = targetDao.selectTagIdByName( int tag_id = targetDao.selectTagIdByName(
tagsTableName, tag); tagsTableName, tag);
Taggings taggings = new Taggings(); Taggings taggings = new Taggings();
taggings.setTag_id(tag_id); taggings.setTag_id(tag_id);
taggings.setTaggable_id(postId); taggings.setTaggable_id(postId);
taggings.setTaggable_type("RelativeMemo"); taggings.setTaggable_type("RelativeMemo");
taggings.setContext("tags"); taggings.setContext("tags");
taggings.setCreated_at(DataHandler.getNow()); taggings.setCreated_at(DataHandler.getNow());
// 将Taggings对象存入数据库中 // 将Taggings对象存入数据库中
try { try {
targetDao.insertTaggings(taggingsTableName, targetDao.insertTaggings(taggingsTableName,
taggings); taggings);
} catch (Exception e) { } catch (Exception e) {
// 在插入记录之前 relative_memos表中的记录已经被删除掉了 // 在插入记录之前 relative_memos表中的记录已经被删除掉了
logger.error(e); logger.error(e);
System.exit(0); System.exit(0);
} }
} }
} }
} else if (conf.getTargetTableName().equals("gather_projects")) { } else if (conf.getTargetTableName().equals("gather_projects")) {
List<GatherProject> dataGet = gatherDao.getPrjGatherData( List<GatherProject> dataGet = gatherDao.getPrjGatherData(
sourceTableName, selectItems, beginId, endId, sourceTableName, selectItems, beginId, endId,
conf.getAndWhere()); conf.getAndWhere());
for (int i = 0; i < dataGet.size(); i++) { for (int i = 0; i < dataGet.size(); i++) {
GatherProject model = dataGet.get(i); GatherProject model = dataGet.get(i);
String urlMD5 = model.getUrl_md5();// 通过urlMD5判断是不是已经存在该项目 String urlMD5 = model.getUrl_md5();// 通过urlMD5判断是不是已经存在该项目
// 是否更新 // 是否更新
int prjId = 0; int prjId = 0;
if(GatherProcess.urlMd5Set.contains(urlMD5)){ if(GatherProcess.urlMd5Set.contains(urlMD5)){
GatherProject samePrj = targetDao.findPrjByUrlMD5( GatherProject samePrj = targetDao.findPrjByUrlMD5(
conf.getTargetTableName(), urlMD5); conf.getTargetTableName(), urlMD5);
// update gather_projects表中对应的记录在维持待更新表 // update gather_projects表中对应的记录在维持待更新表
prjId = samePrj.getId(); prjId = samePrj.getId();
model.setId(prjId); model.setId(prjId);
model.setUpdate_mark(1); model.setUpdate_mark(2);
handleUpdateGatherProjects(samePrj.getId(), model); handleUpdateGatherProjects(samePrj.getId(), model);
}else{ }else{
// 不存在 插入 // 不存在 插入
PKControlProjects pkControlProjects = pkControlProjectsDao PKControlProjects pkControlProjects = pkControlProjectsDao
.selectItemByUrlMD5( .selectItemByUrlMD5(
pkControlProjectsTableName, urlMD5);// 查看有没有固定的id pkControlProjectsTableName, urlMD5);// 查看有没有固定的id
if (pkControlProjects != null) if (pkControlProjects != null)
model.setId(pkControlProjects.getId()); model.setId(pkControlProjects.getId());
else { else {
// 在pk_control_posts表中生成当前项目对应的id // 在pk_control_posts表中生成当前项目对应的id
pkControlProjectsDao.insertOneItem( pkControlProjectsDao.insertOneItem(
pkControlProjectsTableName, urlMD5); pkControlProjectsTableName, urlMD5);
// 查看刚刚插入信息的id // 查看刚刚插入信息的id
PKControlProjects controlItem = pkControlProjectsDao PKControlProjects controlItem = pkControlProjectsDao
.selectItemByUrlMD5( .selectItemByUrlMD5(
pkControlProjectsTableName, pkControlProjectsTableName,
urlMD5); urlMD5);
// 用id构造model对应的固定不变的id // 用id构造model对应的固定不变的id
model.setId(controlItem.getId()); model.setId(controlItem.getId());
prjId = model.getId(); prjId = model.getId();
} }
model.setUpdate_mark(0); model.setUpdate_mark(0);
handleInsertGatherProjects(model, conf); handleInsertGatherProjects(model, conf);
GatherProcess.urlMd5Set.add(urlMD5); GatherProcess.urlMd5Set.add(urlMD5);
} }
// // 将tag和项目的关系存入表item_tag_relation 并分离tag // // 将tag和项目的关系存入表item_tag_relation 并分离tag
// String tags = model.getTags(); // String tags = model.getTags();
// if (tags == null) { // if (tags == null) {
// // 表示该项目没有标签 // // 表示该项目没有标签
// continue; // continue;
// } // }
// List<String> tagList = DataHandler // List<String> tagList = DataHandler
// .tagsSegmentation(tags); // .tagsSegmentation(tags);
// for (String tag : tagList) { // for (String tag : tagList) {
// targetDao.insertTag(tagsTableName, tag);// ignore方式插入该项目的标签 // targetDao.insertTag(tagsTableName, tag);// ignore方式插入该项目的标签
// int tag_id = targetDao.selectTagIdByName( // int tag_id = targetDao.selectTagIdByName(
// tagsTableName, tag); // tagsTableName, tag);
// Taggings taggings = new Taggings(); // Taggings taggings = new Taggings();
// taggings.setTag_id(tag_id); // taggings.setTag_id(tag_id);
// taggings.setTaggable_id(prjId); // taggings.setTaggable_id(prjId);
// taggings.setTaggable_type("OpenSourceProject"); // taggings.setTaggable_type("OpenSourceProject");
// taggings.setContext("tags"); // taggings.setContext("tags");
// taggings.setCreated_at(DataHandler.getNow()); // taggings.setCreated_at(DataHandler.getNow());
// // 将Taggings对象存入数据库中 // // 将Taggings对象存入数据库中
// try { // try {
// targetDao.insertTaggings(taggingsTableName, // targetDao.insertTaggings(taggingsTableName,
// taggings); // taggings);
// } catch (Exception e) { // } catch (Exception e) {
// // 在插入记录之前 relative_memos表中的记录已经被删除掉了 // // 在插入记录之前 relative_memos表中的记录已经被删除掉了
// logger.error(e); // logger.error(e);
// System.exit(0); // System.exit(0);
// } // }
// } // }
} }
} else { } else {
List<JobRequirement> dataGet = gatherDao.getJobGatherData( List<JobRequirement> dataGet = gatherDao.getJobGatherData(
sourceTableName, selectItems, beginId, endId, sourceTableName, selectItems, beginId, endId,
conf.getAndWhere()); conf.getAndWhere());
for (int i = 0; i < dataGet.size(); i++) { for (int i = 0; i < dataGet.size(); i++) {
JobRequirement model = dataGet.get(i); JobRequirement model = dataGet.get(i);
String urlMD5 = model.getUrl_md5();// 通过urlMD5判断是不是已经存在该帖子 String urlMD5 = model.getUrl_md5();// 通过urlMD5判断是不是已经存在该帖子
// 是否更新 // 是否更新
int postId = 0; int postId = 0;
if(GatherProcess.urlMd5Set.contains(urlMD5)){ if(GatherProcess.urlMd5Set.contains(urlMD5)){
JobRequirement sameJob = targetDao.findJobByUrlMD5( JobRequirement sameJob = targetDao.findJobByUrlMD5(
conf.getTargetTableName(), urlMD5); conf.getTargetTableName(), urlMD5);
// update gather_projects表中对应的记录在维持待更新表 // update gather_projects表中对应的记录在维持待更新表
postId = sameJob.getId(); postId = sameJob.getId();
model.setId(postId); model.setId(postId);
handleUpdateGatherJobs(sameJob.getId(), model); handleUpdateGatherJobs(sameJob.getId(), model);
}else{ }else{
// 不存在 插入 // 不存在 插入
PKControlPosts pkControlPosts = pkControlPostsDao PKControlPosts pkControlPosts = pkControlPostsDao
.selectItemByUrlMD5( .selectItemByUrlMD5(
pkControlPostsTableName, urlMD5);// 查看有没有固定的id pkControlPostsTableName, urlMD5);// 查看有没有固定的id
if (pkControlPosts != null) if (pkControlPosts != null)
model.setId(pkControlPosts.getId()); model.setId(pkControlPosts.getId());
else { else {
// 在pk_control_posts表中生成当前项目对应的id // 在pk_control_posts表中生成当前项目对应的id
pkControlPostsDao.insertOneItem( pkControlPostsDao.insertOneItem(
pkControlPostsTableName, urlMD5); pkControlPostsTableName, urlMD5);
// 查看刚刚插入信息的id // 查看刚刚插入信息的id
PKControlPosts controlItem = pkControlPostsDao PKControlPosts controlItem = pkControlPostsDao
.selectItemByUrlMD5( .selectItemByUrlMD5(
pkControlPostsTableName, urlMD5); pkControlPostsTableName, urlMD5);
// 用id构造model对应的固定不变的id // 用id构造model对应的固定不变的id
model.setId(controlItem.getId()); model.setId(controlItem.getId());
postId = model.getId(); postId = model.getId();
} }
handleInsertGatherJobs(model, conf); handleInsertGatherJobs(model, conf);
GatherProcess.urlMd5Set.add(urlMD5); GatherProcess.urlMd5Set.add(urlMD5);
} }
} }
} }
} catch (Exception ex) { } catch (Exception ex) {
// 数据迁移过程可能发生异常情况 // 数据迁移过程可能发生异常情况
logger.error(ex); logger.error(ex);
System.exit(0); System.exit(0);
} }
// 更新游标到本次 EndId+1; // 更新游标到本次 EndId+1;
pointerDao.updatePointer(conf.getPointerTableName(), pointerDao.updatePointer(conf.getPointerTableName(),
sourceTableName, conf.getTargetTableName(), endId + 1);// sourceIdBegin sourceTableName, conf.getTargetTableName(), endId + 1);// sourceIdBegin
// + // +
// idsIncrement // idsIncrement
logger.info("current--" + sourceTableName + ": " + endId); logger.info("current--" + sourceTableName + ": " + endId);
} }
} }
// 处理URL不存在的帖子 插入relative_memos表 // 处理URL不存在的帖子 插入relative_memos表
public void handleInsertGatherPosts(RelativeMemo model, Configure conf) { public void handleInsertGatherPosts(RelativeMemo model, Configure conf) {
try { try {
targetDao.insertRelativeMemo(conf.getTargetTableName(), targetDao.insertRelativeMemo(conf.getTargetTableName(),
conf.getTargetFields(), model); conf.getTargetFields(), model);
} catch (Exception e) { } catch (Exception e) {
logger.error(e); logger.error(e);
} }
} }
// 处理URL相同的帖子更新 id表示更新的帖子固定id // 处理URL相同的帖子更新 id表示更新的帖子固定id
public void handleUpdateGatherPosts(int id, RelativeMemo model_new) { public void handleUpdateGatherPosts(int id, RelativeMemo model_new) {
targetDao.updateRelativeMemo(gatherPostsTableName, model_new, id);// 更新数据relative_memos表 targetDao.updateRelativeMemo(gatherPostsTableName, model_new, id);// 更新数据relative_memos表
} }
// 处理URL不存在的项目 插入gather_projects表 // 处理URL不存在的项目 插入gather_projects表
public void handleInsertGatherProjects(GatherProject model, Configure conf) { public void handleInsertGatherProjects(GatherProject model, Configure conf) {
try { try {
targetDao.insertOpenSourceProject(conf.getTargetTableName(), targetDao.insertOpenSourceProject(conf.getTargetTableName(),
conf.getTargetFields(), model); conf.getTargetFields(), model);
} catch (Exception e) { } catch (Exception e) {
logger.error(e); logger.error(e);
} }
} }
// 处理URL相同的项目更新 id表示更新的项目固定id // 处理URL相同的项目更新 id表示更新的项目固定id
public void handleUpdateGatherProjects(int id, GatherProject model_new) { public void handleUpdateGatherProjects(int id, GatherProject model_new) {
targetDao.updateOpenSourceProject(conf.getTargetTableName(), model_new, targetDao.updateOpenSourceProject(conf.getTargetTableName(), model_new,
id);// 更新数据gather_projects表 id);// 更新数据gather_projects表
} }
// 处理URL不存在的项目 插入job_requirements表 // 处理URL不存在的项目 插入job_requirements表
public void handleInsertGatherJobs(JobRequirement model, Configure conf) { public void handleInsertGatherJobs(JobRequirement model, Configure conf) {
try { try {
targetDao.insertJobRequirement(conf.getTargetTableName(), targetDao.insertJobRequirement(conf.getTargetTableName(),
conf.getTargetFields(), model); conf.getTargetFields(), model);
} catch (Exception e) { } catch (Exception e) {
logger.error(e); logger.error(e);
} }
} }
// 处理URL相同的项目更新 id表示更新的项目固定id // 处理URL相同的项目更新 id表示更新的项目固定id
public void handleUpdateGatherJobs(int id, JobRequirement model_new) { public void handleUpdateGatherJobs(int id, JobRequirement model_new) {
targetDao targetDao
.updateJobRequirement(conf.getTargetTableName(), model_new, id);// 更新数据job_requirements表 .updateJobRequirement(conf.getTargetTableName(), model_new, id);// 更新数据job_requirements表
} }
} }