[dataexchange]完善CSV导入实现

This commit is contained in:
datagear 2019-05-27 20:44:03 +08:00
parent 40f8cf8802
commit b712fbef93
13 changed files with 719 additions and 41 deletions

View File

@ -20,12 +20,15 @@ public abstract class Import
/** 导入出错时是否终止 */
private boolean abortOnError;
/** 导入报告 */
private ImportReporter importReporter;
public Import()
{
super();
}
public Import(Connection connection, boolean abortOnError)
public Import(Connection connection, boolean abortOnError, ImportReporter importReporter)
{
super();
this.connection = connection;
@ -51,4 +54,19 @@ public abstract class Import
{
this.abortOnError = abortOnError;
}
public boolean hasImportReporter()
{
return (this.importReporter != null);
}
public ImportReporter getImportReporter()
{
return importReporter;
}
public void setImportReporter(ImportReporter importReporter)
{
this.importReporter = importReporter;
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright (c) 2018 datagear.org. All Rights Reserved.
*/
package org.datagear.dataexchange;
/**
* 数据导入报告
*
* @author datagear@163.com
*
*/
public interface ImportReporter
{
/**
* 报告导入异常
*
* @param e
*/
void report(DataImportException e);
}

View File

@ -13,7 +13,7 @@ package org.datagear.dataexchange;
* @author datagear@163.com
*
*/
public class UnsupportedExportException extends DataImportException
public class UnsupportedExportException extends DataExportException
{
private static final long serialVersionUID = 1L;

View File

@ -17,18 +17,22 @@ import java.sql.SQLXML;
import java.sql.Timestamp;
import java.sql.Types;
import java.text.NumberFormat;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.Hex;
import org.datagear.dataexchange.DataImportException;
import org.datagear.dataexchange.DevotedDataImporter;
import org.datagear.dataexchange.Import;
import org.datagear.dataexchange.ImportReporter;
import org.datagear.dataexchange.support.DataFormat.BinaryFormat;
import org.datagear.dbinfo.ColumnInfo;
import org.datagear.dbinfo.DatabaseInfoResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 抽象文本{@linkplain DevotedDataImporter}
@ -39,43 +43,189 @@ import org.datagear.dbinfo.DatabaseInfoResolver;
*/
public abstract class AbstractTextDevotedDataImporter<T extends Import> extends AbstractDevotedDataImporter<T>
{
protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractTextDevotedDataImporter.class);
public AbstractTextDevotedDataImporter()
{
super();
}
/**
* 构建{@linkplain DataFormatContext}
* 构建{@linkplain InsertContext}
*
* @param impt
* @param table
* @return
*/
protected SetParameterContext buildSetParameterContext(AbstractTextImport impt)
protected InsertContext buildInsertContext(AbstractTextImport impt, String table)
{
return new SetParameterContext(impt.getDataFormat());
return new InsertContext(impt.getDataFormat(), table);
}
/**
* 执行插入{@linkplain PreparedStatement}
*
* @param impt
* @param st
* @param insertContext
* @throws InsertSqlException
*/
protected void executeInsertPreparedStatement(AbstractTextImport impt, PreparedStatement st,
InsertContext insertContext) throws InsertSqlException
{
try
{
st.executeUpdate();
}
catch (SQLException e)
{
InsertSqlException e1 = new InsertSqlException(insertContext.getTable(), insertContext.getDataIndex(), e);
if (impt.isAbortOnError())
throw e1;
else
{
if (impt.hasImportReporter())
impt.getImportReporter().report(e1);
}
}
finally
{
insertContext.incrementDataIndex();
insertContext.clearCloseResources();
}
}
/**
* 设置插入预编译SQL语句{@linkplain PreparedStatement}参数
* <p>
* 如果{@linkplain AbstractTextImport#isAbortOnError()}{@code false}此方法将不会抛出{@linkplain SetInsertPreparedColumnValueException}
* </p>
*
* @param impt
* @param st
* @param parameterColumnInfos
* @param parameterValues
* @param setParameterContext
* @throws SQLException
* @param columnInfos
* @param columnValues
* @param insertContext
* @throws SetInsertPreparedColumnValueException
*/
protected void setInsertPreparedStatementParameters(AbstractTextImport impt, PreparedStatement st,
ColumnInfo[] parameterColumnInfos, String[] parameterValues, SetParameterContext setParameterContext)
throws SQLException
protected void setInsertPreparedColumnValues(AbstractTextImport impt, PreparedStatement st,
ColumnInfo[] columnInfos, String[] columnValues, InsertContext insertContext)
throws SetInsertPreparedColumnValueException
{
for (int i = 0; i < parameterColumnInfos.length; i++)
{
ColumnInfo columnInfo = parameterColumnInfos[i];
String rawValue = (parameterValues == null || parameterValues.length - 1 < i ? null : parameterValues[i]);
boolean abortOnError = impt.isAbortOnError();
ImportReporter importReporter = (impt.hasImportReporter() ? impt.getImportReporter() : null);
String table = insertContext.getTable();
int dataIndex = insertContext.getDataIndex();
setPreparedStatementParameter(impt.getConnection(), st, i + 1, columnInfo.getType(), rawValue,
setParameterContext);
for (int i = 0; i < columnInfos.length; i++)
{
ColumnInfo columnInfo = columnInfos[i];
String columnName = columnInfo.getName();
int sqlType = columnInfo.getType();
int parameterIndex = i + 1;
String rawValue = (columnValues == null || columnValues.length - 1 < i ? null : columnValues[i]);
try
{
setPreparedStatementParameter(impt.getConnection(), st, parameterIndex, sqlType, rawValue,
insertContext);
}
catch (SQLException e)
{
SetInsertPreparedColumnValueException e1 = new SetInsertPreparedColumnValueException(table, dataIndex,
columnName, rawValue, e);
if (abortOnError)
throw e1;
else
{
setParameterNull(st, parameterIndex, sqlType);
if (importReporter != null)
importReporter.report(e1);
}
}
catch (ParseException e)
{
IllegalSourceValueException e1 = new IllegalSourceValueException(table, dataIndex, columnName, rawValue,
e);
if (abortOnError)
throw e1;
else
{
setParameterNull(st, parameterIndex, sqlType);
if (importReporter != null)
importReporter.report(e1);
}
}
catch (DecoderException e)
{
IllegalSourceValueException e1 = new IllegalSourceValueException(table, dataIndex, columnName, rawValue,
e);
if (abortOnError)
throw e1;
else
{
setParameterNull(st, parameterIndex, sqlType);
if (importReporter != null)
importReporter.report(e1);
}
}
catch (UnsupportedSqlTypeException e)
{
SetInsertPreparedColumnValueException e1 = new SetInsertPreparedColumnValueException(table, dataIndex,
columnName, rawValue, e);
if (abortOnError)
throw e1;
else
{
setParameterNull(st, parameterIndex, sqlType);
if (importReporter != null)
importReporter.report(e1);
}
}
catch (Exception e)
{
SetInsertPreparedColumnValueException e1 = new SetInsertPreparedColumnValueException(table, dataIndex,
columnName, rawValue, e);
if (abortOnError)
throw e1;
else
{
setParameterNull(st, parameterIndex, sqlType);
if (importReporter != null)
importReporter.report(e1);
}
}
}
}
/**
* 设置{@linkplain PreparedStatement}参数{@code null}
*
* @param st
* @param parameterIndex
* @param sqlType
*/
protected void setParameterNull(PreparedStatement st, int parameterIndex, int sqlType)
{
try
{
st.setNull(parameterIndex, sqlType);
}
catch (SQLException e)
{
LOGGER.error("set PreparedStatement parameter null for sql type [" + sqlType + "]", e);
}
}
@ -92,10 +242,14 @@ public abstract class AbstractTextDevotedDataImporter<T extends Import> extends
* @param sqlType
* @param parameterValue
* @param setParameterContext
* @throws Exception
* @throws SQLException
* @throws ParseException
* @throws DecoderException
* @throws UnsupportedSqlTypeException
*/
protected void setPreparedStatementParameter(Connection cn, PreparedStatement st, int parameterIndex, int sqlType,
String parameterValue, SetParameterContext setParameterContext) throws Exception
String parameterValue, InsertContext setParameterContext)
throws SQLException, ParseException, DecoderException, UnsupportedSqlTypeException
{
if (parameterValue == null)
{
@ -255,7 +409,7 @@ public abstract class AbstractTextDevotedDataImporter<T extends Import> extends
default:
throw new DataImportException("The JDBC sql type [" + sqlType + "] is not supported");
throw new UnsupportedSqlTypeException(sqlType);
}
}
@ -328,20 +482,88 @@ public abstract class AbstractTextDevotedDataImporter<T extends Import> extends
return Base64.decodeBase64(value);
}
/**
* 移除{@code null}列信息位置对应的列值
*
* @param rawColumnInfos
* @param noNullColumnInfos
* @param rawColumnValues
* @return
*/
protected String[] removeNullColumnInfoValues(ColumnInfo[] rawColumnInfos, ColumnInfo[] noNullColumnInfos,
String[] rawColumnValues)
{
if (noNullColumnInfos == rawColumnInfos || noNullColumnInfos.length == rawColumnInfos.length)
return rawColumnValues;
String[] newColumnValues = new String[noNullColumnInfos.length];
int index = 0;
for (int i = 0; i < rawColumnInfos.length; i++)
{
if (rawColumnInfos[i] == null)
continue;
newColumnValues[index++] = rawColumnValues[i];
}
return newColumnValues;
}
/**
* 移除{@linkplain ColumnInfo}数组中的{@code null}元素
* <p>
* 如果没有{@code null}元素将返回原数组
* </p>
*
* @param columnInfos
* @return
*/
protected ColumnInfo[] removeNulls(ColumnInfo[] columnInfos)
{
boolean noNull = true;
for (ColumnInfo columnInfo : columnInfos)
{
if (columnInfo == null)
{
noNull = false;
break;
}
}
if (noNull)
return columnInfos;
List<ColumnInfo> list = new ArrayList<ColumnInfo>(columnInfos.length);
for (ColumnInfo columnInfo : columnInfos)
{
if (columnInfo != null)
list.add(columnInfo);
}
return list.toArray(new ColumnInfo[list.size()]);
}
/**
* 获取表指定列信息数组
* <p>
* 如果指定位置的列不存在返回数组对应位置将为{@code null}
* 当指定位置的列不存在时如果{@code nullIfInexistentColumn}{@code true}返回数组对应位置将为{@code null}
* 否则将立刻抛出{@linkplain ColumnNotFoundException}
* </p>
*
* @param cn
* @param table
* @param columnNames
* @param nullIfInexistentColumn
* @param databaseInfoResolver
* @return
* @throws ColumnNotFoundException
*/
protected ColumnInfo[] getColumnInfos(Connection cn, String table, String[] columnNames,
DatabaseInfoResolver databaseInfoResolver)
boolean nullIfInexistentColumn, DatabaseInfoResolver databaseInfoResolver) throws ColumnNotFoundException
{
ColumnInfo[] columnInfos = new ColumnInfo[columnNames.length];
@ -360,6 +582,9 @@ public abstract class AbstractTextDevotedDataImporter<T extends Import> extends
}
}
if (!nullIfInexistentColumn && columnInfo == null)
throw new ColumnNotFoundException(table, columnNames[i]);
columnInfos[i] = columnInfo;
}
@ -367,23 +592,56 @@ public abstract class AbstractTextDevotedDataImporter<T extends Import> extends
}
/**
* 设置{@linkplain PreparedStatement}参数支持上下文
* 插入SQL语句{@linkplain PreparedStatement}参数支持上下文
*
* @author datagear@163.com
*
*/
protected static class SetParameterContext extends DataFormatContext
protected static class InsertContext extends DataFormatContext
{
private List<Closeable> closeResources = new LinkedList<Closeable>();
public SetParameterContext()
private String table;
private int dataIndex = 0;
public InsertContext()
{
super();
}
public SetParameterContext(DataFormat dataFormat)
public InsertContext(DataFormat dataFormat, String table)
{
super(dataFormat);
this.table = table;
}
public String getTable()
{
return table;
}
public void setTable(String table)
{
this.table = table;
}
public int getDataIndex()
{
return dataIndex;
}
public void setDataIndex(int dataIndex)
{
this.dataIndex = dataIndex;
}
/**
* 数据索引加{@code 1}
*/
public void incrementDataIndex()
{
this.dataIndex += 1;
}
/**

View File

@ -8,6 +8,7 @@ import java.io.Reader;
import java.sql.Connection;
import org.datagear.dataexchange.Import;
import org.datagear.dataexchange.ImportReporter;
/**
* 抽象文本数据导入
@ -23,16 +24,21 @@ public abstract class AbstractTextImport extends Import
/** 文本数据格式 */
private DataFormat dataFormat;
/** 是否忽略不存在的列 */
private boolean ignoreInexistentColumn;
public AbstractTextImport()
{
super();
}
public AbstractTextImport(Connection connection, boolean abortOnError, Reader reader, DataFormat dataFormat)
public AbstractTextImport(Connection connection, boolean abortOnError, ImportReporter importReporter, Reader reader,
DataFormat dataFormat, boolean ignoreInexistentColumn)
{
super(connection, abortOnError);
super(connection, abortOnError, importReporter);
this.reader = reader;
this.dataFormat = dataFormat;
this.ignoreInexistentColumn = ignoreInexistentColumn;
}
public Reader getReader()
@ -54,4 +60,14 @@ public abstract class AbstractTextImport extends Import
{
this.dataFormat = dataFormat;
}
public boolean isIgnoreInexistentColumn()
{
return ignoreInexistentColumn;
}
public void setIgnoreInexistentColumn(boolean ignoreInexistentColumn)
{
this.ignoreInexistentColumn = ignoreInexistentColumn;
}
}

