移除Cometd,相关功能改为采用ajax轮训方式重新实现,简单高效

This commit is contained in:
datagear 2020-11-13 18:24:36 +08:00
parent 829dff0b19
commit efc4076d94
20 changed files with 337 additions and 342 deletions

View File

@ -88,7 +88,6 @@ import org.datagear.web.sqlpad.SqlpadExecutionService;
import org.datagear.web.util.ChangelogResolver;
import org.datagear.web.util.DirectoryFactory;
import org.datagear.web.util.DirectoryHtmlChartPluginManagerInitializer;
import org.datagear.web.util.MessageChannel;
import org.datagear.web.util.SqlDriverChecker;
import org.datagear.web.util.TableCache;
import org.datagear.web.util.XmlDriverEntityManagerInitializer;
@ -618,12 +617,6 @@ public class CoreConfig implements InitializingBean
return new ChangelogResolver();
}
@Bean
public MessageChannel messageChannel()
{
return new MessageChannel();
}
@Bean
public SqlSelectManager sqlSelectManager()
{
@ -635,7 +628,7 @@ public class CoreConfig implements InitializingBean
public SqlpadExecutionService sqlpadExecutionService()
{
SqlpadExecutionService bean = new SqlpadExecutionService(this.connectionSource(), this.messageSource(),
this.messageChannel(), this.sqlHistoryService(), this.sqlSelectManager());
this.sqlHistoryService(), this.sqlSelectManager());
return bean;
}

View File

