--修复抽取bug

This commit is contained in:
wu ming 2016-12-07 09:19:45 +08:00
parent bf3fcb267f
commit f08b57e3d7
11 changed files with 834 additions and 451 deletions

View File

@ -83,30 +83,30 @@ oracle-psw=oracle
# windows path
#=============================================================================================================
#extract-log-localtion=D:\\test\\log\\
#extract-standard-log-localtion=D:\\test\\log2\\
#
#file_upload_path=D:\\test\\
#file_download_path=D:\\test\\export.xlsx
#
#package_download_path=D:\\test\\
#package_name=sql_script_standard
#
#sql_script_path_last=D:\\test\\sql_script_last\\
#sql_script_path_standard=D:\\test\\sql_script_standard\\
extract-log-localtion=D:\\test\\log\\
extract-standard-log-localtion=D:\\test\\log2\\
file_upload_path=D:\\test\\
file_download_path=D:\\test\\export.xlsx
package_download_path=D:\\test\\
package_name=sql_script_standard
sql_script_path_last=D:\\test\\sql_script_last\\
sql_script_path_standard=D:\\test\\sql_script_standard\\
#=============================================================================================================
# linux path
#=============================================================================================================
extract-log-localtion=/home/web_manage/log/
extract-standard-log-localtion=/home/web_manage/log2/
file_upload_path=/excel_import_dir/
file_download_path=/excel_export_dir/export.xlsx
package_download_path=/
package_name=DefaultDescription
sql_script_path_last=/DefaultDescription_last/
sql_script_path_standard=/DefaultDescription/
#extract-log-localtion=/home/web_manage/log/
#extract-standard-log-localtion=/home/web_manage/log2/
#
#file_upload_path=/excel_import_dir/
#file_download_path=/excel_export_dir/export.xlsx
#
#package_download_path=/
#package_name=DefaultDescription
#
#sql_script_path_last=/DefaultDescription_last/
#sql_script_path_standard=/DefaultDescription/

View File