View File

@ -58,29 +58,38 @@ public class CsvDataImporter extends AbstractTextDevotedDataImporter<CsvImport>
long startTime = System.currentTimeMillis();
CSVParser csvParser = buildCSVParser(impt);
SetParameterContext setParameterContext = buildSetParameterContext(impt);
InsertContext insertContext = buildInsertContext(impt, impt.getTable());
Connection cn = impt.getConnection();
ColumnInfo[] columnInfos = null;
ColumnInfo[] rawColumnInfos = null;
ColumnInfo[] noNullColumnInfos = null;
PreparedStatement st = null;
try
{
for (CSVRecord csvRecord : csvParser)
{
if (columnInfos == null)
if (rawColumnInfos == null)
{
columnInfos = resolveColumnInfos(impt, csvRecord);
String sql = buildInsertPreparedSql(cn, impt.getTable(), columnInfos);
rawColumnInfos = resolveColumnInfos(impt, csvRecord);
noNullColumnInfos = removeNulls(rawColumnInfos);
// 没有任何列
if (noNullColumnInfos == null || noNullColumnInfos.length == 0)
throw new NoneColumnFoundException(impt.getTable());
String sql = buildInsertPreparedSql(cn, impt.getTable(), noNullColumnInfos);
st = cn.prepareStatement(sql);
}
else
{
String[] recordValues = resolveCSVRecordValues(impt, csvRecord);
setPreparedStatementParameters(impt, st, columnInfos, recordValues, setParameterContext);
String[] columnnValues = resolveCSVRecordValues(impt, csvRecord, rawColumnInfos, noNullColumnInfos);
st.executeUpdate();
setInsertPreparedColumnValues(impt, st, noNullColumnInfos, columnnValues, insertContext);
executeInsertPreparedStatement(impt, st, insertContext);
}
}
}
@ -100,6 +109,10 @@ public class CsvDataImporter extends AbstractTextDevotedDataImporter<CsvImport>
/**
* {@linkplain CSVRecord}解析列信息数组
* <p>
* 当指定名称的列不存在时如果{@code CsvImport#isIgnoreInexistentColumn()}{@code true}返回数组对应位置将为{@code null}
* 否则将立刻抛出{@linkplain ColumnNotFoundException}
* </p>
*
* @param impt
* @param csvRecord
@ -110,7 +123,25 @@ public class CsvDataImporter extends AbstractTextDevotedDataImporter<CsvImport>
{
String[] columnNames = resolveCSVRecordValues(impt, csvRecord);
return getColumnInfos(impt.getConnection(), impt.getTable(), columnNames, this.databaseInfoResolver);
return getColumnInfos(impt.getConnection(), impt.getTable(), columnNames, impt.isIgnoreInexistentColumn(),
this.databaseInfoResolver);
}
/**
* 解析{@linkplain CSVRecord}值数组
*
* @param impt
* @param csvRecord
* @param rawColumnInfos
* @param noNullColumnInfos
* @return
*/
protected String[] resolveCSVRecordValues(CsvImport impt, CSVRecord csvRecord, ColumnInfo[] rawColumnInfos,
ColumnInfo[] noNullColumnInfos)
{
String[] values = resolveCSVRecordValues(impt, csvRecord);
return removeNullColumnInfoValues(rawColumnInfos, noNullColumnInfos, values);
}
/**

View File

@ -7,6 +7,8 @@ package org.datagear.dataexchange.support;
import java.io.Reader;
import java.sql.Connection;
import org.datagear.dataexchange.ImportReporter;
/**
* CSV导入
*
@ -23,9 +25,10 @@ public class CsvImport extends AbstractTextImport
super();
}
public CsvImport(Connection connection, boolean abortOnError, Reader reader, DataFormat dataFormat, String table)
public CsvImport(Connection connection, boolean abortOnError, ImportReporter importReporter, Reader reader,
DataFormat dataFormat, boolean ignoreInexistentColumn, String table)
{
super(connection, abortOnError, reader, dataFormat);
super(connection, abortOnError, importReporter, reader, dataFormat, ignoreInexistentColumn);
this.table = table;
}

View File

@ -0,0 +1,23 @@
/*
* Copyright (c) 2018 datagear.org. All Rights Reserved.
*/
package org.datagear.dataexchange.support;
/**
* JDBC操作异常处理方式
*
* @author datagear@163.com
*
*/
public enum ExceptionResolve
{
/** 提交并终止 */
ABORT,
/** 忽略并继续 */
IGNORE,
/** 回滚并终止 */
ROLLBACK
}