@ -116,15 +116,14 @@ public class DataExchangeController extends AbstractSchemaConnController
@Autowired
private DataExchangeService<DataExchange> dataExchangeService;
@Autowired
private MessageChannel messageChannel;
@Autowired
private DBMetaResolver dbMetaResolver;
@Autowired
private File tempDirectory;
private MessageChannel messageChannel = new MessageChannel();
public DataExchangeController()
{
super();
@ -145,7 +144,7 @@ public class DataExchangeController extends AbstractSchemaConnController
return messageChannel;
}
public void setMessageChannel(MessageChannel messageChannel)
protected void setMessageChannel(MessageChannel messageChannel)
{
this.messageChannel = messageChannel;
}
@ -211,7 +210,6 @@ public class DataExchangeController extends AbstractSchemaConnController
springModel.addAttribute("defaultDataFormat", defaultDataFormat);
springModel.addAttribute("dataExchangeId", dataExchangeId);
springModel.addAttribute("dataExchangeChannelId", getDataExchangeChannelId(dataExchangeId));
springModel.addAttribute("availableCharsetNames", getAvailableCharsetNames());
springModel.addAttribute("defaultCharsetName", Charset.defaultCharset().name());
@ -267,8 +265,6 @@ public class DataExchangeController extends AbstractSchemaConnController
ConnectionFactory connectionFactory = new DataSourceConnectionFactory(new SchemaDataSource(schema));
String importChannelId = getDataExchangeChannelId(dataExchangeId);
Locale locale = getLocale(request);
SubDataExchange[] subDataExchanges = new SubDataExchange[subDataExchangeIds.length];
@ -283,8 +279,8 @@ public class DataExchangeController extends AbstractSchemaConnController
dataImportForm.getImportOption(), tableNames[i], readerFactory);
MessageSubTextValueDataImportListener listener = new MessageSubTextValueDataImportListener(
this.messageChannel, importChannelId, getMessageSource(), locale,
subDataExchangeIds[i], csvDataImport.getImportOption().getExceptionResolve());
this.messageChannel, dataExchangeId, getMessageSource(), locale, subDataExchangeIds[i],
csvDataImport.getImportOption().getExceptionResolve());
listener.setLogFile(getTempSubDataExchangeLogFile(logDirectory, subDataExchangeIds[i]));
listener.setSendExchangingMessageInterval(
evalSendDataExchangingMessageInterval(subDataExchangeIds.length, csvDataImport));
@ -312,7 +308,7 @@ public class DataExchangeController extends AbstractSchemaConnController
Collections.addAll(subDataExchangeSet, subDataExchanges);
BatchDataExchange batchDataExchange = buildBatchDataExchange(connectionFactory, subDataExchangeSet,
importChannelId, locale);
dataExchangeId, locale);
this.dataExchangeService.exchange(batchDataExchange);
@ -344,7 +340,6 @@ public class DataExchangeController extends AbstractSchemaConnController
springModel.addAttribute("defaultDataFormat", defaultDataFormat);
springModel.addAttribute("dataExchangeId", dataExchangeId);
springModel.addAttribute("dataExchangeChannelId", getDataExchangeChannelId(dataExchangeId));
springModel.addAttribute("availableCharsetNames", getAvailableCharsetNames());
springModel.addAttribute("defaultCharsetName", Charset.defaultCharset().name());
@ -395,8 +390,6 @@ public class DataExchangeController extends AbstractSchemaConnController
ConnectionFactory connectionFactory = new DataSourceConnectionFactory(new SchemaDataSource(schema));
String importChannelId = getDataExchangeChannelId(dataExchangeId);
Locale locale = getLocale(request);
SubDataExchange[] subDataExchanges = new SubDataExchange[subDataExchangeIds.length];
@ -410,8 +403,8 @@ public class DataExchangeController extends AbstractSchemaConnController
SqlDataImport sqlDataImport = new SqlDataImport(connectionFactory, dataImportForm.getImportOption(),
readerFactory);
MessageSubDataImportListener listener = new MessageSubDataImportListener(this.messageChannel, importChannelId,
getMessageSource(), locale, subDataExchangeIds[i],
MessageSubDataImportListener listener = new MessageSubDataImportListener(this.messageChannel,
dataExchangeId, getMessageSource(), locale, subDataExchangeIds[i],
dataImportForm.getImportOption().getExceptionResolve());
listener.setLogFile(getTempSubDataExchangeLogFile(logDirectory, subDataExchangeIds[i]));
listener.setSendExchangingMessageInterval(
@ -428,7 +421,7 @@ public class DataExchangeController extends AbstractSchemaConnController
Collections.addAll(subDataExchangeSet, subDataExchanges);
BatchDataExchange batchDataExchange = buildBatchDataExchange(connectionFactory, subDataExchangeSet,
importChannelId, locale);
dataExchangeId, locale);
this.dataExchangeService.exchange(batchDataExchange);
@ -461,7 +454,6 @@ public class DataExchangeController extends AbstractSchemaConnController
springModel.addAttribute("defaultDataFormat", defaultDataFormat);
springModel.addAttribute("dataExchangeId", dataExchangeId);
springModel.addAttribute("dataExchangeChannelId", getDataExchangeChannelId(dataExchangeId));
springModel.addAttribute("availableCharsetNames", getAvailableCharsetNames());
springModel.addAttribute("defaultCharsetName", Charset.defaultCharset().name());
@ -526,8 +518,6 @@ public class DataExchangeController extends AbstractSchemaConnController
ConnectionFactory connectionFactory = new DataSourceConnectionFactory(new SchemaDataSource(schema));
String importChannelId = getDataExchangeChannelId(dataExchangeId);
Locale locale = getLocale(request);
SubDataExchange[] subDataExchanges = new SubDataExchange[subDataExchangeIds.length];
@ -542,8 +532,8 @@ public class DataExchangeController extends AbstractSchemaConnController
importForm.getImportOption(), (tableNames == null ? null : tableNames[i]), readerFactory);
MessageSubTextValueDataImportListener listener = new MessageSubTextValueDataImportListener(
this.messageChannel, importChannelId, getMessageSource(), locale,
subDataExchangeIds[i], jsonDataImport.getImportOption().getExceptionResolve());
this.messageChannel, dataExchangeId, getMessageSource(), locale, subDataExchangeIds[i],
jsonDataImport.getImportOption().getExceptionResolve());
listener.setLogFile(getTempSubDataExchangeLogFile(logDirectory, subDataExchangeIds[i]));
listener.setSendExchangingMessageInterval(
evalSendDataExchangingMessageInterval(subDataExchangeIds.length, jsonDataImport));
@ -574,7 +564,7 @@ public class DataExchangeController extends AbstractSchemaConnController
Collections.addAll(subDataExchangeSet, subDataExchanges);
BatchDataExchange batchDataExchange = buildBatchDataExchange(connectionFactory, subDataExchangeSet,
importChannelId, locale);
dataExchangeId, locale);
this.dataExchangeService.exchange(batchDataExchange);
@ -606,7 +596,6 @@ public class DataExchangeController extends AbstractSchemaConnController
springModel.addAttribute("defaultDataFormat", defaultDataFormat);
springModel.addAttribute("dataExchangeId", dataExchangeId);
springModel.addAttribute("dataExchangeChannelId", getDataExchangeChannelId(dataExchangeId));
return "/dataexchange/import_excel";
}
@ -659,8 +648,6 @@ public class DataExchangeController extends AbstractSchemaConnController
ConnectionFactory connectionFactory = new DataSourceConnectionFactory(new SchemaDataSource(schema));
String importChannelId = getDataExchangeChannelId(dataExchangeId);
Locale locale = getLocale(request);
SubDataExchange[] subDataExchanges = new SubDataExchange[subDataExchangeIds.length];
@ -674,8 +661,8 @@ public class DataExchangeController extends AbstractSchemaConnController
excelDataImport.setUnifiedTable(tableNames[i]);
MessageSubTextValueDataImportListener listener = new MessageSubTextValueDataImportListener(
this.messageChannel, importChannelId, getMessageSource(), locale,
subDataExchangeIds[i], excelDataImport.getImportOption().getExceptionResolve());
this.messageChannel, dataExchangeId, getMessageSource(), locale, subDataExchangeIds[i],
excelDataImport.getImportOption().getExceptionResolve());
listener.setLogFile(getTempSubDataExchangeLogFile(logDirectory, subDataExchangeIds[i]));
listener.setSendExchangingMessageInterval(
evalSendDataExchangingMessageInterval(subDataExchangeIds.length, excelDataImport));
@ -703,7 +690,7 @@ public class DataExchangeController extends AbstractSchemaConnController
Collections.addAll(subDataExchangeSet, subDataExchanges);
BatchDataExchange batchDataExchange = buildBatchDataExchange(connectionFactory, subDataExchangeSet,
importChannelId, locale);
dataExchangeId, locale);
this.dataExchangeService.exchange(batchDataExchange);
@ -874,7 +861,6 @@ public class DataExchangeController extends AbstractSchemaConnController
springModel.addAttribute("defaultDataFormat", defaultDataFormat);
springModel.addAttribute("dataExchangeId", dataExchangeId);
springModel.addAttribute("dataExchangeChannelId", getDataExchangeChannelId(dataExchangeId));
springModel.addAttribute("availableCharsetNames", getAvailableCharsetNames());
springModel.addAttribute("defaultCharsetName", Charset.defaultCharset().name());
setParamInitSqlsAttribute(request, springModel);
@ -915,8 +901,6 @@ public class DataExchangeController extends AbstractSchemaConnController
ConnectionFactory connectionFactory = new DataSourceConnectionFactory(new SchemaDataSource(schema));
String exportChannelId = getDataExchangeChannelId(dataExchangeId);
Locale locale = getLocale(request);
Set<SubDataExchange> subDataExchanges = new HashSet<>();
@ -932,9 +916,8 @@ public class DataExchangeController extends AbstractSchemaConnController
CsvDataExport csvDataExport = new CsvDataExport(connectionFactory, exportForm.getDataFormat(),
exportForm.getExportOption(), query, writerFactory);
MessageSubTextDataExportListener listener = new MessageSubTextDataExportListener(
this.messageChannel, exportChannelId, getMessageSource(), getLocale(request),
subDataExchangeIds[i]);
MessageSubTextDataExportListener listener = new MessageSubTextDataExportListener(this.messageChannel,
dataExchangeId, getMessageSource(), getLocale(request), subDataExchangeIds[i]);
listener.setLogFile(getTempSubDataExchangeLogFile(logDirectory, subDataExchangeIds[i]));
listener.setSendExchangingMessageInterval(
evalSendDataExchangingMessageInterval(subDataExchangeIds.length, csvDataExport));
@ -945,7 +928,7 @@ public class DataExchangeController extends AbstractSchemaConnController
}
BatchDataExchange batchDataExchange = buildBatchDataExchange(connectionFactory, subDataExchanges,
exportChannelId, locale);
dataExchangeId, locale);
this.dataExchangeService.exchange(batchDataExchange);
@ -982,7 +965,6 @@ public class DataExchangeController extends AbstractSchemaConnController
springModel.addAttribute("defaultDataFormat", defaultDataFormat);
springModel.addAttribute("dataExchangeId", dataExchangeId);
springModel.addAttribute("dataExchangeChannelId", getDataExchangeChannelId(dataExchangeId));
springModel.addAttribute("availableCharsetNames", getAvailableCharsetNames());
springModel.addAttribute("defaultCharsetName", Charset.defaultCharset().name());
setParamInitSqlsAttribute(request, springModel);
@ -1022,8 +1004,6 @@ public class DataExchangeController extends AbstractSchemaConnController
ConnectionFactory connectionFactory = new DataSourceConnectionFactory(new SchemaDataSource(schema));
String exportChannelId = getDataExchangeChannelId(dataExchangeId);
Locale locale = getLocale(request);
Set<SubDataExchange> subDataExchanges = new HashSet<>();
@ -1038,9 +1018,8 @@ public class DataExchangeController extends AbstractSchemaConnController
ExcelDataExport excelDataExport = new ExcelDataExport(connectionFactory, exportForm.getDataFormat(),
exportForm.getExportOption(), query, writerFactory);
MessageSubTextDataExportListener listener = new MessageSubTextDataExportListener(
this.messageChannel, exportChannelId, getMessageSource(), getLocale(request),
subDataExchangeIds[i]);
MessageSubTextDataExportListener listener = new MessageSubTextDataExportListener(this.messageChannel,
dataExchangeId, getMessageSource(), getLocale(request), subDataExchangeIds[i]);
listener.setLogFile(getTempSubDataExchangeLogFile(logDirectory, subDataExchangeIds[i]));
listener.setSendExchangingMessageInterval(
evalSendDataExchangingMessageInterval(subDataExchangeIds.length, excelDataExport));
@ -1051,7 +1030,7 @@ public class DataExchangeController extends AbstractSchemaConnController
}
BatchDataExchange batchDataExchange = buildBatchDataExchange(connectionFactory, subDataExchanges,
exportChannelId, locale);
dataExchangeId, locale);
this.dataExchangeService.exchange(batchDataExchange);
@ -1093,7 +1072,6 @@ public class DataExchangeController extends AbstractSchemaConnController
springModel.addAttribute("defaultDataFormat", defaultDataFormat);
springModel.addAttribute("dataExchangeId", dataExchangeId);
springModel.addAttribute("dataExchangeChannelId", getDataExchangeChannelId(dataExchangeId));
springModel.addAttribute("availableCharsetNames", getAvailableCharsetNames());
springModel.addAttribute("defaultCharsetName", Charset.defaultCharset().name());
setParamInitSqlsAttribute(request, springModel);
@ -1137,8 +1115,6 @@ public class DataExchangeController extends AbstractSchemaConnController
ConnectionFactory connectionFactory = new DataSourceConnectionFactory(new SchemaDataSource(schema));
String exportChannelId = getDataExchangeChannelId(dataExchangeId);
Locale locale = getLocale(request);
Set<SubDataExchange> subDataExchanges = new HashSet<>();
@ -1154,9 +1130,8 @@ public class DataExchangeController extends AbstractSchemaConnController
SqlDataExport sqlDataExport = new SqlDataExport(connectionFactory, exportForm.getDataFormat(),
exportForm.getExportOption(), query, tableNames[i], writerFactory);
MessageSubTextDataExportListener listener = new MessageSubTextDataExportListener(
this.messageChannel, exportChannelId, getMessageSource(), getLocale(request),
subDataExchangeIds[i]);
MessageSubTextDataExportListener listener = new MessageSubTextDataExportListener(this.messageChannel,
dataExchangeId, getMessageSource(), getLocale(request), subDataExchangeIds[i]);
listener.setLogFile(getTempSubDataExchangeLogFile(logDirectory, subDataExchangeIds[i]));
listener.setSendExchangingMessageInterval(
evalSendDataExchangingMessageInterval(subDataExchangeIds.length, sqlDataExport));
@ -1167,7 +1142,7 @@ public class DataExchangeController extends AbstractSchemaConnController
}
BatchDataExchange batchDataExchange = buildBatchDataExchange(connectionFactory, subDataExchanges,
exportChannelId, locale);
dataExchangeId, locale);
this.dataExchangeService.exchange(batchDataExchange);
@ -1205,7 +1180,6 @@ public class DataExchangeController extends AbstractSchemaConnController
springModel.addAttribute("defaultDataFormat", defaultDataFormat);
springModel.addAttribute("dataExchangeId", dataExchangeId);
springModel.addAttribute("dataExchangeChannelId", getDataExchangeChannelId(dataExchangeId));
springModel.addAttribute("availableCharsetNames", getAvailableCharsetNames());
springModel.addAttribute("defaultCharsetName", Charset.defaultCharset().name());
setParamInitSqlsAttribute(request, springModel);
@ -1258,8 +1232,6 @@ public class DataExchangeController extends AbstractSchemaConnController
ConnectionFactory connectionFactory = new DataSourceConnectionFactory(new SchemaDataSource(schema));
String exportChannelId = getDataExchangeChannelId(dataExchangeId);
Locale locale = getLocale(request);
Set<SubDataExchange> subDataExchanges = new HashSet<>();
@ -1275,9 +1247,8 @@ public class DataExchangeController extends AbstractSchemaConnController
JsonDataExport csvDataExport = new JsonDataExport(connectionFactory, exportForm.getDataFormat(),
exportOption, query, writerFactory, (tableNames == null ? null : tableNames[i]));
MessageSubTextDataExportListener listener = new MessageSubTextDataExportListener(
this.messageChannel, exportChannelId, getMessageSource(), getLocale(request),
subDataExchangeIds[i]);
MessageSubTextDataExportListener listener = new MessageSubTextDataExportListener(this.messageChannel,
dataExchangeId, getMessageSource(), getLocale(request), subDataExchangeIds[i]);
listener.setLogFile(getTempSubDataExchangeLogFile(logDirectory, subDataExchangeIds[i]));
listener.setSendExchangingMessageInterval(
evalSendDataExchangingMessageInterval(subDataExchangeIds.length, csvDataExport));
@ -1288,7 +1259,7 @@ public class DataExchangeController extends AbstractSchemaConnController
}
BatchDataExchange batchDataExchange = buildBatchDataExchange(connectionFactory, subDataExchanges,
exportChannelId, locale);
dataExchangeId, locale);
this.dataExchangeService.exchange(batchDataExchange);
@ -1353,6 +1324,21 @@ public class DataExchangeController extends AbstractSchemaConnController
}
}
@RequestMapping(value = "/{schemaId}/message", produces = CONTENT_TYPE_JSON)
@ResponseBody
public List<Object> message(HttpServletRequest request, HttpServletResponse response,
org.springframework.ui.Model springModel, @PathVariable("schemaId") String schemaId,
@RequestParam("dataExchangeId") String dataExchangeId,
@RequestParam(value = "messageCount", required = false) Integer messageCount) throws Throwable
{
if (messageCount == null)
messageCount = 50;
if (messageCount < 1)
messageCount = 1;
return this.messageChannel.pull(dataExchangeId, messageCount);
}
protected String[] setParamInitSqlsAttribute(HttpServletRequest request, org.springframework.ui.Model springModel)
{
String[] initSqls = request.getParameterValues("initSqls");
@ -1530,8 +1516,8 @@ public class DataExchangeController extends AbstractSchemaConnController
{
BatchDataExchange batchDataExchange = new SimpleBatchDataExchange(connectionFactory, subDataExchanges);
MessageBatchDataExchangeListener listener = new MessageBatchDataExchangeListener(this.messageChannel,
channel, getMessageSource(), locale);
MessageBatchDataExchangeListener listener = new MessageBatchDataExchangeListener(this.messageChannel, channel,
getMessageSource(), locale);
batchDataExchange.setListener(listener);
return batchDataExchange;
@ -1637,7 +1623,7 @@ public class DataExchangeController extends AbstractSchemaConnController
/**
* 计算导入/导出中消息发送间隔
* <p>
* 如果发送频率过快当数据交换很多时会出现cometd卡死的情况
* 如果发送频率过快当数据交换很多时会出现卡死的情况
* </p>
*
* @param total
@ -1782,17 +1768,6 @@ public class DataExchangeController extends AbstractSchemaConnController
return FileUtil.getDirectory(this.tempDirectory, "dataExchange", true);
}
/**
* 获取指定数据交换操作ID对应的cometd通道ID
*
* @param dataExchangeId
* @return
*/
protected String getDataExchangeChannelId(String dataExchangeId)
{
return "/dataexchange/channel/" + dataExchangeId;
}
public static class DataImportFileInfo extends FileInfo
{
private static final long serialVersionUID = 1L;

View File

@ -37,7 +37,6 @@ import org.datagear.web.sqlpad.SqlpadExecutionService.CommitMode;
import org.datagear.web.sqlpad.SqlpadExecutionService.ExceptionHandleMode;
import org.datagear.web.sqlpad.SqlpadExecutionService.SqlCommand;
import org.datagear.web.sqlpad.SqlpadExecutionSubmit;
import org.datagear.web.util.MessageChannel;
import org.datagear.web.util.OperationMessage;
import org.datagear.web.util.WebUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -65,9 +64,6 @@ public class SqlpadController extends AbstractSchemaConnController
@Autowired
private SqlSelectManager sqlSelectManager;
@Autowired
private MessageChannel messageChannel;
@Autowired
private SqlpadExecutionService sqlpadExecutionService;
@ -94,16 +90,6 @@ public class SqlpadController extends AbstractSchemaConnController
this.sqlSelectManager = sqlSelectManager;
}
public MessageChannel getMessageChannel()
{
return messageChannel;
}
public void setMessageChannel(MessageChannel messageChannel)
{
this.messageChannel = messageChannel;
}
public SqlpadExecutionService getSqlpadExecutionService()
{
return sqlpadExecutionService;
@ -165,10 +151,8 @@ public class SqlpadController extends AbstractSchemaConnController
initSql = "";
String sqlpadId = generateSqlpadId(request, response);
String sqlpadChannelId = this.sqlpadExecutionService.getSqlpadChannelId(sqlpadId);
springModel.addAttribute("sqlpadId", sqlpadId);
springModel.addAttribute("sqlpadChannelId", sqlpadChannelId);
springModel.addAttribute("sqlResultRowMapper", buildDefaultLOBRowMapper());
springModel.addAttribute("initSql", initSql);
@ -208,13 +192,6 @@ public class SqlpadController extends AbstractSchemaConnController
if (exceptionHandleMode == null)
exceptionHandleMode = ExceptionHandleMode.ABORT;
if (overTimeThreashold == null)
overTimeThreashold = 10;
else if (overTimeThreashold < 1)
overTimeThreashold = 1;
else if (overTimeThreashold > 60)
overTimeThreashold = 60;
if (resultsetFetchSize == null)
resultsetFetchSize = DEFAULT_SQL_RESULTSET_FETCH_SIZE;
@ -245,9 +222,15 @@ public class SqlpadController extends AbstractSchemaConnController
@ResponseBody
public List<Object> message(HttpServletRequest request, HttpServletResponse response,
org.springframework.ui.Model springModel, @PathVariable("schemaId") String schemaId,
@RequestParam("sqlpadChannelId") String sqlpadChannelId) throws Throwable
@RequestParam("sqlpadId") String sqlpadId,
@RequestParam(value = "messageCount", required = false) Integer messageCount) throws Throwable
{
return this.messageChannel.pull(sqlpadChannelId, 10);
if (messageCount == null)
messageCount = 50;
if (messageCount < 1)
messageCount = 1;
return this.sqlpadExecutionService.message(sqlpadId, messageCount);
}
@RequestMapping(value = "/{schemaId}/select", produces = CONTENT_TYPE_JSON)

View File

@ -47,8 +47,6 @@ public class SqlpadExecutionService extends PersistenceSupport
private MessageSource messageSource;
private MessageChannel messageChannel;
private SqlHistoryService sqlHistoryService;
private SqlSelectManager sqlSelectManager;
@ -57,6 +55,9 @@ public class SqlpadExecutionService extends PersistenceSupport
private SchemaConnectionSupport schemaConnectionSupport = new SchemaConnectionSupport();
private MessageChannel _messageChannel = new MessageChannel(
SqlpadExecutionSubmit.MAX_PAUSE_OVER_TIME_THREASHOLD_MINUTES * 60);
private ExecutorService _executorService = Executors.newCachedThreadPool();
private ConcurrentMap<String, SqlpadExecutionRunnable> _sqlpadExecutionRunnableMap = new ConcurrentHashMap<>();
@ -67,13 +68,11 @@ public class SqlpadExecutionService extends PersistenceSupport
}
public SqlpadExecutionService(ConnectionSource connectionSource, MessageSource messageSource,
MessageChannel messageChannel, SqlHistoryService sqlHistoryService,
SqlSelectManager sqlSelectManager)
SqlHistoryService sqlHistoryService, SqlSelectManager sqlSelectManager)
{
super();
this.connectionSource = connectionSource;
this.messageSource = messageSource;
this.messageChannel = messageChannel;
this.sqlHistoryService = sqlHistoryService;
this.sqlSelectManager = sqlSelectManager;
}
@ -98,14 +97,9 @@ public class SqlpadExecutionService extends PersistenceSupport
this.messageSource = messageSource;
}
public MessageChannel getMessageChannel()
protected MessageChannel getMessageChannel()
{
return messageChannel;
}
public void setMessageChannel(MessageChannel messageChannel)
{
this.messageChannel = messageChannel;
return this._messageChannel;
}
public SqlHistoryService getSqlHistoryService()
@ -156,9 +150,7 @@ public class SqlpadExecutionService extends PersistenceSupport
*/
public boolean submit(SqlpadExecutionSubmit submit)
{
String sqlpadChannelId = getSqlpadChannelId(submit.getSqlpadId());
SqlpadExecutionRunnable sqlpadExecutionRunnable = new SqlpadExecutionRunnable(submit, sqlpadChannelId);
SqlpadExecutionRunnable sqlpadExecutionRunnable = new SqlpadExecutionRunnable(submit);
SqlpadExecutionRunnable old = this._sqlpadExecutionRunnableMap.putIfAbsent(submit.getSqlpadId(),
sqlpadExecutionRunnable);
@ -190,6 +182,19 @@ public class SqlpadExecutionService extends PersistenceSupport
return true;
}
/**
* 获取反馈消息
*
* @param <T>
* @param sqlpadId
* @param count
* @return
*/
public <T> List<T> message(String sqlpadId, int count)
{
return this._messageChannel.pull(sqlpadId, count);
}
/**
* 关闭
*/
@ -198,17 +203,6 @@ public class SqlpadExecutionService extends PersistenceSupport
this._executorService.shutdown();
}
/**
* 获取指定SQL工作台ID对应的cometd通道ID
*
* @param sqlpadId
* @return
*/
public String getSqlpadChannelId(String sqlpadId)
{
return "/sqlpad/channel/" + sqlpadId;
}
/**
* 获取指定{@linkplain Schema}{@linkplain Connection}
*
@ -229,7 +223,7 @@ public class SqlpadExecutionService extends PersistenceSupport
*/
protected void sendStartMessage(String channel, int sqlCount)
{
this.messageChannel.push(channel, new StartMessageData(sqlCount));
this._messageChannel.push(channel, new StartMessageData(sqlCount));
}
/**
@ -241,7 +235,7 @@ public class SqlpadExecutionService extends PersistenceSupport
*/
protected void sendSqlSuccessMessage(String channel, SqlStatement sqlStatement, int sqlStatementIndex)
{
this.messageChannel.push(channel,
this._messageChannel.push(channel,
new SqlSuccessMessageData(sqlStatement, sqlStatementIndex, SqlResultType.NONE));
}
@ -260,7 +254,7 @@ public class SqlpadExecutionService extends PersistenceSupport
SqlResultType.UPDATE_COUNT);
sqlSuccessMessageData.setUpdateCount(updateCount);
this.messageChannel.push(channel, sqlSuccessMessageData);
this._messageChannel.push(channel, sqlSuccessMessageData);
}
/**
@ -278,7 +272,7 @@ public class SqlpadExecutionService extends PersistenceSupport
SqlResultType.RESULT_SET);
sqlSuccessMessageData.setSqlSelectResult(sqlSelectResult);
this.messageChannel.push(channel, sqlSuccessMessageData);
this._messageChannel.push(channel, sqlSuccessMessageData);
}
/**
@ -295,7 +289,7 @@ public class SqlpadExecutionService extends PersistenceSupport
{
SQLExceptionMessageData messageData = new SQLExceptionMessageData(sqlStatement, sqlStatementIndex, content);
this.messageChannel.push(channel, messageData);
this._messageChannel.push(channel, messageData);
}
/**
@ -311,7 +305,7 @@ public class SqlpadExecutionService extends PersistenceSupport
{
SQLExceptionMessageData messageData = new SQLExceptionMessageData(sqlStatement, sqlStatementIndex, content);
this.messageChannel.push(channel, messageData);
this._messageChannel.push(channel, messageData);
}
/**
@ -328,7 +322,7 @@ public class SqlpadExecutionService extends PersistenceSupport
if (trace)
messageData.setDetailTrace(t);
this.messageChannel.push(channel, messageData);
this._messageChannel.push(channel, messageData);
}
/**
@ -340,7 +334,7 @@ public class SqlpadExecutionService extends PersistenceSupport
protected void sendExceptionMessage(String channel, String content)
{
ExceptionMessageData messageData = new ExceptionMessageData(content);
this.messageChannel.push(channel, messageData);
this._messageChannel.push(channel, messageData);
}
/**
@ -352,7 +346,7 @@ public class SqlpadExecutionService extends PersistenceSupport
*/
protected void sendSqlCommandMessage(String channel, SqlCommand sqlCommand, String content)
{
this.messageChannel.push(channel, new SqlCommandMessageData(sqlCommand, content));
this._messageChannel.push(channel, new SqlCommandMessageData(sqlCommand, content));
}
/**
@ -369,7 +363,7 @@ public class SqlpadExecutionService extends PersistenceSupport
SqlCommandMessageData sqlCommandMessageData = new SqlCommandMessageData(sqlCommand, content);
sqlCommandMessageData.setSqlExecutionStat(sqlExecutionStat);
this.messageChannel.push(channel, sqlCommandMessageData);
this._messageChannel.push(channel, sqlCommandMessageData);
}
/**
@ -380,7 +374,7 @@ public class SqlpadExecutionService extends PersistenceSupport
*/
protected void sendTextMessage(String channel, String text)
{
this.messageChannel.push(channel, new TextMessageData(text));
this._messageChannel.push(channel, new TextMessageData(text));
}
/**
@ -395,7 +389,7 @@ public class SqlpadExecutionService extends PersistenceSupport
TextMessageData textMessageData = new TextMessageData(text);
textMessageData.setCssClass(cssClass);
this.messageChannel.push(channel, textMessageData);
this._messageChannel.push(channel, textMessageData);
}
/**
@ -412,7 +406,7 @@ public class SqlpadExecutionService extends PersistenceSupport
textMessageData.setCssClass(cssClass);
textMessageData.setSqlExecutionStat(sqlExecutionStat);
this.messageChannel.push(channel, textMessageData);
this._messageChannel.push(channel, textMessageData);
}
/**
@ -425,7 +419,7 @@ public class SqlpadExecutionService extends PersistenceSupport
*/
protected void sendFinishMessage(String channel)
{
this.messageChannel.push(channel, new FinishMessageData());
this._messageChannel.push(channel, new FinishMessageData());
}
/**
@ -442,7 +436,7 @@ public class SqlpadExecutionService extends PersistenceSupport
FinishMessageData finishMessageData = new FinishMessageData();
finishMessageData.setSqlExecutionStat(sqlExecutionStat);
this.messageChannel.push(channel, finishMessageData);
this._messageChannel.push(channel, finishMessageData);
}
/**
@ -476,8 +470,6 @@ public class SqlpadExecutionService extends PersistenceSupport
*/
protected class SqlpadExecutionRunnable extends SqlpadExecutionSubmit implements Runnable
{
private String sqlpadChannelId;
/** 发送给此Runnable的SQL命令 */
private volatile SqlCommand sqlCommand;
@ -486,20 +478,9 @@ public class SqlpadExecutionService extends PersistenceSupport
super();
}
public SqlpadExecutionRunnable(SqlpadExecutionSubmit submit, String sqlpadChannelId)
public SqlpadExecutionRunnable(SqlpadExecutionSubmit submit)
{
super(submit);
this.sqlpadChannelId = sqlpadChannelId;
}
public String getSqlpadChannelId()
{
return sqlpadChannelId;
}
public void setSqlpadChannelId(String sqlpadChannelId)
{
this.sqlpadChannelId = sqlpadChannelId;
}
public SqlCommand getSqlCommand()
@ -518,7 +499,7 @@ public class SqlpadExecutionService extends PersistenceSupport
Connection cn = null;
Statement st = null;
sendStartMessage(this.sqlpadChannelId, getSqlStatements().size());
sendStartMessage(getSqlpadId(), getSqlStatements().size());
try
{
@ -529,10 +510,10 @@ public class SqlpadExecutionService extends PersistenceSupport
}
catch (Throwable t)
{
sendExceptionMessage(sqlpadChannelId, t,
getMessage(getLocale(), "sqlpad.executionConnectionException"), false);
sendExceptionMessage(getSqlpadId(), t, getMessage(getLocale(), "sqlpad.executionConnectionException"),
false);
sendFinishMessage(this.sqlpadChannelId);
sendFinishMessage(getSqlpadId());
_sqlpadExecutionRunnableMap.remove(getSqlpadId());
@ -558,7 +539,7 @@ public class SqlpadExecutionService extends PersistenceSupport
if (!SqlpadExecutionService.this.sqlPermissionChecker.hasPermission(getUser(), getSchema(),
sqlStatement))
{
sendSqlExceptionMessage(sqlpadChannelId, sqlStatement, i,
sendSqlExceptionMessage(getSqlpadId(), sqlStatement, i,
getMessage(getLocale(), "sqlpad.executionSQLPermissionDenied"));
sqlExecutionStat.increaseExceptionCount();
@ -576,7 +557,7 @@ public class SqlpadExecutionService extends PersistenceSupport
{
sqlExecutionStat.increaseExceptionCount();
sendSqlExceptionMessage(sqlpadChannelId, sqlStatement, i, e,
sendSqlExceptionMessage(getSqlpadId(), sqlStatement, i, e,
getMessage(getLocale(), "sqlpad.executionSQLException", e.getMessage()));
if (ExceptionHandleMode.IGNORE.equals(getExceptionHandleMode()))
@ -607,8 +588,7 @@ public class SqlpadExecutionService extends PersistenceSupport
}
catch (Throwable t)
{
sendExceptionMessage(sqlpadChannelId, t,
getMessage(getLocale(), "sqlpad.executionErrorOccure"), true);
sendExceptionMessage(getSqlpadId(), t, getMessage(getLocale(), "sqlpad.executionErrorOccure"), true);
}
finally
{
@ -617,7 +597,7 @@ public class SqlpadExecutionService extends PersistenceSupport
sqlExecutionStat.setTaskDuration(System.currentTimeMillis() - startTime);
sendFinishMessage(this.sqlpadChannelId, sqlExecutionStat);
sendFinishMessage(getSqlpadId(), sqlExecutionStat);
_sqlpadExecutionRunnableMap.remove(getSqlpadId());
}
@ -660,8 +640,7 @@ public class SqlpadExecutionService extends PersistenceSupport
// 暂停超时
if (SqlCommand.PAUSE.equals(this.sqlCommand))
{
sendTextMessage(this.sqlpadChannelId,
getMessage(getLocale(), "sqlpad.pauseOverTime"));
sendTextMessage(getSqlpadId(), getMessage(getLocale(), "sqlpad.pauseOverTime"));
this.sqlCommand = SqlCommand.RESUME;
}
@ -732,7 +711,7 @@ public class SqlpadExecutionService extends PersistenceSupport
{
if (!sendWatingMessage)
{
sendTextMessage(this.sqlpadChannelId,
sendTextMessage(getSqlpadId(),
getMessage(getLocale(), "sqlpad.waitingForCommitOrRollback", getOverTimeThreashold()),
"message-content-highlight", sqlExecutionStat);
@ -745,8 +724,7 @@ public class SqlpadExecutionService extends PersistenceSupport
// 等待超时
if (!SqlCommand.COMMIT.equals(this.sqlCommand) && !SqlCommand.ROLLBACK.equals(this.sqlCommand))
{
sendTextMessage(this.sqlpadChannelId,
getMessage(getLocale(), "sqlpad.waitOverTime"));
sendTextMessage(getSqlpadId(), getMessage(getLocale(), "sqlpad.waitOverTime"));
this.sqlCommand = (sqlExecutionStat.getExceptionCount() > 0 ? SqlCommand.ROLLBACK : SqlCommand.COMMIT);
}
@ -800,8 +778,7 @@ public class SqlpadExecutionService extends PersistenceSupport
SqlSelectResult sqlSelectResult = SqlpadExecutionService.this.sqlSelectManager.select(cn, sql, rs, 1,
getResultsetFetchSize(), getResultsetRowMapper());
sendSqlSuccessMessage(this.sqlpadChannelId, sqlStatement,
sqlStatementIndex, sqlSelectResult);
sendSqlSuccessMessage(getSqlpadId(), sqlStatement, sqlStatementIndex, sqlSelectResult);
}
else
{
@ -810,14 +787,12 @@ public class SqlpadExecutionService extends PersistenceSupport
// 更新操作
if (updateCount > -1)
{
sendSqlSuccessMessage(this.sqlpadChannelId, sqlStatement,
sqlStatementIndex, updateCount);
sendSqlSuccessMessage(getSqlpadId(), sqlStatement, sqlStatementIndex, updateCount);
}
// 其他操作
else
{
sendSqlSuccessMessage(this.sqlpadChannelId, sqlStatement,
sqlStatementIndex);
sendSqlSuccessMessage(getSqlpadId(), sqlStatement, sqlStatementIndex);
}
}
}
@ -832,7 +807,7 @@ public class SqlpadExecutionService extends PersistenceSupport
{
String messageKey = "sqlpad.SqlCommand." + sqlCommand.toString() + ".ok";
SqlpadExecutionService.this.sendSqlCommandMessage(this.sqlpadChannelId, sqlCommand,
SqlpadExecutionService.this.sendSqlCommandMessage(getSqlpadId(), sqlCommand,
getMessage(getLocale(), messageKey, messageArgs));
}

View File

@ -26,6 +26,9 @@ import org.datagear.web.sqlpad.SqlpadExecutionService.ExceptionHandleMode;
*/
public class SqlpadExecutionSubmit
{
/** 暂停允许最大分钟数 */
public static final int MAX_PAUSE_OVER_TIME_THREASHOLD_MINUTES = 60;
private User user;
private Schema schema;
@ -40,6 +43,7 @@ public class SqlpadExecutionSubmit
private ExceptionHandleMode exceptionHandleMode;
/** 暂停超时过期分钟数 */
private int overTimeThreashold;
private int resultsetFetchSize;
@ -61,7 +65,7 @@ public class SqlpadExecutionSubmit
public SqlpadExecutionSubmit(User user, Schema schema, String sqlpadId, File sqlpadFileDirectory,
List<SqlStatement> sqlStatements, CommitMode commitMode, ExceptionHandleMode exceptionHandleMode,
int overTimeThreashold, int resultsetFetchSize, RowMapper resultsetRowMapper, Locale locale)
Integer overTimeThreashold, int resultsetFetchSize, RowMapper resultsetRowMapper, Locale locale)
{
super();
this.user = user;
@ -71,7 +75,7 @@ public class SqlpadExecutionSubmit
this.sqlStatements = sqlStatements;
this.commitMode = commitMode;
this.exceptionHandleMode = exceptionHandleMode;
this.overTimeThreashold = overTimeThreashold;
setOverTimeThreashold(overTimeThreashold);
this.resultsetFetchSize = resultsetFetchSize;
this.resultsetRowMapper = resultsetRowMapper;
this.locale = locale;
@ -152,8 +156,15 @@ public class SqlpadExecutionSubmit
return overTimeThreashold;
}
public void setOverTimeThreashold(int overTimeThreashold)
public void setOverTimeThreashold(Integer overTimeThreashold)
{
if (overTimeThreashold == null)
overTimeThreashold = 10;
else if (overTimeThreashold < 1)
overTimeThreashold = 1;
else if (overTimeThreashold > MAX_PAUSE_OVER_TIME_THREASHOLD_MINUTES)
overTimeThreashold = MAX_PAUSE_OVER_TIME_THREASHOLD_MINUTES;
this.overTimeThreashold = overTimeThreashold;
}

View File

@ -3444,71 +3444,161 @@
{
this.url = url;
this.messageHandler = messageHandler;
this._status = "";
this.options = $.extend(
{
interval: 50,
data: undefined
//轮询间隔
interval: 100,
//挂起状态时的轮询间隔
suspendInterval: 3000,
//当连续接收空消息这些秒数后,自动进入挂起状态,-1 表示不自动挂起
autoSuspendExpireSeconds: 10,
//自动挂起状态时的轮询间隔
autoSuspendInterval: 1500,
//ajax设置项
ajaxOptions: {}
},
options);
};
$.TaskClient.prototype =
{
//开始接收消息
//开始轮询接收消息
start: function()
{
if(this._status == "run")
if(this.isActive())
return false;
this._status = "run";
this._status = "active.run";
this._receiveAndHandleMessage();
return true;
},
isStart: function()
//挂起,进入慢轮询状态
suspend: function()
{
return (this._status == "run");
if(!this.isActive())
return false;
this._status = "active.suspend";
return true;
},
isFinish: function()
//唤醒,从慢轮询状态恢复
resume: function()
{
return (this._status == "finish");
if(!this.isSuspend())
return false;
this.stop();
this.start();
return true;
},
//停止轮询接收消息停止后可重新start
stop: function()
{
if(!this.isActive())
return false;
this._status = "stop";
if(this._timeoutId)
{
clearTimeout(this._timeoutId);
this._timeoutId = "";
}
return true;
},
isActive: function()
{
return (this._status && this._status.indexOf("active") == 0);
},
isSuspend: function()
{
return (this._status == "active.suspend");
},
_receiveAndHandleMessage: function()
{
if(this._status == "finish")
if(!this.isActive())
return;
var taskClient = this;
$.ajax({
type : "POST",
url : this.url,
data : this.options.data,
success : function(messages)
{
if(messages == null)
messages = [];
else if(!$.isArray(messages))
messages = [ messages ];
var isFinish = false;
for(var i=0; i<messages.length; i++)
var ajaxOptions = $.extend({}, this.options.ajaxOptions,
{
var myIsFinish = taskClient.messageHandler(messages[i]);
if(!isFinish && myIsFinish === true)
isFinish = true;
}
if(isFinish)
taskClient._status = "finish";
else
setTimeout(function(){ taskClient._receiveAndHandleMessage(); }, taskClient.options.interval);
}
});
type : "POST",
url : this.url,
data : this.options.data,
success : function(messages)
{
if(messages == null)
messages = [];
else if(!$.isArray(messages))
messages = [ messages ];
var isFinish = false;
for(var i=0; i<messages.length; i++)
{
var myIsFinish = taskClient.messageHandler(messages[i]);
if(!isFinish && myIsFinish === true)
isFinish = true;
}
if(isFinish)
taskClient._status = "stop";
//处理自动挂起
var autoSuspend = false;
if(taskClient.options.autoSuspendExpireSeconds > -1)
{
if(messages.length > 0)
{
autoSuspend = false;
taskClient._firstEmptyTime = null;
}
else
{
if(taskClient._firstEmptyTime
&& (new Date().getTime() - taskClient._firstEmptyTime)
>= taskClient.options.autoSuspendExpireSeconds*1000)
{
autoSuspend = true;
}
if(!taskClient._prevMessagesEmpty)
taskClient._firstEmptyTime = new Date().getTime();
}
taskClient._prevMessagesEmpty = (messages.length == 0);
}
if(taskClient.isActive())
{
var interval = (taskClient.isSuspend() || autoSuspend ?
taskClient.options.suspendInterval : taskClient.options.interval);
if(autoSuspend)
interval = taskClient.options.autoSuspendInterval;
taskClient._timeoutId = setTimeout(function()
{
taskClient._receiveAndHandleMessage();
},
interval);
}
}
});
$.ajax(ajaxOptions);
}
};
})

View File

@ -100,7 +100,6 @@ Schema schema 数据库不允许为null
<#include "../include/page_js_obj.ftl">
<#include "../include/page_obj_grid.ftl">
<#include "../include/page_obj_cometd.ftl">
<#include "../include/page_obj_format_time.ftl" >
<#include "include/dataExchange_js.ftl" >
<#include "include/dataExport_js.ftl" >
@ -113,7 +112,6 @@ Schema schema 数据库不允许为null
return po.toExportFileNameSuper(query, ".csv");
};
po.cometdInitIfNot();
po.initDataExportSteps();
po.initDataExchangeUIs();
po.initDataExportUIs();

View File

@ -102,7 +102,6 @@ Schema schema 数据库不允许为null
<#include "../include/page_js_obj.ftl">
<#include "../include/page_obj_grid.ftl">
<#include "../include/page_obj_cometd.ftl">
<#include "../include/page_obj_format_time.ftl" >
<#include "include/dataExchange_js.ftl" >
<#include "include/dataExport_js.ftl" >
@ -115,7 +114,6 @@ Schema schema 数据库不允许为null
return po.toExportFileNameSuper(query, ".xlsx");
};
po.cometdInitIfNot();
po.initDataExportSteps();
po.initDataExchangeUIs();
po.initDataExportUIs();

View File

@ -111,7 +111,6 @@ Schema schema 数据库不允许为null
<#include "../include/page_js_obj.ftl">
<#include "../include/page_obj_grid.ftl">
<#include "../include/page_obj_cometd.ftl">
<#include "../include/page_obj_format_time.ftl" >
<#include "include/dataExchange_js.ftl" >
<#include "include/dataExport_js.ftl" >
@ -158,7 +157,6 @@ Schema schema 数据库不允许为null
width : "20%"
});
po.cometdInitIfNot();
po.initDataExportSteps();
po.initDataExchangeUIs();
po.initDataExportUIs();