@ -23,28 +23,30 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import com.base.BaseController;
import com.platform.entities.BasedTask;
import com.platform.entities.DataInfoEntity;
import com.platform.entities.DataInfoEntityMoveTmp;
import com.platform.entities.GatherOracleInfo;
import com.platform.entities.OracleConnectorParams;
import com.platform.entities.OracleExtractExecuter;
import com.platform.form.PagerOptions;
import com.platform.form.oracleForm;
import com.platform.form.volumeMoveForm;
import com.platform.http.HttpUtils;
import com.platform.service.DataInfoService;
import com.platform.service.ICodeService;
import com.platform.service.IGfsService;
import com.platform.service.IGatherOracleService;
import com.platform.service.ILogRead;
import com.platform.service.IMoveDataService;
import com.platform.service.IGatherOracleService;
import com.platform.service.IOracleExtractService;
import com.platform.service.IVolumeService;
import com.platform.service.OracleExtractTask;
import com.platform.service.OracleStatusService;
import com.platform.service.thread.ThreadExtractOracle;
import com.platform.service.thread.ThreadGainOracleConnect;
import com.platform.utils.Constant;
import com.platform.utils.UtilsHelper;
/** 数据管理
/**
* 数据管理
*
* @author chen
*
*/
@ -99,7 +101,9 @@ public class DataModelController extends BaseController {
this.dfs = dfs;
}
/** 数据管理--分页查询数据
/**
* 数据管理--分页查询数据
*
* @param res
* @param req
* @return
@ -124,17 +128,19 @@ public class DataModelController extends BaseController {
sb.append(str).append(":").append("null").append(",");
}
}
log.info(sb.deleteCharAt(sb.length() - 1)
.append("}").toString());
log.info(sb.deleteCharAt(sb.length() - 1).append("}").toString());
PagerOptions pagerOptions = (PagerOptions) UtilsHelper
.newObjAndSetAttrsByClass(PagerOptions.class, params);
pagerOptions.setCurrentPageNum(Integer.valueOf(params.get("currentPageNum")));
//冷热区查询字段mark
pagerOptions.setCurrentPageNum(Integer.valueOf(params
.get("currentPageNum")));
// 冷热区查询字段mark
pagerOptions.setMark(pagerOptions.getVolumeType().trim());
return dfs.getPagerTableData(pagerOptions);
}
/** 数据管理--删除数据
/**
* 数据管理--删除数据
*
* @param res
* @param req
* @throws Exception
@ -149,7 +155,9 @@ public class DataModelController extends BaseController {
req.setStatus(200);
}
/** 连接oracle
/**
* 连接oracle
*
* @param res
* @param req
* @throws UnsupportedEncodingException
@ -165,7 +173,8 @@ public class DataModelController extends BaseController {
log.info("执行连接\t" + rcName);
String cmd = "kubectl label --overwrite rc " + rcName
+ " status=0";
List<String> rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd);
List<String> rList = Constant.ganymedSSH
.execCmdWaitAcquiescent(cmd);
StringBuffer sb = new StringBuffer();
for (String string : rList)
sb.append(string).append("\n");
@ -175,7 +184,9 @@ public class DataModelController extends BaseController {
req.setStatus(200);
}
/** 断开oracle连接
/**
* 断开oracle连接
*
* @param res
* @param req
* @throws UnsupportedEncodingException
@ -196,7 +207,9 @@ public class DataModelController extends BaseController {
req.setStatus(200);
}
/** gfs的 volume节点的查询
/**
* gfs的 volume节点的查询
*
* @return
* @throws Exception
*/
@ -208,7 +221,9 @@ public class DataModelController extends BaseController {
return rest;
}
/** 数据迁移功能
/**
* 数据迁移功能
*
* @param res
* @param req
* @param form
@ -233,7 +248,9 @@ public class DataModelController extends BaseController {
req.setStatus(200);
}
/** oracle的 汇总功能
/**
* oracle的 汇总功能
*
* @param res
* @param req
* @param form
@ -241,39 +258,32 @@ public class DataModelController extends BaseController {
*/
@RequestMapping(value = "/oracle/{name}/extract", method = RequestMethod.POST)
public void oracleExtract(HttpServletRequest res, HttpServletResponse req,
@RequestBody oracleForm form) throws Exception {
@RequestBody oracleForm form) throws Exception {
log.error("/oracle/{name}/extract");
boolean isConnect = false;
//5秒内是否能获得oracle连接否则认为超时
if (null != form.getTarget()) {
ThreadGainOracleConnect thOrcl = new ThreadGainOracleConnect(form, OracleExtract);
thOrcl.start();
for (int i = 0; i < 10; i++) {
Thread.sleep(400);
isConnect = thOrcl.isConnect();
if (isConnect) {
break;
}
else {
if (thOrcl.isExcept()) {
break;
if (null != form.getTarget()) { // 检查请求参数中是否包含汇总库信息
boolean isConnect = OracleExtract.isConnectTotalOracle(form
.getTarget()); // 检查汇总库是否可以连接成功连接成功返回200状态吗连接失败返回500状态吗
if (isConnect) {
req.setStatus(200);
if (null != form.getInneed() && form.getInneed().size() > 0) {
for (OracleConnectorParams oracleParams : form.getInneed()) {
BasedTask task = new OracleExtractTask(
oracleParams.getName(), form.getTarget(),
oracleParams, OracleExtract);
OracleExtractExecuter oee = new OracleExtractExecuter(
task);
new Thread(oee, oracleParams.getName()).start();
}
Thread.sleep(100);
}
}
}
if (isConnect)
req.setStatus(200);
else
req.setStatus(500);
// 开始抽取数据到汇总库
if (isConnect && null != form.getInneed() && form.getInneed().size() > 0) {
ThreadExtractOracle thExtra = new ThreadExtractOracle(form, OracleExtract);
thExtra.start();
} else
req.setStatus(500);
}
}
/** oracle汇总抽取库的 查询
/**
* oracle汇总抽取库的 查询
*
* @return
* @throws Exception
*/
@ -285,7 +295,9 @@ public class DataModelController extends BaseController {
return result;
}
/** oracle汇总抽取库的 删除
/**
* oracle汇总抽取库的 删除
*
* @param req
* @param res
* @param id
@ -301,7 +313,9 @@ public class DataModelController extends BaseController {
res.setStatus(200);
}
/** oracle汇总抽取库的 新增
/**
* oracle汇总抽取库的 新增
*
* @param res
* @param req
* @param id
@ -319,7 +333,9 @@ public class DataModelController extends BaseController {
req.setStatus(200);
}
/** oracle汇总抽取库的 更新
/**
* oracle汇总抽取库的 更新
*
* @param res
* @param req
* @param id
@ -337,7 +353,9 @@ public class DataModelController extends BaseController {
req.setStatus(200);
}
/** 迁移 数据 的查询
/**
* 迁移 数据 的查询
*
* @return
* @throws Exception
*/
@ -349,7 +367,9 @@ public class DataModelController extends BaseController {
return result;
}
/** 迁移完成后的 删除记录功能
/**
* 迁移完成后的 删除记录功能
*
* @param res
* @param req
* @param id
@ -368,7 +388,9 @@ public class DataModelController extends BaseController {
return result;
}
/** 迁移数据完成后新增一条数据暂时去掉新增功能不在此
/**
* 迁移数据完成后新增一条数据暂时去掉新增功能不在此
*
* @param res
* @param req
* @param move
@ -381,12 +403,14 @@ public class DataModelController extends BaseController {
HttpServletResponse req, @RequestBody DataInfoEntity move)
throws Exception {
log.debug("---------/task/transfer/save-----------------------");
// int result = dfs.save(move);
// int result = dfs.save(move);
req.setStatus(200);
return 1;
}
/** 地区和系统的 code 对应的名称 获取
/**
* 地区和系统的 code 对应的名称 获取
*
* @return
* @throws Exception
*/
@ -398,7 +422,9 @@ public class DataModelController extends BaseController {
return result;
}
/** oracle 汇总的 日志 读取
/**
* oracle 汇总的 日志 读取
*
* @param name
* @param res
* @param req
@ -411,7 +437,7 @@ public class DataModelController extends BaseController {
HttpServletRequest res, HttpServletResponse req) throws Exception {
log.info("---------/oracle/extract/log-------------------");
String result = logReadService.readLog(name);
// "查看相应日志"
// "查看相应日志"
Map<String, String> log = new HashMap<String, String>();
log.put(name, result);
return log;

View File

@ -0,0 +1,32 @@
package com.platform.entities;
public abstract class AbstractOracleExtractTask implements BasedTask {
private String name;
private int status; // 任务的执行状态,0未执行1执行中2完成3失败
public AbstractOracleExtractTask() {
}
public AbstractOracleExtractTask(String name) {
this.name = name;
}
public abstract void handler();
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
}

View File

@ -0,0 +1,5 @@
package com.platform.entities;
public interface BasedTask {
public void handler();
}

View File

@ -0,0 +1,15 @@
package com.platform.entities;
public class OracleExtractExecuter implements Runnable {
private BasedTask task;
public OracleExtractExecuter(BasedTask task) {
this.task = task;
}
@Override
public void run() {
if (null != task)
task.handler();
}
}

View File

@ -16,7 +16,8 @@ import com.platform.utils.FileOperateHelper;
public class OracleConnector {
public static Logger log = Configs.DAILY_ROLLING_LOGGER.getLogger(OracleConnector.class);
public static Logger log = Configs.DAILY_ROLLING_LOGGER
.getLogger(OracleConnector.class);
public OracleConnector() {
}
@ -30,24 +31,27 @@ public class OracleConnector {
}
}
public synchronized static Connection connectionBuilder(String url, String user,
String password, OracleConnectorParams oc) throws CustomException {
Connection conn=null;
public synchronized static Connection connectionBuilder(String url,
String user, String password, OracleConnectorParams oc)
throws CustomException {
Connection conn = null;
try {
conn = DriverManager.getConnection(url, user, password);
} catch (SQLException e) {
Configs.CONSOLE_LOGGER.info("创建oracle连接失败: [" + e.getMessage() + "]");
Configs.CONSOLE_LOGGER.info("创建oracle连接失败: [" + e.getMessage()
+ "]");
if (null != oc) {
FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION
+ oc.getName()+".log", "创建oracle连接失败: [" + e.getMessage() + "]\r\n");
FileOperateHelper.fileWrite(
Configs.EXTRACT_LOG_LOCALTION + oc.getName() + ".log",
"创建oracle连接失败: [" + e.getMessage() + "]\r\n");
}
throw new CustomException(Custom4exception.OracleSQL_Except, e);
}
return conn;
}
public synchronized static boolean canConnect(String url, String user, String password) {
public synchronized static boolean canConnect(String url, String user,
String password) {
Connection result = null;
try {
result = connectionBuilder(url, user, password, null);
@ -57,25 +61,27 @@ public class OracleConnector {
return (null != result);
}
public synchronized static ResultSet getSQLExecResultSet(Connection conn, String sql, String filePath) {
public synchronized static ResultSet getSQLExecResultSet(Connection conn,
String sql, String filePath) {
ResultSet resultSet = null;
if (null != filePath) {
filePath = filePath.replace(".log", "");
}
Statement statement = null;
try {
statement = conn
.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE,
ResultSet.CONCUR_UPDATABLE);
statement = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_UPDATABLE);
resultSet = statement.executeQuery(sql);
FileOperateHelper
.fileWrite(filePath+".log", sql+ "\r\n"+"OK \r\n");
/*
* if(resultSet.next()){ System.out.println(resultSet.getInt(1)); }
*/
FileOperateHelper.fileWrite(filePath + ".log", sql + "\r\n"
+ "OK \r\n");
} catch (SQLException e) {
FileOperateHelper
.fileWrite(filePath+".log", sql+ "\r\n"+e.getMessage()+"\r\n");
FileOperateHelper.fileWrite(filePath + ".log",
sql + "\r\n" + e.getMessage() + "\r\n");
log.error(Custom4exception.OracleSQL_Except, e);
}
finally{
} finally {
if (null != statement) {
try {
statement.close();
@ -88,11 +94,12 @@ public class OracleConnector {
return resultSet;
}
public synchronized static ResultSet getSQLExecResultSet(String url, String user,
String password, String sql, String filePath) {
ResultSet result = null;
public synchronized static ResultSet getSQLExecResultSet(String url,
String user, String password, String sql, String filePath) {
ResultSet result = null;
try {
result = getSQLExecResultSet(connectionBuilder(url, user, password, null), sql, filePath);
result = getSQLExecResultSet(
connectionBuilder(url, user, password, null), sql, filePath);
} catch (CustomException e) {
log.error(Custom4exception.OracleSQL_Except, e);
}
@ -101,11 +108,13 @@ public class OracleConnector {
/**
* 执行对oracle数据库的增
*
* @param conn
* @param sql
* @return true:执行的不返回集合数据的sql成功 是否执行成功
* @return true:执行的不返回集合数据的sql成功 是否执行成功
*/
public synchronized static boolean execOracleSQL(Connection conn, String sql, String filePath) {
public synchronized static boolean execOracleSQL(Connection conn,
String sql, String filePath) {
if (null != filePath) {
filePath = filePath.replace(".log", "");
}
@ -113,16 +122,16 @@ public class OracleConnector {
Statement statement = null;
try {
statement = conn.createStatement();
flag =statement.execute(sql);
FileOperateHelper
.fileWrite(filePath+".log", sql+ "\r\n"+ flag +" \r\n");
statement.executeUpdate(sql);
flag = true;
FileOperateHelper.fileWrite(filePath + ".log", sql + "\r\n" + flag
+ " \r\n");
} catch (SQLException e) {
flag = false;
FileOperateHelper
.fileWrite(filePath+".log", sql+ "\r\n"+e.getMessage()+"\r\n");
FileOperateHelper.fileWrite(filePath + ".log",
sql + "\r\n" + e.getMessage() + "\r\n");
log.error(Custom4exception.OracleSQL_Except, e);
}
finally{
} finally {
if (null != statement) {
try {
statement.close();
@ -132,15 +141,18 @@ public class OracleConnector {
}
}
return flag;
return flag ;
}
/**
* 执行对oracle数据库的返回集合数据的sql
*
* @param conn
* @param sql
* @return true:执行结果大于1即有数据 是否执行成功
* @return true:执行结果大于1即有数据 是否执行成功
*/
public synchronized static boolean execUpdateOracleSQL(Connection conn, String sql, String filePath) {
public synchronized static boolean execUpdateOracleSQL(Connection conn,
String sql, String filePath) {
if (null != filePath) {
filePath = filePath.replace(".log", "");
}
@ -148,19 +160,17 @@ public class OracleConnector {
Statement statement = null;
try {
statement = conn.createStatement();
if(statement.executeUpdate(sql) > 0)
{
flag = true;
FileOperateHelper
.fileWrite(filePath+".log", sql+ "\r\n"+"OK \r\n");
}
statement.executeUpdate(sql);
flag = true;
FileOperateHelper.fileWrite(filePath + ".log", sql + "\r\n"
+ "OK \r\n");
} catch (SQLException e) {
flag = false;
FileOperateHelper
.fileWrite(filePath+".log", sql+ "\r\n"+e.getMessage()+"\r\n");
FileOperateHelper.fileWrite(filePath + ".log",
sql + "\r\n" + e.getMessage() + "\r\n");
log.error(Custom4exception.OracleSQL_Except, e);
}
finally{
} finally {
if (null != statement) {
try {
statement.close();

View File

@ -33,4 +33,10 @@ public interface IOracleExtractService {
boolean extractStandardTable(String name, List<OracleConnectorParams> dataInfolist,
GatherOracleInfo oracleConnect) throws Exception;
/**
*
* @param ocp
*/
public void updateDataExtractStatus(OracleConnectorParams ocp, int status);
}

View File

@ -16,7 +16,7 @@ import com.platform.utils.FileOperateHelper;
public class OracleExtractHelper {
public static Logger log = Configs.DAILY_ROLLING_LOGGER.getLogger(OracleExtractHelper.class);
public static Logger log = Logger.getLogger(OracleExtractHelper.class);
/**
* 判断dblink是否已经存在
@ -26,22 +26,39 @@ public class OracleExtractHelper {
* dblink的名称
* @return
*/
private boolean hasSameNameDBLink(Connection conn, String linkName, String filePath) {
private boolean hasSameNameDBLink(Connection conn, String linkName,
String filePath) {
boolean flag = false;
String sql = "SELECT * FROM ALL_DB_LINKS WHERE DB_LINK='" + linkName+"'";
String sql = "SELECT COUNT(*) c FROM ALL_DB_LINKS WHERE DB_LINK='"
+ linkName + "'";
ResultSet rSet = null;
Statement statement = null;
try {
rSet = OracleConnector.getSQLExecResultSet(conn, sql, null);
rSet.last();
if (rSet.getRow() > 0)
flag = true;
FileOperateHelper
.fileWrite(filePath, sql+ "\r\n"+"OK \r\n");
statement = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_UPDATABLE);
rSet = statement.executeQuery(sql);
if (rSet.next()) {
if (rSet.getInt("c") > 0)
flag = true;
else
flag = false;
}
FileOperateHelper.fileWrite(filePath + ".log", sql + "\r\n"
+ "OK \r\n");
} catch (Exception e) {
FileOperateHelper
.fileWrite(filePath, sql+ "\r\n"+ e.getMessage() + "\r\n连接异常 \r\n");
FileOperateHelper.fileWrite(filePath, sql + "\r\n" + e.getMessage()
+ "\r\n连接异常 \r\n");
log.error(Custom4exception.threadVolume_Oracle_Except, e);
} finally {
if (null != statement) {
try {
statement.close();
} catch (SQLException e) {
log.error(Custom4exception.OracleSQL_Except, e);
}
}
}
return flag;
}
@ -49,12 +66,13 @@ public class OracleExtractHelper {
/**
* 创建dblink
*
* @param conn 汇总库的连接
* @param oc 采集库
* dblink连接参数实体
* @param conn
* 汇总库的连接
* @param oc
* 采集库 dblink连接参数实体
*/
public void createDBLink(Connection conn, OracleConnectorParams oc) {
String linkName = "LINKTO" + oc.getName();
String linkName = "LINKTO_J" + oc.getName().replaceAll("-", "_");
String sql = "CREATE PUBLIC DATABASE LINK "
+ linkName
+ " CONNECT TO "
@ -66,24 +84,30 @@ public class OracleExtractHelper {
+ ")))(CONNECT_DATA =(SERVICE_NAME =" + oc.getDatabaseName()
+ ")))\'";
if (null != oc) {
if (hasSameNameDBLink(conn, linkName, Configs.EXTRACT_LOG_LOCALTION + oc.getName()+".log")) { // 如果dblink已经存在,先删除dblink在创建dblink
String deleteSql = "DROP PUBLIC DATABASE LINK "
+ linkName;
//删除 dblink
if (OracleConnector.execOracleSQL(conn, deleteSql, Configs.EXTRACT_LOG_LOCALTION + oc.getName())) {
if(!OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + oc.getName())){
OracleConnector.execUpdateOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + oc.getName());
if (hasSameNameDBLink(conn, linkName, Configs.EXTRACT_LOG_LOCALTION
+ oc.getName())) { // 如果dblink已经存在,先删除dblink在创建dblink
String deleteSql = "DROP PUBLIC DATABASE LINK " + linkName;
// 删除 dblink
if (OracleConnector.execOracleSQL(conn, deleteSql,
Configs.EXTRACT_LOG_LOCALTION + oc.getName())) {
if (!OracleConnector.execOracleSQL(conn, sql,
Configs.EXTRACT_LOG_LOCALTION + oc.getName())) {
OracleConnector.execUpdateOracleSQL(conn, sql,
Configs.EXTRACT_LOG_LOCALTION + oc.getName());
}
} else {
Configs.CONSOLE_LOGGER.error("删除已有的DBLink失败,无法创建新的DBLink!");
FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION
+ oc.getName()+".log", "删除已有的DBLink失败,无法创建新的DBLink!"+"\r\n");
+ oc.getName() + ".log",
"删除已有的DBLink失败,无法创建新的DBLink!" + "\r\n");
}
} else {
// 否则创建dblink
if(!OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + oc.getName())){
OracleConnector.execUpdateOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + oc.getName());
if (!OracleConnector.execOracleSQL(conn, sql,
Configs.EXTRACT_LOG_LOCALTION + oc.getName())) {
OracleConnector.execUpdateOracleSQL(conn, sql,
Configs.EXTRACT_LOG_LOCALTION + oc.getName());
}
}
}
@ -92,52 +116,66 @@ public class OracleExtractHelper {
/**
* 创建表空间
*
* @param conn 汇总库连接
* @param conn
* 汇总库连接
* @param collectOracle
* @param oc 汇总库信息
* @param oc
* 汇总库信息
* @return
*/
public boolean createTableSpace(Connection conn, OracleConnectorParams collectOracle, GatherOracleInfo oc) {
public boolean createTableSpace(Connection conn,
OracleConnectorParams collectOracle, GatherOracleInfo oc) {
String tmpSql = "select TABLESPACE_NAME from dba_tablespaces where TABLESPACE_NAME = '"
+ oc.getTableName() + "'";
// 存在 表空间
if (OracleConnector.execUpdateOracleSQL(conn, tmpSql, Configs.EXTRACT_LOG_LOCALTION + collectOracle.getName())) {
if (OracleConnector.execUpdateOracleSQL(conn, tmpSql,
Configs.EXTRACT_LOG_LOCALTION + collectOracle.getName())) {
return true;
} else {
String sql = "create tablespace " + oc.getTableName()
+ " datafile '" + Configs.GATHER_TABLESPACE_PATH
+ oc.getTableName() + ".dbf"
+ "' size 512M autoextend on next 512M maxsize unlimited";
return OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + collectOracle.getName());
return OracleConnector.execOracleSQL(conn, sql,
Configs.EXTRACT_LOG_LOCALTION + collectOracle.getName());
}
}
/**
* 创建用户并授权
*
* @param conn 汇总库连接
* @param oc 汇总库信息
* @param conn
* 汇总库连接
* @param oc
* 汇总库信息
*/
public void createUser(Connection conn, OracleConnectorParams oc, GatherOracleInfo totalOracle) {
String strTUser = oc.getName() + totalOracle.getSuffix();
public void createUser(Connection conn, OracleConnectorParams oc,
GatherOracleInfo totalOracle) {
String strTUser = "J" + oc.getName().replaceAll("-", "_")
+ totalOracle.getSuffix();
String sql = "Create user " + strTUser + " default tablespace "
+ totalOracle.getTableName() + " identified by "
+ Configs.GATHER_TABLE_PASSWORD;
String grantSql = "grant connect, resource, dba to " + strTUser;
OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + oc.getName());
OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION
+ oc.getName());
OracleConnector.execOracleSQL(conn, grantSql, Configs.EXTRACT_LOG_LOCALTION + oc.getName());
OracleConnector.execOracleSQL(conn, grantSql,
Configs.EXTRACT_LOG_LOCALTION + oc.getName());
}
/**
* 创建用户并授权
*
* @param conn 汇总库连接
* @param oc 汇总库信息
* @param conn
* 汇总库连接
* @param oc
* 汇总库信息
*/
public void createOnlyUser(Connection conn, OracleConnectorParams oc, GatherOracleInfo totalOracle) {
public void createOnlyUser(Connection conn, OracleConnectorParams oc,
GatherOracleInfo totalOracle) {
String strTUser = Configs.GATHER_STANDARD_USER_NAME;
String sql = "Create user " + strTUser + " default tablespace "
@ -145,18 +183,23 @@ public class OracleExtractHelper {
+ Configs.GATHER_TABLE_PASSWORD;
String grantSql = "grant connect, resource, dba to " + strTUser;
OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName());
OracleConnector.execOracleSQL(conn, sql,
Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName());
OracleConnector.execOracleSQL(conn, grantSql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName());
OracleConnector.execOracleSQL(conn, grantSql,
Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName());
}
/**
* 创建用户并授权
*
* @param conn 汇总库连接
* @param oc 汇总库信息
* @param conn
* 汇总库连接
* @param oc
* 汇总库信息
*/
public void createTable(Connection conn, OracleConnectorParams oc, GatherOracleInfo totalOracle) {
public void createTable(Connection conn, OracleConnectorParams oc,
GatherOracleInfo totalOracle) {
String strTUser = oc.getName() + totalOracle.getSuffix();
String sql = "Create table " + strTUser + " default tablespace "
@ -164,47 +207,67 @@ public class OracleExtractHelper {
+ Configs.GATHER_TABLE_PASSWORD;
String grantSql = "grant connect, resource, dba to " + strTUser;
OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + oc.getName());
OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION
+ oc.getName());
OracleConnector.execOracleSQL(conn, grantSql, Configs.EXTRACT_LOG_LOCALTION + oc.getName());
OracleConnector.execOracleSQL(conn, grantSql,
Configs.EXTRACT_LOG_LOCALTION + oc.getName());
}
/**
* 执行汇总操作
*
* @param conn 汇总库连接
* @param oc 采集库
* @param conn
* 汇总库连接
* @param oc
* 采集库
*/
public void extractColleDB(Connection conn, OracleConnectorParams oc, GatherOracleInfo totalOracle) {
String strTUser = oc.getName() + totalOracle.getSuffix();
public void extractColleDB(Connection conn, OracleConnectorParams oc,
GatherOracleInfo totalOracle) {
String strTUser = "J" + oc.getName().replaceAll("-", "_")
+ totalOracle.getSuffix();
String sql = "SELECT 'create table "
+ strTUser
+ ".J'|| substr(t.OWNER||'_'||t.TABLE_NAME,0,29)||' as select * from '||t.OWNER||'.'||t.TABLE_NAME||"
+ "'@LINKTO"
+ oc.getName()
+ ";' FROM dba_tables@LINKTO"
+ oc.getName()
+ "'@LINKTO_J"
+ oc.getName().replaceAll("-", "_")
+ ";' FROM dba_tables@LINKTO_J"
+ oc.getName().replaceAll("-", "_")
+ " t WHERE t.TABLESPACE_NAME NOT IN ('SYSTEM','SYSAUX')"
+ " and t.owner||t.table_name not in (select owner||table_name from dba_tables@LINKTO"
+ oc.getName() + " where 'data_type'='CLOB')";
ResultSet rsSet = OracleConnector.getSQLExecResultSet(conn, sql, Configs.EXTRACT_LOG_LOCALTION + oc.getName());
try {
while (rsSet.next()) {
try {
rsSet.getRow();
String resultSql = rsSet.getString(1).replace(";", "");
OracleConnector.execUpdateOracleSQL(conn, resultSql, Configs.EXTRACT_LOG_LOCALTION + oc.getName());
} catch (SQLException e) {
log.error(Custom4exception.threadVolume_Oracle_Except, e);
}
+ " and t.owner||t.table_name not in (select owner||table_name from dba_tables@LINKTO_J"
+ oc.getName().replaceAll("-", "_")
+ " where 'data_type'='CLOB')";
/*
* ResultSet rsSet = OracleConnector.getSQLExecResultSet(conn, sql,
* Configs.EXTRACT_LOG_LOCALTION + oc.getName());
*/
Statement statement = null;
try {
statement = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_UPDATABLE);
ResultSet resultSet = statement.executeQuery(sql);
FileOperateHelper.fileWrite(
Configs.EXTRACT_LOG_LOCALTION + oc.getName() + ".log", sql
+ "\r\n" + "OK \r\n");
while (resultSet.next()) {
try {
String resultSql = resultSet.getString(1).replace(";", "");
OracleConnector.execUpdateOracleSQL(conn, resultSql,
Configs.EXTRACT_LOG_LOCALTION + oc.getName());
} catch (SQLException e) {
log.error(Custom4exception.threadVolume_Oracle_Except, e);
}
} catch (SQLException e) {
log.error(Custom4exception.threadVolume_Oracle_Except, e);
}
} catch (SQLException e) {
log.error(Custom4exception.threadVolume_Oracle_Except, e);
}
}
/** 测试是否能否连接上
/**
* 测试是否能否连接上
*
* @return true能连接上
*/
public boolean testConnect(Connection conn) {
@ -212,8 +275,8 @@ public class OracleExtractHelper {
boolean flag = false;
try {
Statement statement = conn.createStatement();
if(statement.executeUpdate(testSql) > 0)
flag = true;
if (statement.executeUpdate(testSql) > 0)
flag = true;
} catch (SQLException e) {
flag = false;
log.error(Custom4exception.threadVolume_Oracle_Except, e);
@ -224,20 +287,27 @@ public class OracleExtractHelper {
/**
* 执行抽取操作--支付表
*
* @param conn 汇总库连接
* @param oc 采集库
* @param conn
* 汇总库连接
* @param oc
* 采集库
*/
public void extractStandardPayTable(Connection conn, OracleConnectorParams oc, GatherOracleInfo totalOracle) {
public void extractStandardPayTable(Connection conn,
OracleConnectorParams oc, GatherOracleInfo totalOracle) {
String strTUser = Configs.GATHER_STANDARD_USER_NAME;
createPay(conn, oc);
String sql = "insert into " + strTUser + "." + Configs.GATHER_STANDARD_PAY_TABLE_NAME + " select * from "
+ strTUser + "." + Configs.GATHER_STANDARD_PAY_TABLE_NAME +"@LINKTOST"
+ oc.getName();
String sql = "insert into " + strTUser + "."
+ Configs.GATHER_STANDARD_PAY_TABLE_NAME + " select * from "
+ strTUser + "." + Configs.GATHER_STANDARD_PAY_TABLE_NAME
+ "@LINKTOST" + oc.getName();
String resultSql = sql.replace(";", "");
OracleConnector.execOracleSQL(conn, resultSql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName());
OracleConnector.execOracleSQL(conn, resultSql,
Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName());
}
/** 执行抽取操作--执行表
/**
* 执行抽取操作--执行表
*
* @param conn
* @param collectOracle
* @param oracleModel
@ -247,26 +317,30 @@ public class OracleExtractHelper {
createExec(conn, collectOracle);
String strTUser = Configs.GATHER_STANDARD_USER_NAME;
String sql = "insert into " + strTUser + "." + Configs.GATHER_STANDARD_EXEC_TABLE_NAME + " select * from "
+ strTUser + "." + Configs.GATHER_STANDARD_EXEC_TABLE_NAME +"@LINKTOST"
+ collectOracle.getName();
String sql = "insert into " + strTUser + "."
+ Configs.GATHER_STANDARD_EXEC_TABLE_NAME + " select * from "
+ strTUser + "." + Configs.GATHER_STANDARD_EXEC_TABLE_NAME
+ "@LINKTOST" + collectOracle.getName();
String resultSql = sql.replace(";", "");
OracleConnector.execOracleSQL(conn, resultSql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + collectOracle.getName());
OracleConnector.execOracleSQL(
conn,
resultSql,
Configs.EXTRACT_STANDARD_LOG_LOCALTION
+ collectOracle.getName());
}
private void createPay(Connection conn, OracleConnectorParams oc) {
String payCmd = "CREATE TABLE u_bzbjy.zfxxb(XZQHDM Varchar(255),XZQHMC Varchar(255),PZBH Varchar(255),LYZBKZH Varchar(255),"
+ "ZFDATE Varchar(255),YSDWCODE Varchar(255),YSDWNAME Varchar(255),YWGKCS Varchar(255),XMCODE Varchar(255),XMNAME Varchar(255),"
+"XMLBCODE Varchar(255),XMLBNAME Varchar(255),ZB_NO Varchar(255),GNFLCODE Varchar(255),GNFLNAME Varchar(255),JJFLCODE Varchar(255),"
+"JJFLNAME Varchar(255),ZJXZCODE Varchar(255),ZJXZNAME Varchar(255),JSBFFSNAME Varchar(255),SKR Varchar(255),SKRYH Varchar(255),"
+ "XMLBCODE Varchar(255),XMLBNAME Varchar(255),ZB_NO Varchar(255),GNFLCODE Varchar(255),GNFLNAME Varchar(255),JJFLCODE Varchar(255),"
+ "JJFLNAME Varchar(255),ZJXZCODE Varchar(255),ZJXZNAME Varchar(255),JSBFFSNAME Varchar(255),SKR Varchar(255),SKRYH Varchar(255),"
+ "SKRZHZH Varchar(255),FKZHCODE Varchar(255),FKZHNAME Varchar(255),FKYHCODE Varchar(255),FKYHNAME Varchar(255),QSZHCODE Varchar(255),"
+ "QSZHNAME Varchar(255),QSYHCODE Varchar(255),QSYHNAME Varchar(255),JE Numeric(18,2), SFTK Varchar(255),NIAN Varchar(255),ZY Varchar(255))";
try {
OracleConnector.execOracleSQL(conn, payCmd, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName());
OracleConnector.execOracleSQL(conn, payCmd,
Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName());
} catch (Exception e) {
log.error(Custom4exception.threadVolume_Oracle_Except, e);
}
@ -275,9 +349,10 @@ public class OracleExtractHelper {
/**
* 创建dblink
*
* @param conn 汇总库的连接
* @param oc 采集库
* dblink连接参数实体
* @param conn
* 汇总库的连接
* @param oc
* 采集库 dblink连接参数实体
*/
public void createStandardDBLink(Connection conn, OracleConnectorParams oc) {
String linkName = "LINKTOST" + oc.getName();
@ -292,26 +367,45 @@ public class OracleExtractHelper {
+ ")))(CONNECT_DATA =(SERVICE_NAME =" + oc.getDatabaseName()
+ ")))\'";
if (null != oc) {
if (hasSameNameDBLink(conn, linkName, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()+".log")) { // 如果dblink已经存在,先删除dblink在创建dblink
String deleteSql = "DROP PUBLIC DATABASE LINK "
+ linkName;
//删除 dblink
if (OracleConnector.execOracleSQL(conn, deleteSql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName())) {
// OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName());
if(!OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName())){
OracleConnector.execUpdateOracleSQL(conn, sql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName());
if (hasSameNameDBLink(conn, linkName,
Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()
+ ".log")) { // 如果dblink已经存在,先删除dblink在创建dblink
String deleteSql = "DROP PUBLIC DATABASE LINK " + linkName;
// 删除 dblink
if (OracleConnector.execOracleSQL(conn, deleteSql,
Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName())) {
// OracleConnector.execOracleSQL(conn, sql,
// Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName());
if (!OracleConnector.execOracleSQL(
conn,
sql,
Configs.EXTRACT_STANDARD_LOG_LOCALTION
+ oc.getName())) {
OracleConnector.execUpdateOracleSQL(
conn,
sql,
Configs.EXTRACT_STANDARD_LOG_LOCALTION
+ oc.getName());
}
} else {
Configs.CONSOLE_LOGGER.error("删除已有的DBLink失败,无法创建新的DBLink!");
FileOperateHelper.fileWrite(Configs.EXTRACT_STANDARD_LOG_LOCALTION
+ oc.getName()+".log", "删除已有的DBLink失败,无法创建新的DBLink!"+"\r\n");
FileOperateHelper.fileWrite(
Configs.EXTRACT_STANDARD_LOG_LOCALTION
+ oc.getName() + ".log",
"删除已有的DBLink失败,无法创建新的DBLink!" + "\r\n");
}
} else {
// 否则创建dblink
// OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName());
if(!OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName())){
OracleConnector.execUpdateOracleSQL(conn, sql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName());
// OracleConnector.execOracleSQL(conn, sql,
// Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName());
if (!OracleConnector.execOracleSQL(conn, sql,
Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName())) {
OracleConnector.execUpdateOracleSQL(
conn,
sql,
Configs.EXTRACT_STANDARD_LOG_LOCALTION
+ oc.getName());
}
}
}
@ -326,7 +420,8 @@ public class OracleExtractHelper {
+ "ZBKYJE Numeric(18,2),ZYZFBZ Varchar(255),BZ Varchar(255))";
try {
OracleConnector.execOracleSQL(conn, execCmd, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName());
OracleConnector.execOracleSQL(conn, execCmd,
Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName());
} catch (Exception e) {
log.error(Custom4exception.threadVolume_Oracle_Except, e);
}

View File

@ -0,0 +1,107 @@
package com.platform.service;
import java.sql.Connection;
import java.util.Date;
import java.util.List;
import org.apache.log4j.Logger;
import com.base.Custom4exception;
import com.platform.entities.AbstractOracleExtractTask;
import com.platform.entities.GatherOracleInfo;
import com.platform.entities.OracleConnectorParams;
import com.platform.oracle.OracleConnector;
import com.platform.utils.Configs;
import com.platform.utils.Constant;
import com.platform.utils.DateForm;
import com.platform.utils.FileOperateHelper;
public class OracleExtractTask extends AbstractOracleExtractTask {
public static Logger log = Logger.getLogger(OracleExtractTask.class);
private GatherOracleInfo gatherOracleInfo;
private OracleConnectorParams oc;
private IOracleExtractService OracleExtract;
public OracleExtractTask(String name, GatherOracleInfo gatherOracleInfo,
OracleConnectorParams oc, IOracleExtractService OracleExtract) {
super(name);
this.gatherOracleInfo = gatherOracleInfo;
this.oc = oc;
this.OracleExtract = OracleExtract;
}
public OracleExtractTask(GatherOracleInfo gatherOracleInfo,
OracleConnectorParams oc, IOracleExtractService OracleExtract) {
this.gatherOracleInfo = gatherOracleInfo;
this.oc = oc;
this.OracleExtract = OracleExtract;
}
private OracleExtractHelper oracleExtract = new OracleExtractHelper();
@Override
public void handler() {
// TODO Auto-generated method stub
try {
Connection conn = OracleConnector.connectionBuilder(
createConnectUrl(), gatherOracleInfo.getUser(),
gatherOracleInfo.getPassword(), oc);
if (null != conn) {
String cmd = "kubectl label --overwrite rc " + oc.getName()
+ " isExtract=1";
OracleExtract.updateDataExtractStatus(oc, 1); // 更新数据库的状态
FileOperateHelper
.fileWrite(
Configs.EXTRACT_LOG_LOCALTION + oc.getName()
+ ".log",
"["
+ DateForm
.date2StringBysecond(new Date())
+ "]>>>>>>>>>>>>>>>>开始汇总 >>>>>>>>>>>>>>>>>>\r\n");
List<String> rList = Constant.ganymedSSH
.execCmdWaitAcquiescent(cmd);
StringBuffer sb = new StringBuffer();
for (String string : rList)
sb.append(string).append("\n");
Configs.CONSOLE_LOGGER.info(sb.toString());
oracleExtract.createDBLink(conn, oc); // 创建dblink
oracleExtract.createTableSpace(conn, oc, gatherOracleInfo); // 创建表空间
oracleExtract.createUser(conn, oc, gatherOracleInfo);// 创建用户并授权
oracleExtract.extractColleDB(conn, oc, gatherOracleInfo);// 执行抽取
cmd = "kubectl label --overwrite rc " + oc.getName()
+ " isExtract=2";
rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd);
sb = new StringBuffer();
for (String string : rList)
sb.append(string).append("\n");
Configs.CONSOLE_LOGGER.info(sb.toString());
FileOperateHelper
.fileWrite(
Configs.EXTRACT_LOG_LOCALTION + oc.getName()
+ ".log",
"["
+ DateForm
.date2StringBysecond(new Date())
+ "]>>>>>>>>>>>>>>>>汇总结束 >>>>>>>>>>>>>>>>>>\r\n\r\n");
OracleExtract.updateDataExtractStatus(oc, 2); // 更新数据库的状态
}
} catch (Exception e) {
log.error(Custom4exception.OracleSQL_Except, e);
}
}
private String createConnectUrl() {
String answer = "";
if (null != gatherOracleInfo) {
answer = "jdbc:oracle:thin:@" + gatherOracleInfo.getIp() + ":"
+ gatherOracleInfo.getPort() + ":"
+ gatherOracleInfo.getDatabaseName();
}
return answer;
}
}

View File

@ -12,7 +12,7 @@ public class LogReadServiceImpl implements ILogRead {
@Override
public String readLog(String filename) throws Exception {
String result = FileOperateHelper.fileReader(Configs.EXTRACT_LOG_LOCALTION + "J" + filename.replace("-", "_")+".log");
String result = FileOperateHelper.fileReader(Configs.EXTRACT_LOG_LOCALTION + filename+".log");
return result;
}

View File

@ -31,7 +31,8 @@ public class OracleExtractServiceImpl implements IOracleExtractService {
/**
* 日志
*/
public final static Logger log = Logger.getLogger(OracleExtractServiceImpl.class);
public final static Logger log = Logger
.getLogger(OracleExtractServiceImpl.class);
@Resource(name = "dataInfoDao")
private DataInfoDao dataInfoDao;
@ -51,244 +52,330 @@ public class OracleExtractServiceImpl implements IOracleExtractService {
private OracleConnector connect = new OracleConnector();
@Override
public boolean extractOracle(String name, List<OracleConnectorParams> dataInfolist,
GatherOracleInfo oracleConnect) throws Exception {
public boolean extractOracle(String name,
List<OracleConnectorParams> datainfos, GatherOracleInfo oracleModel)
throws Exception {
boolean isSuccess = false;
try{
//map转 bean(汇总库信息-带tableName的)
GatherOracleInfo oracleModel = oracleConnect;
//采集库连接参数
List<OracleConnectorParams> datainfos = dataInfolist;
try {
// map转 bean(汇总库信息-带tableName的)
// GatherOracleInfo oracleModel = oracleConnect;
// 采集库连接参数
// List<OracleConnectorParams> datainfos = dataInfolist;
if (datainfos.size() == 0) {
return false;
}
Connection conn = OracleConnector.connectionBuilder("jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + oracleModel.getPort() + ":"
+ oracleModel.getDatabaseName(), oracleModel.getUser(), oracleModel.getPassword(),dataInfolist.get(0));
Connection conn = OracleConnector.connectionBuilder(
"jdbc:oracle:thin:@" + oracleModel.getIp() + ":"
+ oracleModel.getPort() + ":"
+ oracleModel.getDatabaseName(),
oracleModel.getUser(), oracleModel.getPassword(),
datainfos.get(0));
if (null == conn) {
FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION
+ dataInfolist.get(0).getName()+".log", "创建oracle连接失败: [" + conn + "]\r\n");
+ datainfos.get(0).getName() + ".log",
"创建oracle连接失败: [" + conn + "]\r\n");
return false;
}
for (OracleConnectorParams collectOracle : datainfos) {
String replicasName = collectOracle.getName();
try{
if(null != collectOracle.getDataId() && !"".equals(collectOracle.getDataId())){
DataInfoEntity data = new DataInfoEntity();
data.setId(Integer.valueOf(collectOracle.getDataId()));
data.setExtractStatus(1);
dataInfoDao.updateExtract(data);
collectOracle.setName("J" + collectOracle.getName().replace("-", "_"));
String cmd = "kubectl label --overwrite rc "
+ replicasName + " isExtract=1";
//sql日志记录时间
FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION
+ collectOracle.getName()+".log", "\r\n 开始汇总 \r\n"+ DateForm.date2StringBysecond(new Date()) +"\r\n");
List<String> rList = Constant.ganymedSSH
.execCmdWaitAcquiescent(cmd);
StringBuffer sb = new StringBuffer();
for (String string : rList)
sb.append(string).append("\n");
Configs.CONSOLE_LOGGER.info(sb.toString());
// client.updateOrAddReplicasLabelById(collectOracle.getName(), "isExtract", "1"); //更新oracle汇总状态0标示为未汇总1标示汇总中2标示汇总完成
oracleExtract.createDBLink(conn, collectOracle); //创建dblink
oracleExtract.createTableSpace(conn, collectOracle, oracleModel); //创建表空间
oracleExtract.createUser(conn, collectOracle, oracleModel);//创建用户并授权
oracleExtract.extractColleDB(conn, collectOracle, oracleModel);//执行抽取
// client.updateOrAddReplicasLabelById(collectOracle.getName(), "isExtract", "2"); //更新oracle汇总状态0标示为未汇总1标示汇总中2标示汇总完成
cmd = "kubectl label --overwrite rc "
+ replicasName + " isExtract=2";
rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd);
sb = new StringBuffer();
for (String string : rList)
sb.append(string).append("\n");
Configs.CONSOLE_LOGGER.info(sb.toString());
data.setExtractStatus(2);
dataInfoDao.updateExtract(data);
}
}catch(Exception e){
try {
if (null != collectOracle.getDataId()
&& !"".equals(collectOracle.getDataId())) {
DataInfoEntity data = new DataInfoEntity();
data.setId(Integer.valueOf(collectOracle.getDataId()));
data.setExtractStatus(1);
dataInfoDao.updateExtract(data);
collectOracle.setName("J"
+ collectOracle.getName().replace("-", "_"));
String cmd = "kubectl label --overwrite rc "
+ replicasName + " isExtract=1";
// sql日志记录时间
FileOperateHelper
.fileWrite(
Configs.EXTRACT_LOG_LOCALTION
+ collectOracle.getName()
+ ".log",
"\r\n 开始汇总 \r\n"
+ DateForm
.date2StringBysecond(new Date())
+ "\r\n");
List<String> rList = Constant.ganymedSSH
.execCmdWaitAcquiescent(cmd);
StringBuffer sb = new StringBuffer();
for (String string : rList)
sb.append(string).append("\n");
Configs.CONSOLE_LOGGER.info(sb.toString());
// client.updateOrAddReplicasLabelById(collectOracle.getName(),
// "isExtract", "1");
// //更新oracle汇总状态0标示为未汇总1标示汇总中2标示汇总完成
oracleExtract.createDBLink(conn, collectOracle); // 创建dblink
oracleExtract.createTableSpace(conn, collectOracle,
oracleModel); // 创建表空间
oracleExtract.createUser(conn, collectOracle,
oracleModel);// 创建用户并授权
oracleExtract.extractColleDB(conn, collectOracle,
oracleModel);// 执行抽取
// client.updateOrAddReplicasLabelById(collectOracle.getName(),
// "isExtract", "2");
// //更新oracle汇总状态0标示为未汇总1标示汇总中2标示汇总完成
cmd = "kubectl label --overwrite rc " + replicasName
+ " isExtract=2";
rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd);
sb = new StringBuffer();
for (String string : rList)
sb.append(string).append("\n");
Configs.CONSOLE_LOGGER.info(sb.toString());
data.setExtractStatus(2);
dataInfoDao.updateExtract(data);
}
} catch (Exception e) {
log.error(Custom4exception.OracleSQL_Except, e);
}
finally{
//sql日志记录时间
FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION
+ collectOracle.getName()+".log", "\r\n 汇总结束 \r\n"+ DateForm.date2StringBysecond(new Date()) +"\r\n");
String cmd = "kubectl label --overwrite rc "
+ replicasName + " isExtract=2";
} finally {
// sql日志记录时间
FileOperateHelper.fileWrite(
Configs.EXTRACT_LOG_LOCALTION
+ collectOracle.getName() + ".log",
"\r\n 汇总结束 \r\n"
+ DateForm.date2StringBysecond(new Date())
+ "\r\n");
String cmd = "kubectl label --overwrite rc " + replicasName
+ " isExtract=2";
Constant.ganymedSSH.execCmdWaitAcquiescent(cmd);
}
}
isSuccess = true;
}catch(Exception e){
} catch (Exception e) {
new CustomException(Custom4exception.OracleSQL_Except, e);
}
return isSuccess;
}
@Override
public boolean isConnectTotalOracle(GatherOracleInfo oracleModel) throws Exception {
public boolean isConnectTotalOracle(GatherOracleInfo oracleModel)
throws Exception {
boolean isConnect = false;
Connection conn = OracleConnector.connectionBuilder("jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + oracleModel.getPort() + ":"
+ oracleModel.getDatabaseName(), oracleModel.getUser(), oracleModel.getPassword(), null);
Connection conn = OracleConnector.connectionBuilder(
"jdbc:oracle:thin:@" + oracleModel.getIp() + ":"
+ oracleModel.getPort() + ":"
+ oracleModel.getDatabaseName(), oracleModel.getUser(),
oracleModel.getPassword(), null);
if (null == conn) {
isConnect = false;
throw new CustomException(Custom4exception.connect_Oracle_Except, null, oracleModel);
// FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION
// + dataInfolist.get(0).getName(), "创建oracle连接失败: [" + conn + "]\r\n");
}else {
throw new CustomException(Custom4exception.connect_Oracle_Except,
null, oracleModel);
// FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION
// + dataInfolist.get(0).getName(), "创建oracle连接失败: [" + conn +
// "]\r\n");
} else {
isConnect = oracleExtract.testConnect(conn);
}
return isConnect;
}
@Override
public boolean extractStandardTable(String name, List<OracleConnectorParams> dataInfolist,
public boolean extractStandardTable(String name,
List<OracleConnectorParams> dataInfolist,
GatherOracleInfo oracleConnect) throws Exception {
boolean isSuccess = false;
try{
//map转 bean(汇总库信息-带tableName的)
GatherOracleInfo oracleModel = oracleConnect;
//采集库连接参数
try {
// map转 bean(汇总库信息-带tableName的)
GatherOracleInfo oracleModel = oracleConnect;
// 采集库连接参数
List<OracleConnectorParams> datainfos = dataInfolist;
if (datainfos.size() == 0) {
return false;
}
Connection conn = OracleConnector.connectionBuilder("jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + oracleModel.getPort() + ":"
+ oracleModel.getDatabaseName(), oracleModel.getUser(), oracleModel.getPassword(),dataInfolist.get(0));
Connection conn = OracleConnector.connectionBuilder(
"jdbc:oracle:thin:@" + oracleModel.getIp() + ":"
+ oracleModel.getPort() + ":"
+ oracleModel.getDatabaseName(),
oracleModel.getUser(), oracleModel.getPassword(),
dataInfolist.get(0));
if (null == conn) {
FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION
+ dataInfolist.get(0).getName()+".log", "创建oracle连接失败: [" + conn + "]\r\n");
+ dataInfolist.get(0).getName() + ".log",
"创建oracle连接失败: [" + conn + "]\r\n");
return false;
}
for (OracleConnectorParams collectOracle : datainfos) {
String replicasName = collectOracle.getName();
try{
if(null != collectOracle.getDataId() && !"".equals(collectOracle.getDataId())){
DataInfoEntity data = new DataInfoEntity();
data.setId(Integer.valueOf(collectOracle.getDataId()));
//设置为 标准表 抽取中
data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX);
data.setStandardExtractStatus("1");
dataInfoDao.update(data);
collectOracle.setName("CQ" + collectOracle.getName().replace("-", "_"));
String cmd = "kubectl annotate --overwrite rc "
+ replicasName + " standardExtractStatus=1";
//sql日志记录时间
FileOperateHelper.fileWrite(Configs.EXTRACT_STANDARD_LOG_LOCALTION
+ collectOracle.getName()+".log", "\r\n 开始抽取标准表 \r\n"+ DateForm.date2StringBysecond(new Date()) +"\r\n");
List<String> rList = Constant.ganymedSSH
.execCmdWaitAcquiescent(cmd);
StringBuffer sb = new StringBuffer();
for (String string : rList)
sb.append(string).append("\n");
Configs.CONSOLE_LOGGER.info(sb.toString());
// client.updateOrAddReplicasLabelById(collectOracle.getName(), "isExtract", "1"); //更新oracle汇总状态0标示为未汇总1标示汇总中2标示汇总完成
oracleExtract.createStandardDBLink(conn, collectOracle); //创建dblink
oracleExtract.createTableSpace(conn, collectOracle, oracleModel); //创建表空间
oracleExtract.createOnlyUser(conn, collectOracle, oracleModel);//创建 抽取标准表的 用户并授权
DataInfoEntity tmpdata = dataInfoDao.findById(data.getId());
if (null != tmpdata) {
if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata.getPayResultLast())
|| Constant.CHECKOUT_STATUS_FIVE.equals(tmpdata.getPayResultLast())
|| Constant.CHECKOUT_STATUS_SIX.equals(tmpdata.getPayResultLast())) {
//抽取中
data.setPayResultLast(Constant.CHECKOUT_STATUS_SIX);
dataInfoDao.update(data);
boolean isExtrac = true;
try{
oracleExtract.extractStandardPayTable(conn, collectOracle, oracleModel);//执行抽取
}catch(Exception e){
//改回 校验存在的状态
data.setPayResultLast(Constant.CHECKOUT_STATUS_THREE);
dataInfoDao.update(data);
isExtrac = false;
}
if (isExtrac) {
//抽取成功
data.setPayResultLast(Constant.CHECKOUT_STATUS_SEVEN);
dataInfoDao.update(data);
}
}
if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata.getExecResultLast())
|| Constant.CHECKOUT_STATUS_FIVE.equals(tmpdata.getExecResultLast())
|| Constant.CHECKOUT_STATUS_SIX.equals(tmpdata.getExecResultLast())) {
//抽取中
data.setExecResultLast(Constant.CHECKOUT_STATUS_SIX);
dataInfoDao.update(data);
boolean isExtrac = true;
try{
oracleExtract.extractStandardExecTable(conn, collectOracle, oracleModel);//执行抽取
}catch(Exception e){
//改回 校验存在的状态
data.setExecResultLast(Constant.CHECKOUT_STATUS_THREE);
dataInfoDao.update(data);
isExtrac = false;
}
if (isExtrac) {
data.setExecResultLast(Constant.CHECKOUT_STATUS_SEVEN);
dataInfoDao.update(data);
}
}
// client.updateOrAddReplicasLabelById(collectOracle.getName(), "isExtract", "2"); //更新oracle汇总状态0标示为未汇总1标示汇总中2标示汇总完成
cmd = "kubectl annotate --overwrite rc "
+ replicasName + " standardExtractStatus=2";
rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd);
sb = new StringBuffer();
try {
if (null != collectOracle.getDataId()
&& !"".equals(collectOracle.getDataId())) {
DataInfoEntity data = new DataInfoEntity();
data.setId(Integer.valueOf(collectOracle.getDataId()));
// 设置为 标准表 抽取中
data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX);
data.setStandardExtractStatus("1");
dataInfoDao.update(data);
collectOracle.setName("CQ"
+ collectOracle.getName().replace("-", "_"));
String cmd = "kubectl annotate --overwrite rc "
+ replicasName + " standardExtractStatus=1";
// sql日志记录时间
FileOperateHelper
.fileWrite(
Configs.EXTRACT_STANDARD_LOG_LOCALTION
+ collectOracle.getName()
+ ".log",
"\r\n 开始抽取标准表 \r\n"
+ DateForm
.date2StringBysecond(new Date())
+ "\r\n");
List<String> rList = Constant.ganymedSSH
.execCmdWaitAcquiescent(cmd);
StringBuffer sb = new StringBuffer();
for (String string : rList)
sb.append(string).append("\n");
Configs.CONSOLE_LOGGER.info(sb.toString());
data.setCheckoutFlag(Constant.CHECKOUTFLAG_SEVEN);
data.setStandardExtractStatus("2");
dataInfoDao.update(data);
// client.updateOrAddReplicasLabelById(collectOracle.getName(),
// "isExtract", "1");
// //更新oracle汇总状态0标示为未汇总1标示汇总中2标示汇总完成
oracleExtract.createStandardDBLink(conn, collectOracle); // 创建dblink
oracleExtract.createTableSpace(conn, collectOracle,
oracleModel); // 创建表空间
oracleExtract.createOnlyUser(conn, collectOracle,
oracleModel);// 创建 抽取标准表的 用户并授权
DataInfoEntity tmpdata = dataInfoDao.findById(data
.getId());
if (null != tmpdata) {
if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata
.getPayResultLast())
|| Constant.CHECKOUT_STATUS_FIVE
.equals(tmpdata.getPayResultLast())
|| Constant.CHECKOUT_STATUS_SIX
.equals(tmpdata.getPayResultLast())) {
// 抽取中
data.setPayResultLast(Constant.CHECKOUT_STATUS_SIX);
dataInfoDao.update(data);
boolean isExtrac = true;
try {
oracleExtract.extractStandardPayTable(conn,
collectOracle, oracleModel);// 执行抽取
} catch (Exception e) {
// 改回 校验存在的状态
data.setPayResultLast(Constant.CHECKOUT_STATUS_THREE);
dataInfoDao.update(data);
isExtrac = false;
}
if (isExtrac) {
// 抽取成功
data.setPayResultLast(Constant.CHECKOUT_STATUS_SEVEN);
dataInfoDao.update(data);
}
}
if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata
.getExecResultLast())
|| Constant.CHECKOUT_STATUS_FIVE
.equals(tmpdata.getExecResultLast())
|| Constant.CHECKOUT_STATUS_SIX
.equals(tmpdata.getExecResultLast())) {
// 抽取中
data.setExecResultLast(Constant.CHECKOUT_STATUS_SIX);
dataInfoDao.update(data);
boolean isExtrac = true;
try {
oracleExtract.extractStandardExecTable(
conn, collectOracle, oracleModel);// 执行抽取
} catch (Exception e) {
// 改回 校验存在的状态
data.setExecResultLast(Constant.CHECKOUT_STATUS_THREE);
dataInfoDao.update(data);
isExtrac = false;
}
if (isExtrac) {
data.setExecResultLast(Constant.CHECKOUT_STATUS_SEVEN);
dataInfoDao.update(data);
}
}
// client.updateOrAddReplicasLabelById(collectOracle.getName(),
// "isExtract", "2");
// //更新oracle汇总状态0标示为未汇总1标示汇总中2标示汇总完成
cmd = "kubectl annotate --overwrite rc "
+ replicasName + " standardExtractStatus=2";
rList = Constant.ganymedSSH
.execCmdWaitAcquiescent(cmd);
sb = new StringBuffer();
for (String string : rList)
sb.append(string).append("\n");
Configs.CONSOLE_LOGGER.info(sb.toString());
data.setCheckoutFlag(Constant.CHECKOUTFLAG_SEVEN);
data.setStandardExtractStatus("2");
dataInfoDao.update(data);
}
}
}
}catch(Exception e){
} catch (Exception e) {
log.error(Custom4exception.OracleSQL_Except, e);
}
finally{
//sql日志记录时间
FileOperateHelper.fileWrite(Configs.EXTRACT_STANDARD_LOG_LOCALTION
+ collectOracle.getName()+".log", "\r\n 抽取标准表结束 \r\n"+ DateForm.date2StringBysecond(new Date()) +"\r\n");
} finally {
// sql日志记录时间
FileOperateHelper.fileWrite(
Configs.EXTRACT_STANDARD_LOG_LOCALTION
+ collectOracle.getName() + ".log",
"\r\n 抽取标准表结束 \r\n"
+ DateForm.date2StringBysecond(new Date())
+ "\r\n");
String cmd = "kubectl annotate --overwrite rc "
+ replicasName + " standardExtractStatus=2";
Constant.ganymedSSH.execCmdWaitAcquiescent(cmd);
}
}
isSuccess = true;
}catch(Exception e){
} catch (Exception e) {
log.error(Custom4exception.OracleSQL_Except, e);
}
return isSuccess;
}
// @Override
// public boolean extractOracle(String name, List<OracleConnectorParams> dataInfos, GatherOracleInfo oracleConnect) throws Exception {
// boolean isSuccess = false;
// try{
// //map转 bean(汇总库信息-带tableName的)
//// GatherOracleInfo oracleModel = (GatherOracleInfo) Bean2MapUtils.convertMap(GatherOracleInfo.class, oracleConnect);
//
// //采集库连接参数
//// List<OracleConnectorParams> datainfos = new ArrayList<OracleConnectorParams>();
//// for (Map<String, String> map : dataInfoMap) {
//// OracleConnectorParams dataInfoEntity = (OracleConnectorParams) Bean2MapUtils.convertMap(OracleConnectorParams.class, oracleConnect);
//// datainfos.add(dataInfoEntity);
//// }
//
// Connection conn = OracleConnector.ConnectionBuilder("jdbc:oracle:thin:@" + oracleConnect.getIp() + ":" + oracleConnect.getPort() + "/"
// + oracleConnect.getDatabaseName(), oracleConnect.getUser(), oracleConnect.getPassword());
//
// for (OracleConnectorParams collectOracle : dataInfos) {
//
// oracleExtract.createDBLink(conn, collectOracle);
// oracleExtract.createTableSpace(conn, oracleConnect);
// oracleExtract.createUser(conn, oracleConnect);
// oracleExtract.extractColleDB(conn, collectOracle);
// }
// isSuccess = true;
// }catch(Exception e){
//
// }
// return false;
// }
@Override
public void updateDataExtractStatus(OracleConnectorParams ocp, int status) {
DataInfoEntity data = new DataInfoEntity();
data.setId(Integer.valueOf(ocp.getDataId()));
// 设置为 标准表 抽取中
data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX);
data.setExtractStatus(status);
try {
dataInfoDao.update(data);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
// @Override
// public boolean extractOracle(String name, List<OracleConnectorParams>
// dataInfos, GatherOracleInfo oracleConnect) throws Exception {
// boolean isSuccess = false;
// try{
// //map转 bean(汇总库信息-带tableName的)
// // GatherOracleInfo oracleModel = (GatherOracleInfo)
// Bean2MapUtils.convertMap(GatherOracleInfo.class, oracleConnect);
//
// //采集库连接参数
// // List<OracleConnectorParams> datainfos = new
// ArrayList<OracleConnectorParams>();
// // for (Map<String, String> map : dataInfoMap) {
// // OracleConnectorParams dataInfoEntity = (OracleConnectorParams)
// Bean2MapUtils.convertMap(OracleConnectorParams.class, oracleConnect);
// // datainfos.add(dataInfoEntity);
// // }
//
// Connection conn = OracleConnector.ConnectionBuilder("jdbc:oracle:thin:@"
// + oracleConnect.getIp() + ":" + oracleConnect.getPort() + "/"
// + oracleConnect.getDatabaseName(), oracleConnect.getUser(),
// oracleConnect.getPassword());
//
// for (OracleConnectorParams collectOracle : dataInfos) {
//
// oracleExtract.createDBLink(conn, collectOracle);
// oracleExtract.createTableSpace(conn, oracleConnect);
// oracleExtract.createUser(conn, oracleConnect);
// oracleExtract.extractColleDB(conn, collectOracle);
// }
// isSuccess = true;
// }catch(Exception e){
//
// }
// return false;
// }
}