View File

@ -0,0 +1,39 @@
/*
* Copyright (c) 2018 datagear.org. All Rights Reserved.
*/
package org.datagear.dataexchange.support;
/**
* 导入源值不合法异常
*
* @author datagear@163.com
*
*/
public class IllegalSourceValueException extends SetInsertPreparedColumnValueException
{
private static final long serialVersionUID = 1L;
public IllegalSourceValueException(String table, int dataIndex, String columnName, Object sourceValue)
{
super(table, dataIndex, columnName, sourceValue);
}
public IllegalSourceValueException(String table, int dataIndex, String columnName, Object sourceValue,
String message)
{
super(table, dataIndex, columnName, sourceValue, message);
}
public IllegalSourceValueException(String table, int dataIndex, String columnName, Object sourceValue,
Throwable cause)
{
super(table, dataIndex, columnName, sourceValue, cause);
}
public IllegalSourceValueException(String table, int dataIndex, String columnName, Object sourceValue,
String message, Throwable cause)
{
super(table, dataIndex, columnName, sourceValue, message, cause);
}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright (c) 2018 datagear.org. All Rights Reserved.
*/
package org.datagear.dataexchange.support;
import java.sql.SQLException;
import org.datagear.dataexchange.DataImportException;
/**
* 导入时执行插入SQL异常
*
* @author datagear@163.com
*
*/
public class InsertSqlException extends DataImportException
{
private static final long serialVersionUID = 1L;
private String table;
private int dataIndex;
public InsertSqlException(String table, int dataIndex, SQLException e)
{
super(e);
this.table = table;
this.dataIndex = dataIndex;
}
public String getTable()
{
return table;
}
protected void setTable(String table)
{
this.table = table;
}
public int getDataIndex()
{
return dataIndex;
}
protected void setDataIndex(int dataIndex)
{
this.dataIndex = dataIndex;
}
public SQLException getSQLException()
{
return (SQLException) getCause();
}
}

View File

@ -0,0 +1,54 @@
/*
* Copyright (c) 2018 datagear.org. All Rights Reserved.
*/
package org.datagear.dataexchange.support;
import org.datagear.dataexchange.DataImportException;
/**
* 导入数据时在表中没有找到任何对应列异常
*
* @author datagear@163.com
*
*/
public class NoneColumnFoundException extends DataImportException
{
private static final long serialVersionUID = 1L;
private String table;
public NoneColumnFoundException(String table)
{
super();
this.table = table;
}
public NoneColumnFoundException(String table, String message)
{
super(message);
this.table = table;
}
public NoneColumnFoundException(String table, Throwable cause)
{
super(cause);
this.table = table;
}
public NoneColumnFoundException(String table, String message, Throwable cause)
{
super(message, cause);
this.table = table;
}
public String getTable()
{
return table;
}
protected void setTable(String table)
{
this.table = table;
}
}

View File

@ -0,0 +1,107 @@
/*
* Copyright (c) 2018 datagear.org. All Rights Reserved.
*/
package org.datagear.dataexchange.support;
import java.sql.PreparedStatement;
import org.datagear.dataexchange.DataImportException;
/**
* 导入数据时设置{@linkplain PreparedStatement}列值异常
*
* @author datagear@163.com
*
*/
public class SetInsertPreparedColumnValueException extends DataImportException
{
private static final long serialVersionUID = 1L;
private String table;
private int dataIndex;
private String columnName;
private Object sourceValue;
public SetInsertPreparedColumnValueException(String table, int dataIndex, String columnName, Object sourceValue)
{
super();
this.table = table;
this.dataIndex = dataIndex;
this.columnName = columnName;
this.sourceValue = sourceValue;
}
public SetInsertPreparedColumnValueException(String table, int dataIndex, String columnName, Object sourceValue,
String message)
{
super(message);
this.table = table;
this.dataIndex = dataIndex;
this.columnName = columnName;
this.sourceValue = sourceValue;
}
public SetInsertPreparedColumnValueException(String table, int dataIndex, String columnName, Object sourceValue,
Throwable cause)
{
super(cause);
this.table = table;
this.dataIndex = dataIndex;
this.columnName = columnName;
this.sourceValue = sourceValue;
}
public SetInsertPreparedColumnValueException(String table, int dataIndex, String columnName, Object sourceValue,
String message, Throwable cause)
{
super(message, cause);
this.table = table;
this.dataIndex = dataIndex;
this.columnName = columnName;
this.sourceValue = sourceValue;
}
public String getTable()
{
return table;
}
protected void setTable(String table)
{
this.table = table;
}
public int getDataIndex()
{
return dataIndex;
}
protected void setDataIndex(int dataIndex)
{
this.dataIndex = dataIndex;
}
public String getColumnName()
{
return columnName;
}
public void setColumnName(String columnName)
{
this.columnName = columnName;
}
public Object getSourceValue()
{
return sourceValue;
}
protected void setSourceValue(Object sourceValue)
{
this.sourceValue = sourceValue;
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright (c) 2018 datagear.org. All Rights Reserved.
*/
package org.datagear.dataexchange.support;
/**
* 不支持指定SQL类型异常
*
* @author datagear@163.com
*
*/
public class UnsupportedSqlTypeException extends Exception
{
private static final long serialVersionUID = 1L;
private int sqlType;
public UnsupportedSqlTypeException(int sqlType)
{
super("Import data of JDBC sql type [" + sqlType + "] is not supported");
this.sqlType = sqlType;
}
public UnsupportedSqlTypeException(int sqlType, String message)
{
super(message);
this.sqlType = sqlType;
}
public UnsupportedSqlTypeException(int sqlType, Throwable cause)
{
super(cause);
this.sqlType = sqlType;
}
public UnsupportedSqlTypeException(int sqlType, String message, Throwable cause)
{
super(message, cause);
this.sqlType = sqlType;
}
public int getSqlType()
{
return sqlType;
}
protected void setSqlType(int sqlType)
{
this.sqlType = sqlType;
}
}