View File

@ -111,7 +111,6 @@ Schema schema 数据库不允许为null
<#include "../include/page_js_obj.ftl">
<#include "../include/page_obj_grid.ftl">
<#include "../include/page_obj_cometd.ftl">
<#include "../include/page_obj_format_time.ftl" >
<#include "include/dataExchange_js.ftl" >
<#include "include/dataExport_js.ftl" >
@ -148,7 +147,6 @@ Schema schema 数据库不允许为null
});
po.dataExportTableColumns[0].width = "30%";
po.cometdInitIfNot();
po.initDataExportSteps();
po.initDataExchangeUIs();
po.initDataExportUIs();

View File

@ -121,7 +121,6 @@ Schema schema 数据库不允许为null
<#include "../include/page_js_obj.ftl">
<#include "../include/page_obj_grid.ftl">
<#include "../include/page_obj_cometd.ftl">
<#include "../include/page_obj_format_time.ftl" >
<#include "include/dataExchange_js.ftl" >
<#include "include/dataImport_js.ftl" >
@ -157,7 +156,6 @@ Schema schema 数据库不允许为null
});
po.dataImportTableColumns[1].width = "25%";
po.cometdInitIfNot();
po.initDataImportSteps();
po.initDataExchangeUIs();
po.initDataImportUIs();

