[dataexchange]添加ConnectionFactory类设计,负责在数据交换中获取和回收数据库连接,使得调用者可以灵活控制线程、连接分配。

This commit is contained in:
datagear 2019-06-03 11:08:59 +08:00
parent cd1910f9d4
commit fb4aa75186
19 changed files with 257 additions and 133 deletions

View File

@ -0,0 +1,40 @@
/*
* Copyright (c) 2018 datagear.org. All Rights Reserved.
*/
package org.datagear.dataexchange;
import java.sql.Connection;
import java.sql.SQLException;
/**
* 数据库连接工厂
* <p>
* 它负责在数据交换中获取和回收数据库连接使得调用者可以灵活控制线程连接分配
* </p>
* <p>
* {@linkplain DataImporter}{@linkplain DataExporter}实现类在使用完这里获取的数据库连接后不应该直接调用{@linkplain Connection#close()}方法
* 而要调用这里的{@linkplain #reclaimConnection(Connection)}方法
* </p>
*
* @author datagear@163.com
*
*/
public interface ConnectionFactory
{
/**
* 获取数据库连接
*
* @return
* @throws SQLException
*/
Connection getConnection() throws SQLException;
/**
* 回收数据库连接
*
* @param cn
* @throws SQLException
*/
void reclaimConnection(Connection cn) throws SQLException;
}

View File

@ -4,8 +4,6 @@
package org.datagear.dataexchange;
import javax.sql.DataSource;
/**
* 数据交换
*
@ -15,7 +13,7 @@ import javax.sql.DataSource;
public abstract class DataExchange
{
/** 数据交换数据源 */
private DataSource dataSource;
private ConnectionFactory connectionFactory;
/** 出错时是否终止 */
private boolean abortOnError;
@ -25,21 +23,21 @@ public abstract class DataExchange
super();
}
public DataExchange(DataSource dataSource, boolean abortOnError)
public DataExchange(ConnectionFactory connectionFactory, boolean abortOnError)
{
super();
this.dataSource = dataSource;
this.connectionFactory = connectionFactory;
this.abortOnError = abortOnError;
}
public DataSource getDataSource()
public ConnectionFactory getConnectionFactory()
{
return dataSource;
return connectionFactory;
}
public void setDataSource(DataSource dataSource)
public void setConnectionFactory(ConnectionFactory connectionFactory)
{
this.dataSource = dataSource;
this.connectionFactory = connectionFactory;
}
public boolean isAbortOnError()

View File

@ -4,8 +4,6 @@
package org.datagear.dataexchange;
import javax.sql.DataSource;
/**
* 导出端
*
@ -21,9 +19,9 @@ public abstract class DataExport extends DataExchange
super();
}
public DataExport(DataSource dataSource, boolean abortOnError)
public DataExport(ConnectionFactory connectionFactory, boolean abortOnError)
{
super(dataSource, abortOnError);
super(connectionFactory, abortOnError);
}
public boolean hasDataExportReporter()

View File

@ -4,8 +4,6 @@
package org.datagear.dataexchange;
import javax.sql.DataSource;
/**
* 导入源
*
@ -22,9 +20,9 @@ public abstract class DataImport extends DataExchange
super();
}
public DataImport(DataSource dataSource, boolean abortOnError)
public DataImport(ConnectionFactory connectionFactory, boolean abortOnError)
{
super(dataSource, abortOnError);
super(connectionFactory, abortOnError);
}
public boolean hasDataImportReporter()

View File

@ -0,0 +1,56 @@
/*
* Copyright (c) 2018 datagear.org. All Rights Reserved.
*/
package org.datagear.dataexchange;
import java.sql.Connection;
import java.sql.SQLException;
import javax.sql.DataSource;
import org.datagear.connection.ConnectionSource;
/**
* 数据源{@linkplain ConnectionSource}
*
* @author datagear@163.com
*
*/
public class DataSourceConnectionFactory implements ConnectionFactory
{
private DataSource dataSource;
public DataSourceConnectionFactory()
{
super();
}
public DataSourceConnectionFactory(DataSource dataSource)
{
super();
this.dataSource = dataSource;
}
public DataSource getDataSource()
{
return dataSource;
}
public void setDataSource(DataSource dataSource)
{
this.dataSource = dataSource;
}
@Override
public Connection getConnection() throws SQLException
{
return this.dataSource.getConnection();
}
@Override
public void reclaimConnection(Connection cn) throws SQLException
{
cn.close();
}
}