View File

@ -123,7 +123,6 @@ Schema schema 数据库不允许为null
<#include "../include/page_js_obj.ftl">
<#include "../include/page_obj_grid.ftl">
<#include "../include/page_obj_cometd.ftl">
<#include "../include/page_obj_format_time.ftl" >
<#include "include/dataExchange_js.ftl" >
<#include "include/dataImport_js.ftl" >
@ -159,7 +158,6 @@ Schema schema 数据库不允许为null
});
po.dataImportTableColumns[1].width = "25%";
po.cometdInitIfNot();
po.initDataImportSteps();
po.initDataExchangeUIs();
po.initDataImportUIs();

View File

@ -132,7 +132,6 @@ Schema schema 数据库不允许为null
<#include "../include/page_js_obj.ftl">
<#include "../include/page_obj_grid.ftl">
<#include "../include/page_obj_cometd.ftl">
<#include "../include/page_obj_format_time.ftl" >
<#include "include/dataExchange_js.ftl" >
<#include "include/dataImport_js.ftl" >
@ -202,7 +201,6 @@ Schema schema 数据库不允许为null
});
po.dataImportTableColumns[2].width = "auto";
po.cometdInitIfNot();
po.initDataImportSteps();
po.initDataExchangeUIs();
po.initDataImportUIs();

View File

@ -97,7 +97,6 @@ Schema schema 数据库不允许为null
<#include "../include/page_js_obj.ftl">
<#include "../include/page_obj_grid.ftl">
<#include "../include/page_obj_cometd.ftl">
<#include "../include/page_obj_format_time.ftl" >
<#include "include/dataExchange_js.ftl" >
<#include "include/dataImport_js.ftl" >
@ -106,7 +105,6 @@ Schema schema 数据库不允许为null
{
po.dependentNumberInputPlaceholder = "<@spring.message code='dataImport.dependentNumber.none' />";
po.cometdInitIfNot();
po.initDataImportSteps();
po.initDataExchangeUIs();
po.initDataImportUIs();

View File

@ -50,6 +50,16 @@ po.subDataExchangeStatusColumnIndex 子数据交换表格中状态列索引
return po.nextSubDataExchangeIdSeq;
};
po.dataExchangeTaskClient = new $.TaskClient("${contextPath}/dataexchange/"+po.schemaId+"/message",
function(message)
{
return po.handleDataExchangeMessage(message);
},
{
data: { dataExchangeId: po.dataExchangeId }
}
);
po.calTableHeight = function()
{
@ -228,9 +238,9 @@ po.subDataExchangeStatusColumnIndex 子数据交换表格中状态列索引
return true;
};
po.handleDataExchangeCometdMessage = function(message)
po.handleDataExchangeMessage = function(message)
{
message = message.data;
var isFinish = false;
var type = (message ? message.type : "");
if("Start" == type)
@ -251,15 +261,18 @@ po.subDataExchangeStatusColumnIndex 子数据交换表格中状态列索引
}
else if("Finish" == type)
{
isFinish = true;
po.refreshSubDataExchangeStatus();
po.setDataExchangeProgress(100, message.duration);
po.updateDataExchangePageStatus("finish");
}
else
po.handleSubDataExchangeCometdMessage(message);
po.handleSubDataExchangeMessage(message);
return isFinish;
};
po.handleSubDataExchangeCometdMessage = function(message)
po.handleSubDataExchangeMessage = function(message)
{
var subDataExchangeId = message.subDataExchangeId;

View File

@ -2,7 +2,6 @@
导出公用片段
String dataExchangeId 数据交换ID
String dataExchangeChannelId 数据交换cometd通道ID
依赖:
dataExchange_js.ftl
@ -11,8 +10,6 @@ dataExchange_js.ftl
<script type="text/javascript">
(function(po)
{
po.dataExchangeChannelId = "${dataExchangeChannelId}";
po.addSubDataExchange = function(query)
{
if(query == null)
@ -342,25 +339,26 @@ dataExchange_js.ftl
po.element("#${pageId}-form").submit(function()
{
if(po.dataExchangeTaskClient.isActive())
return;
po.dataExchangeTaskClient.start();
po.resetAllSubDataExchangeStatus();
po.cometdExecuteAfterSubscribe(po.dataExchangeChannelId,
function()
po.element("#${pageId}-form").ajaxSubmit(
{
po.element("#${pageId}-form").ajaxSubmit(
success: function(data)
{
success: function(data)
{
po.subDataExchangeFileNameMap = data.data;
if(!po.isDataExchangePageStatus("finish"))
po.updateDataExchangePageStatus("exchange");
}
});
},
function(message)
{
po.handleDataExchangeCometdMessage(message);
po.subDataExchangeFileNameMap = data.data;
if(!po.isDataExchangePageStatus("finish"))
po.updateDataExchangePageStatus("exchange");
},
error: function()
{
po.dataExchangeTaskClient.stop();
}
});
return false;

View File

@ -2,7 +2,6 @@
导入公用片段
String dataExchangeId 数据交换ID
String dataExchangeChannelId 数据交换cometd通道ID
依赖:
dataExchange_js.ftl
@ -11,7 +10,6 @@ dataExchange_js.ftl
<script type="text/javascript">
(function(po)
{
po.dataExchangeChannelId = "${dataExchangeChannelId}";
po.dependentNumberInputPlaceholder = "";
po.addSubDataExchangesForFileInfos = function(fileInfos)
@ -231,23 +229,24 @@ dataExchange_js.ftl
po.element("#${pageId}-form").submit(function()
{
if(po.dataExchangeTaskClient.isActive())
return;
po.dataExchangeTaskClient.start();
po.resetAllSubDataExchangeStatus();
po.cometdExecuteAfterSubscribe(po.dataExchangeChannelId,
function()
po.element("#${pageId}-form").ajaxSubmit(
{
po.element("#${pageId}-form").ajaxSubmit(
success: function()
{
success: function()
{
if(!po.isDataExchangePageStatus("finish"))
po.updateDataExchangePageStatus("exchange");
}
});
},
function(message)
{
po.handleDataExchangeCometdMessage(message);
if(!po.isDataExchangePageStatus("finish"))
po.updateDataExchangePageStatus("exchange");
},
error: function()
{
po.dataExchangeTaskClient.stop();
}
});
return false;

View File

@ -1,62 +0,0 @@
<#--
Cometd JS片段。
依赖:
page_js_obj.jsp
-->
<script type="text/javascript">
(function(po)
{
if(!window.cometdSubscribedMap)
window.cometdSubscribedMap = {};
po.cometdInitIfNot = function()
{
var cometd = $.cometd;
if(!window.cometdIsInit)
{
window.cometdIsInit = true;
cometd.configure(
{
//logLevel : "debug",
url : "${contextPath}/cometd"
});
cometd.handshake(function(handshakeReply)
{
window.cometdSubscribedMap = {};
window.cometdConnected = false;
if(handshakeReply.successful)
window.cometdConnected = true;
});
}
};
po.cometdExecuteAfterSubscribe = function(channelId, executeCallback, messageHandler)
{
if(!window.cometdConnected)
return false;
if(window.cometdSubscribedMap[channelId] != null)
executeCallback();
else
{
var cometd = $.cometd;
cometd.subscribe(channelId, messageHandler,
function(subscribeReply)
{
if(subscribeReply.successful)
{
window.cometdSubscribedMap[channelId] = "1";
executeCallback();
}
});
}
}
})
(${pageId});
</script>

View File

@ -170,7 +170,6 @@ Schema schema 数据库不允许为null
<#include "../include/page_js_obj.ftl">
<#include "../include/page_obj_tabs.ftl" >
<#include "../include/page_obj_cometd.ftl">
<#include "../include/page_obj_format_time.ftl" >
<#include "../include/page_obj_data_permission.ftl">
<#include "../include/page_obj_data_permission_ds_table.ftl">
@ -180,7 +179,6 @@ Schema schema 数据库不允许为null
{
po.schemaId = "${schema.id}";
po.sqlpadId = "${sqlpadId}";
po.sqlpadChannelId = "${sqlpadChannelId}";
po.sqlResultReadActualBinaryRows = parseInt("${sqlResultRowMapper.readActualBinaryRows}");
po.sqlResultBinaryPlaceholder = "${sqlResultRowMapper.binaryPlaceholder?js_string}";
@ -197,7 +195,7 @@ Schema schema 数据库不允许为null
return po.handleMessage(message);
},
{
data: { sqlpadChannelId: po.sqlpadChannelId }
data: { sqlpadId: po.sqlpadId }
}
);
@ -309,6 +307,7 @@ Schema schema 数据库不允许为null
},
error : function()
{
po.sqlpadTaskClient.stop();
po.updateExecuteSqlButtonState(po.element("#executeSqlButton"), "init");
}
});
@ -543,6 +542,12 @@ Schema schema 数据库不允许为null
po.resultMessageElement.scrollTop(po.resultMessageElement.prop("scrollHeight"));
}
//如果在暂停,则应挂起(比如暂停时执行命令);否则,应唤醒(比如暂停超时)
if(po.isExecutionStatePaused())
po.sqlpadTaskClient.suspend();
else
po.sqlpadTaskClient.resume();
return isFinish;
};
@ -890,6 +895,13 @@ Schema schema 数据库不允许为null
return resultsetFetchSize;
};
po.isExecutionStatePaused = function()
{
var executionState = po.element("#executeSqlButton").attr("execution-state");
return (executionState == "paused");
};
po.element("#executeSqlButton").click(function()
{
var $this = $(this);
@ -902,6 +914,7 @@ Schema schema 数据库不允许为null
}
else if(executionState == "paused")
{
po.sqlpadTaskClient.resume();
po.sendSqlCommand("RESUME", $this);
}
else
@ -923,6 +936,9 @@ Schema schema 数据库不允许为null
if(!sql)
return;
if(po.sqlpadTaskClient.isActive())
return;
var settingForm = po.element("#settingForm");
var commitMode = po.element("input[name='sqlCommitMode']:checked", settingForm).val();
@ -941,21 +957,42 @@ Schema schema 数据库不允许为null
po.element("#stopSqlButton").click(function()
{
po.sendSqlCommand("STOP", $(this));
if(po.sqlpadTaskClient.isActive())
{
//如果挂起,则应唤醒接收命令响应
if(po.sqlpadTaskClient.isSuspend())
po.sqlpadTaskClient.resume();
po.sendSqlCommand("STOP", $(this));
}
po.sqlEditor.focus();
});
po.element("#commitSqlButton").click(function()
{
po.sendSqlCommand("COMMIT", $(this));
if(po.sqlpadTaskClient.isActive())
{
//如果挂起,则应唤醒接收命令响应
if(po.sqlpadTaskClient.isSuspend())
po.sqlpadTaskClient.resume();
po.sendSqlCommand("COMMIT", $(this));
}
po.sqlEditor.focus();
});
po.element("#rollbackSqlButton").click(function()
{
po.sendSqlCommand("ROLLBACK", $(this));
if(po.sqlpadTaskClient.isActive())
{
//如果挂起,则应唤醒接收命令响应
if(po.sqlpadTaskClient.isSuspend())
po.sqlpadTaskClient.resume();
po.sendSqlCommand("ROLLBACK", $(this));
}
po.sqlEditor.focus();
});

View File

@ -19,7 +19,6 @@
<commons-csv.version>1.4</commons-csv.version>
<poi.version>3.17</poi.version>
<poi-ooxml.version>3.17</poi-ooxml.version>
<cometd.version>4.0.9</cometd.version>
<httpclient.version>5.0.1</httpclient.version>
<mybatis.version>3.3.1</mybatis.version>
<mybatis-spring.version>1.3.1</mybatis-spring.version>