View File

@ -0,0 +1,64 @@
/*
* Copyright (c) 2018 datagear.org. All Rights Reserved.
*/
package org.datagear.dataexchange;
import java.sql.Connection;
import java.sql.SQLException;
/**
* 简单{@linkplain ConnectionFactory}
*
* @author datagear@163.com
*
*/
public class SimpleConnectionFactory implements ConnectionFactory
{
private Connection connection;
private boolean closeOnReclaim;
public SimpleConnectionFactory()
{
super();
}
public SimpleConnectionFactory(Connection connection, boolean closeOnReclaim)
{
super();
this.connection = connection;
this.closeOnReclaim = closeOnReclaim;
}
public void setConnection(Connection connection)
{
this.connection = connection;
}
@Override
public Connection getConnection() throws SQLException
{
return this.connection;
}
public boolean isCloseOnReclaim()
{
return closeOnReclaim;
}
public void setCloseOnReclaim(boolean closeOnReclaim)
{
this.closeOnReclaim = closeOnReclaim;
}
@Override
public void reclaimConnection(Connection cn) throws SQLException
{
if (this.connection != cn)
throw new IllegalStateException();
if (this.closeOnReclaim && this.connection != null)
cn.close();
}
}

View File

@ -9,11 +9,14 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import org.datagear.dataexchange.ConnectionFactory;
import org.datagear.dataexchange.DataExport;
import org.datagear.dataexchange.DataExportException;
import org.datagear.dataexchange.DevotedDataExporter;
import org.datagear.dbinfo.ColumnInfo;
import org.datagear.dbinfo.DatabaseInfoResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 抽象{@linkplain DevotedDataExporter}
@ -28,6 +31,8 @@ import org.datagear.dbinfo.DatabaseInfoResolver;
public abstract class AbstractDevotedDataExporter<T extends DataExport> extends DataExchangerSupport
implements DevotedDataExporter<T>
{
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDevotedDataExporter.class);
public AbstractDevotedDataExporter()
{
super();
@ -39,6 +44,24 @@ public abstract class AbstractDevotedDataExporter<T extends DataExport> extends
return true;
}
/**
* 回收数据库连接
*
* @param connectionFactory
* @param cn
*/
protected void reclaimConnection(ConnectionFactory connectionFactory, Connection cn)
{
try
{
connectionFactory.reclaimConnection(cn);
}
catch (SQLException e)
{
LOGGER.error("reclaimConnection error", e);
}
}
/**
* 获取{@linkplain ResultSet}列信息
*

View File

@ -13,6 +13,7 @@ import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.datagear.dataexchange.ConnectionFactory;
import org.datagear.dataexchange.DataImport;
import org.datagear.dataexchange.DevotedDataImporter;
import org.datagear.dbinfo.ColumnInfo;
@ -46,6 +47,24 @@ public abstract class AbstractDevotedDataImporter<T extends DataImport> extends
return true;
}
/**
* 回收数据库连接
*
* @param connectionFactory
* @param cn
*/
protected void reclaimConnection(ConnectionFactory connectionFactory, Connection cn)
{
try
{
connectionFactory.reclaimConnection(cn);
}
catch (SQLException e)
{
LOGGER.error("reclaimConnection error", e);
}
}
/**
* 执行下一个插入操作
*

View File

@ -6,8 +6,7 @@ package org.datagear.dataexchange.support;
import java.io.Writer;
import javax.sql.DataSource;
import org.datagear.dataexchange.ConnectionFactory;
import org.datagear.dataexchange.DataExport;
/**
@ -25,9 +24,10 @@ public class CsvDataExport extends QueryTextDataExport
super();
}
public CsvDataExport(DataSource dataSource, boolean abortOnError, DataFormat dataFormat, Query query, Writer writer)
public CsvDataExport(ConnectionFactory connectionFactory, boolean abortOnError, DataFormat dataFormat, Query query,
Writer writer)
{
super(dataSource, abortOnError, dataFormat, query);
super(connectionFactory, abortOnError, dataFormat, query);
this.writer = writer;
}

View File

@ -12,7 +12,7 @@ import java.sql.SQLTransientException;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.datagear.connection.JdbcUtil;
import org.datagear.dataexchange.ConnectionFactory;
import org.datagear.dataexchange.DataExportException;
import org.datagear.dataexchange.DataExportReporter;
import org.datagear.dataexchange.DataExportResult;
@ -43,6 +43,8 @@ public class CsvDataExporter extends AbstractTextDevotedDataExporter<CsvDataExpo
{
DataExportResult dataExportResult = new DataExportResult();
ConnectionFactory connectionFactory = expt.getConnectionFactory();
long startTime = System.currentTimeMillis();
boolean abortOnError = expt.isAbortOnError();
@ -54,7 +56,7 @@ public class CsvDataExporter extends AbstractTextDevotedDataExporter<CsvDataExpo
try
{
cn = expt.getDataSource().getConnection();
cn = connectionFactory.getConnection();
ResultSet rs = expt.getQuery().execute(cn);
@ -126,7 +128,7 @@ public class CsvDataExporter extends AbstractTextDevotedDataExporter<CsvDataExpo
}
finally
{
JdbcUtil.closeConnection(cn);
reclaimConnection(connectionFactory, cn);
}
dataExportResult.setDuration(System.currentTimeMillis() - startTime);

View File

@ -6,8 +6,7 @@ package org.datagear.dataexchange.support;
import java.io.Reader;
import javax.sql.DataSource;
import org.datagear.dataexchange.ConnectionFactory;
import org.datagear.dataexchange.DataImport;
/**
@ -26,10 +25,10 @@ public class CsvDataImport extends TableTextDataImport
super();
}
public CsvDataImport(DataSource dataSource, boolean abortOnError, DataFormat dataFormat, String table,
public CsvDataImport(ConnectionFactory connectionFactory, boolean abortOnError, DataFormat dataFormat, String table,
boolean ignoreInexistentColumn, Reader reader)
{
super(dataSource, abortOnError, dataFormat, table, ignoreInexistentColumn);
super(connectionFactory, abortOnError, dataFormat, table, ignoreInexistentColumn);
this.reader = reader;
}

View File

@ -13,6 +13,7 @@ import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.datagear.connection.JdbcUtil;
import org.datagear.dataexchange.ConnectionFactory;
import org.datagear.dataexchange.DataImportException;
import org.datagear.dataexchange.DataImportResult;
import org.datagear.dataexchange.DevotedDataImporter;
@ -42,6 +43,8 @@ public class CsvDataImporter extends AbstractTextDevotedDataImporter<CsvDataImpo
{
DataImportResult dataImportResult = new DataImportResult();
ConnectionFactory connectionFactory = impt.getConnectionFactory();
long startTime = System.currentTimeMillis();
CSVParser csvParser = buildCSVParser(impt);
@ -56,7 +59,7 @@ public class CsvDataImporter extends AbstractTextDevotedDataImporter<CsvDataImpo
try
{
cn = impt.getDataSource().getConnection();
cn = connectionFactory.getConnection();
for (CSVRecord csvRecord : csvParser)
{
@ -89,7 +92,7 @@ public class CsvDataImporter extends AbstractTextDevotedDataImporter<CsvDataImpo
finally
{
JdbcUtil.closeStatement(st);
JdbcUtil.closeConnection(cn);
reclaimConnection(connectionFactory, cn);
}
dataImportResult.setDuration(System.currentTimeMillis() - startTime);

View File

@ -4,7 +4,7 @@
package org.datagear.dataexchange.support;
import javax.sql.DataSource;
import org.datagear.dataexchange.ConnectionFactory;
/**
* 查询{@linkplain TextDataExport}
@ -22,9 +22,10 @@ public class QueryTextDataExport extends TextDataExport
super();
}
public QueryTextDataExport(DataSource dataSource, boolean abortOnError, DataFormat dataFormat, Query query)
public QueryTextDataExport(ConnectionFactory connectionFactory, boolean abortOnError, DataFormat dataFormat,
Query query)
{
super(dataSource, abortOnError, dataFormat);
super(connectionFactory, abortOnError, dataFormat);
this.query = query;
}

View File

@ -4,7 +4,7 @@
package org.datagear.dataexchange.support;
import javax.sql.DataSource;
import org.datagear.dataexchange.ConnectionFactory;
/**
* 单表{@linkplain TextDataImport}
@ -25,10 +25,10 @@ public abstract class TableTextDataImport extends TextDataImport
super();
}
public TableTextDataImport(DataSource dataSource, boolean abortOnError, DataFormat dataFormat, String table,
boolean ignoreInexistentColumn)
public TableTextDataImport(ConnectionFactory connectionFactory, boolean abortOnError, DataFormat dataFormat,
String table, boolean ignoreInexistentColumn)
{
super(dataSource, abortOnError, dataFormat);
super(connectionFactory, abortOnError, dataFormat);
this.table = table;
this.ignoreInexistentColumn = ignoreInexistentColumn;
}

View File

@ -4,8 +4,7 @@
package org.datagear.dataexchange.support;
import javax.sql.DataSource;
import org.datagear.dataexchange.ConnectionFactory;
import org.datagear.dataexchange.DataExport;
/**
@ -24,9 +23,9 @@ public abstract class TextDataExport extends DataExport
super();
}
public TextDataExport(DataSource dataSource, boolean abortOnError, DataFormat dataFormat)
public TextDataExport(ConnectionFactory connectionFactory, boolean abortOnError, DataFormat dataFormat)
{
super(dataSource, abortOnError);
super(connectionFactory, abortOnError);
this.dataFormat = dataFormat;
}

View File

@ -4,8 +4,7 @@
package org.datagear.dataexchange.support;
import javax.sql.DataSource;
import org.datagear.dataexchange.ConnectionFactory;
import org.datagear.dataexchange.DataImport;
/**
@ -24,9 +23,9 @@ public abstract class TextDataImport extends DataImport
super();
}
public TextDataImport(DataSource dataSource, boolean abortOnError, DataFormat dataFormat)
public TextDataImport(ConnectionFactory connectionFactory, boolean abortOnError, DataFormat dataFormat)
{
super(dataSource, abortOnError);
super(connectionFactory, abortOnError);
this.dataFormat = dataFormat;
}

View File

@ -6,7 +6,6 @@ package org.datagear.dataexchange;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
@ -14,8 +13,6 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import org.datagear.connection.JdbcUtil;
import org.datagear.dataexchange.support.CsvDataImporterTest;
import org.datagear.dbinfo.DatabaseInfoResolver;
@ -86,74 +83,4 @@ public abstract class DataexchangeTestSupport extends DBTestSupport
return CsvDataImporterTest.class.getClassLoader()
.getResourceAsStream("org/datagear/dataexchange/support/" + resourceName);
}
protected SimpleDataSource buildSimpleDataSource(Connection cn)
{
return new SimpleDataSource(cn);
}
protected static class SimpleDataSource implements DataSource
{
private Connection connection;
public SimpleDataSource(Connection connection)
{
super();
this.connection = connection;
}
public void setConnection(Connection connection)
{
this.connection = connection;
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException
{
throw new UnsupportedExportException();
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException
{
throw new UnsupportedExportException();
}
@Override
public PrintWriter getLogWriter() throws SQLException
{
throw new UnsupportedExportException();
}
@Override
public void setLogWriter(PrintWriter out) throws SQLException
{
throw new UnsupportedExportException();
}
@Override
public void setLoginTimeout(int seconds) throws SQLException
{
throw new UnsupportedExportException();
}
@Override
public int getLoginTimeout() throws SQLException
{
throw new UnsupportedExportException();
}
@Override
public Connection getConnection() throws SQLException
{
return this.connection;
}
@Override
public Connection getConnection(String username, String password) throws SQLException
{
throw new UnsupportedExportException();
}
}
}

View File

@ -24,6 +24,7 @@ import org.apache.commons.csv.CSVRecord;
import org.datagear.connection.IOUtil;
import org.datagear.connection.JdbcUtil;
import org.datagear.dataexchange.DataexchangeTestSupport;
import org.datagear.dataexchange.SimpleConnectionFactory;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@ -65,8 +66,8 @@ public class CsvDataExporterTest extends DataexchangeTestSupport
cn = getConnection();
reader = IOUtil.getReader(getTestResourceInputStream("CsvDataExporterTest.csv"), "UTF-8");
CsvDataImport impt = new CsvDataImport(buildSimpleDataSource(cn), true, dataFormat, TABLE_NAME, true,
reader);
CsvDataImport impt = new CsvDataImport(new SimpleConnectionFactory(cn, false), true, dataFormat, TABLE_NAME,
true, reader);
clearTable(cn, TABLE_NAME);
this.csvDataImporter.impt(impt);
@ -94,7 +95,7 @@ public class CsvDataExporterTest extends DataexchangeTestSupport
writer = new OutputStreamWriter(new FileOutputStream(outFile), "UTF-8");
CsvDataExport csvDataExport = new CsvDataExport(buildSimpleDataSource(cn), true, dataFormat,
CsvDataExport csvDataExport = new CsvDataExport(new SimpleConnectionFactory(cn, false), true, dataFormat,
new TableQuery(TABLE_NAME), writer);
this.csvDataExporter.expt(csvDataExport);

View File

@ -10,6 +10,7 @@ import java.sql.Connection;
import org.datagear.connection.IOUtil;
import org.datagear.connection.JdbcUtil;
import org.datagear.dataexchange.DataexchangeTestSupport;
import org.datagear.dataexchange.SimpleConnectionFactory;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@ -50,8 +51,8 @@ public class CsvDataImporterTest extends DataexchangeTestSupport
reader = IOUtil.getReader(getTestResourceInputStream("CsvDataImporterTest_ignoreInexistentColumn.csv"),
"UTF-8");
CsvDataImport impt = new CsvDataImport(buildSimpleDataSource(cn), true, dataFormat, TABLE_NAME, false,
reader);
CsvDataImport impt = new CsvDataImport(new SimpleConnectionFactory(cn, false), true, dataFormat, TABLE_NAME,
false, reader);
clearTable(cn, TABLE_NAME);
@ -81,15 +82,13 @@ public class CsvDataImporterTest extends DataexchangeTestSupport
reader = IOUtil.getReader(getTestResourceInputStream("CsvDataImporterTest_ignoreInexistentColumn.csv"),
"UTF-8");
CsvDataImport impt = new CsvDataImport(buildSimpleDataSource(cn), true, dataFormat, TABLE_NAME, true,
reader);
CsvDataImport impt = new CsvDataImport(new SimpleConnectionFactory(cn, false), true, dataFormat, TABLE_NAME,
true, reader);
clearTable(cn, TABLE_NAME);
this.csvDataImporter.impt(impt);
cn = getConnection();
int count = getCount(cn, TABLE_NAME);
Assert.assertEquals(3, count);
@ -114,15 +113,13 @@ public class CsvDataImporterTest extends DataexchangeTestSupport
cn = getConnection();
reader = IOUtil.getReader(getTestResourceInputStream("CsvDataImporterTest_abortOnError.csv"), "UTF-8");
CsvDataImport impt = new CsvDataImport(buildSimpleDataSource(cn), false, dataFormat, TABLE_NAME, true,
reader);
CsvDataImport impt = new CsvDataImport(new SimpleConnectionFactory(cn, false), false, dataFormat,
TABLE_NAME, true, reader);
clearTable(cn, TABLE_NAME);
this.csvDataImporter.impt(impt);
cn = getConnection();
int count = getCount(cn, TABLE_NAME);
Assert.assertEquals(2, count);
@ -147,8 +144,8 @@ public class CsvDataImporterTest extends DataexchangeTestSupport
cn = getConnection();
reader = IOUtil.getReader(getTestResourceInputStream("CsvDataImporterTest_abortOnError.csv"), "UTF-8");
CsvDataImport impt = new CsvDataImport(buildSimpleDataSource(cn), true, dataFormat, TABLE_NAME, true,
reader);
CsvDataImport impt = new CsvDataImport(new SimpleConnectionFactory(cn, false), true, dataFormat, TABLE_NAME,
true, reader);
clearTable(cn, TABLE_NAME);