diff --git a/datagear-web/src/main/java/org/datagear/web/config/CoreConfig.java b/datagear-web/src/main/java/org/datagear/web/config/CoreConfig.java index e543cacf..e54c94ea 100644 --- a/datagear-web/src/main/java/org/datagear/web/config/CoreConfig.java +++ b/datagear-web/src/main/java/org/datagear/web/config/CoreConfig.java @@ -88,7 +88,6 @@ import org.datagear.web.sqlpad.SqlpadExecutionService; import org.datagear.web.util.ChangelogResolver; import org.datagear.web.util.DirectoryFactory; import org.datagear.web.util.DirectoryHtmlChartPluginManagerInitializer; -import org.datagear.web.util.MessageChannel; import org.datagear.web.util.SqlDriverChecker; import org.datagear.web.util.TableCache; import org.datagear.web.util.XmlDriverEntityManagerInitializer; @@ -618,12 +617,6 @@ public class CoreConfig implements InitializingBean return new ChangelogResolver(); } - @Bean - public MessageChannel messageChannel() - { - return new MessageChannel(); - } - @Bean public SqlSelectManager sqlSelectManager() { @@ -635,7 +628,7 @@ public class CoreConfig implements InitializingBean public SqlpadExecutionService sqlpadExecutionService() { SqlpadExecutionService bean = new SqlpadExecutionService(this.connectionSource(), this.messageSource(), - this.messageChannel(), this.sqlHistoryService(), this.sqlSelectManager()); + this.sqlHistoryService(), this.sqlSelectManager()); return bean; } diff --git a/datagear-web/src/main/java/org/datagear/web/controller/DataExchangeController.java b/datagear-web/src/main/java/org/datagear/web/controller/DataExchangeController.java index 7cc47000..1ad52bd4 100644 --- a/datagear-web/src/main/java/org/datagear/web/controller/DataExchangeController.java +++ b/datagear-web/src/main/java/org/datagear/web/controller/DataExchangeController.java @@ -116,15 +116,14 @@ public class DataExchangeController extends AbstractSchemaConnController @Autowired private DataExchangeService dataExchangeService; - @Autowired - private MessageChannel messageChannel; - @Autowired private DBMetaResolver dbMetaResolver; @Autowired private File tempDirectory; + private MessageChannel messageChannel = new MessageChannel(); + public DataExchangeController() { super(); @@ -145,7 +144,7 @@ public class DataExchangeController extends AbstractSchemaConnController return messageChannel; } - public void setMessageChannel(MessageChannel messageChannel) + protected void setMessageChannel(MessageChannel messageChannel) { this.messageChannel = messageChannel; } @@ -211,7 +210,6 @@ public class DataExchangeController extends AbstractSchemaConnController springModel.addAttribute("defaultDataFormat", defaultDataFormat); springModel.addAttribute("dataExchangeId", dataExchangeId); - springModel.addAttribute("dataExchangeChannelId", getDataExchangeChannelId(dataExchangeId)); springModel.addAttribute("availableCharsetNames", getAvailableCharsetNames()); springModel.addAttribute("defaultCharsetName", Charset.defaultCharset().name()); @@ -267,8 +265,6 @@ public class DataExchangeController extends AbstractSchemaConnController ConnectionFactory connectionFactory = new DataSourceConnectionFactory(new SchemaDataSource(schema)); - String importChannelId = getDataExchangeChannelId(dataExchangeId); - Locale locale = getLocale(request); SubDataExchange[] subDataExchanges = new SubDataExchange[subDataExchangeIds.length]; @@ -283,8 +279,8 @@ public class DataExchangeController extends AbstractSchemaConnController dataImportForm.getImportOption(), tableNames[i], readerFactory); MessageSubTextValueDataImportListener listener = new MessageSubTextValueDataImportListener( - this.messageChannel, importChannelId, getMessageSource(), locale, - subDataExchangeIds[i], csvDataImport.getImportOption().getExceptionResolve()); + this.messageChannel, dataExchangeId, getMessageSource(), locale, subDataExchangeIds[i], + csvDataImport.getImportOption().getExceptionResolve()); listener.setLogFile(getTempSubDataExchangeLogFile(logDirectory, subDataExchangeIds[i])); listener.setSendExchangingMessageInterval( evalSendDataExchangingMessageInterval(subDataExchangeIds.length, csvDataImport)); @@ -312,7 +308,7 @@ public class DataExchangeController extends AbstractSchemaConnController Collections.addAll(subDataExchangeSet, subDataExchanges); BatchDataExchange batchDataExchange = buildBatchDataExchange(connectionFactory, subDataExchangeSet, - importChannelId, locale); + dataExchangeId, locale); this.dataExchangeService.exchange(batchDataExchange); @@ -344,7 +340,6 @@ public class DataExchangeController extends AbstractSchemaConnController springModel.addAttribute("defaultDataFormat", defaultDataFormat); springModel.addAttribute("dataExchangeId", dataExchangeId); - springModel.addAttribute("dataExchangeChannelId", getDataExchangeChannelId(dataExchangeId)); springModel.addAttribute("availableCharsetNames", getAvailableCharsetNames()); springModel.addAttribute("defaultCharsetName", Charset.defaultCharset().name()); @@ -395,8 +390,6 @@ public class DataExchangeController extends AbstractSchemaConnController ConnectionFactory connectionFactory = new DataSourceConnectionFactory(new SchemaDataSource(schema)); - String importChannelId = getDataExchangeChannelId(dataExchangeId); - Locale locale = getLocale(request); SubDataExchange[] subDataExchanges = new SubDataExchange[subDataExchangeIds.length]; @@ -410,8 +403,8 @@ public class DataExchangeController extends AbstractSchemaConnController SqlDataImport sqlDataImport = new SqlDataImport(connectionFactory, dataImportForm.getImportOption(), readerFactory); - MessageSubDataImportListener listener = new MessageSubDataImportListener(this.messageChannel, importChannelId, - getMessageSource(), locale, subDataExchangeIds[i], + MessageSubDataImportListener listener = new MessageSubDataImportListener(this.messageChannel, + dataExchangeId, getMessageSource(), locale, subDataExchangeIds[i], dataImportForm.getImportOption().getExceptionResolve()); listener.setLogFile(getTempSubDataExchangeLogFile(logDirectory, subDataExchangeIds[i])); listener.setSendExchangingMessageInterval( @@ -428,7 +421,7 @@ public class DataExchangeController extends AbstractSchemaConnController Collections.addAll(subDataExchangeSet, subDataExchanges); BatchDataExchange batchDataExchange = buildBatchDataExchange(connectionFactory, subDataExchangeSet, - importChannelId, locale); + dataExchangeId, locale); this.dataExchangeService.exchange(batchDataExchange); @@ -461,7 +454,6 @@ public class DataExchangeController extends AbstractSchemaConnController springModel.addAttribute("defaultDataFormat", defaultDataFormat); springModel.addAttribute("dataExchangeId", dataExchangeId); - springModel.addAttribute("dataExchangeChannelId", getDataExchangeChannelId(dataExchangeId)); springModel.addAttribute("availableCharsetNames", getAvailableCharsetNames()); springModel.addAttribute("defaultCharsetName", Charset.defaultCharset().name()); @@ -526,8 +518,6 @@ public class DataExchangeController extends AbstractSchemaConnController ConnectionFactory connectionFactory = new DataSourceConnectionFactory(new SchemaDataSource(schema)); - String importChannelId = getDataExchangeChannelId(dataExchangeId); - Locale locale = getLocale(request); SubDataExchange[] subDataExchanges = new SubDataExchange[subDataExchangeIds.length]; @@ -542,8 +532,8 @@ public class DataExchangeController extends AbstractSchemaConnController importForm.getImportOption(), (tableNames == null ? null : tableNames[i]), readerFactory); MessageSubTextValueDataImportListener listener = new MessageSubTextValueDataImportListener( - this.messageChannel, importChannelId, getMessageSource(), locale, - subDataExchangeIds[i], jsonDataImport.getImportOption().getExceptionResolve()); + this.messageChannel, dataExchangeId, getMessageSource(), locale, subDataExchangeIds[i], + jsonDataImport.getImportOption().getExceptionResolve()); listener.setLogFile(getTempSubDataExchangeLogFile(logDirectory, subDataExchangeIds[i])); listener.setSendExchangingMessageInterval( evalSendDataExchangingMessageInterval(subDataExchangeIds.length, jsonDataImport)); @@ -574,7 +564,7 @@ public class DataExchangeController extends AbstractSchemaConnController Collections.addAll(subDataExchangeSet, subDataExchanges); BatchDataExchange batchDataExchange = buildBatchDataExchange(connectionFactory, subDataExchangeSet, - importChannelId, locale); + dataExchangeId, locale); this.dataExchangeService.exchange(batchDataExchange); @@ -606,7 +596,6 @@ public class DataExchangeController extends AbstractSchemaConnController springModel.addAttribute("defaultDataFormat", defaultDataFormat); springModel.addAttribute("dataExchangeId", dataExchangeId); - springModel.addAttribute("dataExchangeChannelId", getDataExchangeChannelId(dataExchangeId)); return "/dataexchange/import_excel"; } @@ -659,8 +648,6 @@ public class DataExchangeController extends AbstractSchemaConnController ConnectionFactory connectionFactory = new DataSourceConnectionFactory(new SchemaDataSource(schema)); - String importChannelId = getDataExchangeChannelId(dataExchangeId); - Locale locale = getLocale(request); SubDataExchange[] subDataExchanges = new SubDataExchange[subDataExchangeIds.length]; @@ -674,8 +661,8 @@ public class DataExchangeController extends AbstractSchemaConnController excelDataImport.setUnifiedTable(tableNames[i]); MessageSubTextValueDataImportListener listener = new MessageSubTextValueDataImportListener( - this.messageChannel, importChannelId, getMessageSource(), locale, - subDataExchangeIds[i], excelDataImport.getImportOption().getExceptionResolve()); + this.messageChannel, dataExchangeId, getMessageSource(), locale, subDataExchangeIds[i], + excelDataImport.getImportOption().getExceptionResolve()); listener.setLogFile(getTempSubDataExchangeLogFile(logDirectory, subDataExchangeIds[i])); listener.setSendExchangingMessageInterval( evalSendDataExchangingMessageInterval(subDataExchangeIds.length, excelDataImport)); @@ -703,7 +690,7 @@ public class DataExchangeController extends AbstractSchemaConnController Collections.addAll(subDataExchangeSet, subDataExchanges); BatchDataExchange batchDataExchange = buildBatchDataExchange(connectionFactory, subDataExchangeSet, - importChannelId, locale); + dataExchangeId, locale); this.dataExchangeService.exchange(batchDataExchange); @@ -874,7 +861,6 @@ public class DataExchangeController extends AbstractSchemaConnController springModel.addAttribute("defaultDataFormat", defaultDataFormat); springModel.addAttribute("dataExchangeId", dataExchangeId); - springModel.addAttribute("dataExchangeChannelId", getDataExchangeChannelId(dataExchangeId)); springModel.addAttribute("availableCharsetNames", getAvailableCharsetNames()); springModel.addAttribute("defaultCharsetName", Charset.defaultCharset().name()); setParamInitSqlsAttribute(request, springModel); @@ -915,8 +901,6 @@ public class DataExchangeController extends AbstractSchemaConnController ConnectionFactory connectionFactory = new DataSourceConnectionFactory(new SchemaDataSource(schema)); - String exportChannelId = getDataExchangeChannelId(dataExchangeId); - Locale locale = getLocale(request); Set subDataExchanges = new HashSet<>(); @@ -932,9 +916,8 @@ public class DataExchangeController extends AbstractSchemaConnController CsvDataExport csvDataExport = new CsvDataExport(connectionFactory, exportForm.getDataFormat(), exportForm.getExportOption(), query, writerFactory); - MessageSubTextDataExportListener listener = new MessageSubTextDataExportListener( - this.messageChannel, exportChannelId, getMessageSource(), getLocale(request), - subDataExchangeIds[i]); + MessageSubTextDataExportListener listener = new MessageSubTextDataExportListener(this.messageChannel, + dataExchangeId, getMessageSource(), getLocale(request), subDataExchangeIds[i]); listener.setLogFile(getTempSubDataExchangeLogFile(logDirectory, subDataExchangeIds[i])); listener.setSendExchangingMessageInterval( evalSendDataExchangingMessageInterval(subDataExchangeIds.length, csvDataExport)); @@ -945,7 +928,7 @@ public class DataExchangeController extends AbstractSchemaConnController } BatchDataExchange batchDataExchange = buildBatchDataExchange(connectionFactory, subDataExchanges, - exportChannelId, locale); + dataExchangeId, locale); this.dataExchangeService.exchange(batchDataExchange); @@ -982,7 +965,6 @@ public class DataExchangeController extends AbstractSchemaConnController springModel.addAttribute("defaultDataFormat", defaultDataFormat); springModel.addAttribute("dataExchangeId", dataExchangeId); - springModel.addAttribute("dataExchangeChannelId", getDataExchangeChannelId(dataExchangeId)); springModel.addAttribute("availableCharsetNames", getAvailableCharsetNames()); springModel.addAttribute("defaultCharsetName", Charset.defaultCharset().name()); setParamInitSqlsAttribute(request, springModel); @@ -1022,8 +1004,6 @@ public class DataExchangeController extends AbstractSchemaConnController ConnectionFactory connectionFactory = new DataSourceConnectionFactory(new SchemaDataSource(schema)); - String exportChannelId = getDataExchangeChannelId(dataExchangeId); - Locale locale = getLocale(request); Set subDataExchanges = new HashSet<>(); @@ -1038,9 +1018,8 @@ public class DataExchangeController extends AbstractSchemaConnController ExcelDataExport excelDataExport = new ExcelDataExport(connectionFactory, exportForm.getDataFormat(), exportForm.getExportOption(), query, writerFactory); - MessageSubTextDataExportListener listener = new MessageSubTextDataExportListener( - this.messageChannel, exportChannelId, getMessageSource(), getLocale(request), - subDataExchangeIds[i]); + MessageSubTextDataExportListener listener = new MessageSubTextDataExportListener(this.messageChannel, + dataExchangeId, getMessageSource(), getLocale(request), subDataExchangeIds[i]); listener.setLogFile(getTempSubDataExchangeLogFile(logDirectory, subDataExchangeIds[i])); listener.setSendExchangingMessageInterval( evalSendDataExchangingMessageInterval(subDataExchangeIds.length, excelDataExport)); @@ -1051,7 +1030,7 @@ public class DataExchangeController extends AbstractSchemaConnController } BatchDataExchange batchDataExchange = buildBatchDataExchange(connectionFactory, subDataExchanges, - exportChannelId, locale); + dataExchangeId, locale); this.dataExchangeService.exchange(batchDataExchange); @@ -1093,7 +1072,6 @@ public class DataExchangeController extends AbstractSchemaConnController springModel.addAttribute("defaultDataFormat", defaultDataFormat); springModel.addAttribute("dataExchangeId", dataExchangeId); - springModel.addAttribute("dataExchangeChannelId", getDataExchangeChannelId(dataExchangeId)); springModel.addAttribute("availableCharsetNames", getAvailableCharsetNames()); springModel.addAttribute("defaultCharsetName", Charset.defaultCharset().name()); setParamInitSqlsAttribute(request, springModel); @@ -1137,8 +1115,6 @@ public class DataExchangeController extends AbstractSchemaConnController ConnectionFactory connectionFactory = new DataSourceConnectionFactory(new SchemaDataSource(schema)); - String exportChannelId = getDataExchangeChannelId(dataExchangeId); - Locale locale = getLocale(request); Set subDataExchanges = new HashSet<>(); @@ -1154,9 +1130,8 @@ public class DataExchangeController extends AbstractSchemaConnController SqlDataExport sqlDataExport = new SqlDataExport(connectionFactory, exportForm.getDataFormat(), exportForm.getExportOption(), query, tableNames[i], writerFactory); - MessageSubTextDataExportListener listener = new MessageSubTextDataExportListener( - this.messageChannel, exportChannelId, getMessageSource(), getLocale(request), - subDataExchangeIds[i]); + MessageSubTextDataExportListener listener = new MessageSubTextDataExportListener(this.messageChannel, + dataExchangeId, getMessageSource(), getLocale(request), subDataExchangeIds[i]); listener.setLogFile(getTempSubDataExchangeLogFile(logDirectory, subDataExchangeIds[i])); listener.setSendExchangingMessageInterval( evalSendDataExchangingMessageInterval(subDataExchangeIds.length, sqlDataExport)); @@ -1167,7 +1142,7 @@ public class DataExchangeController extends AbstractSchemaConnController } BatchDataExchange batchDataExchange = buildBatchDataExchange(connectionFactory, subDataExchanges, - exportChannelId, locale); + dataExchangeId, locale); this.dataExchangeService.exchange(batchDataExchange); @@ -1205,7 +1180,6 @@ public class DataExchangeController extends AbstractSchemaConnController springModel.addAttribute("defaultDataFormat", defaultDataFormat); springModel.addAttribute("dataExchangeId", dataExchangeId); - springModel.addAttribute("dataExchangeChannelId", getDataExchangeChannelId(dataExchangeId)); springModel.addAttribute("availableCharsetNames", getAvailableCharsetNames()); springModel.addAttribute("defaultCharsetName", Charset.defaultCharset().name()); setParamInitSqlsAttribute(request, springModel); @@ -1258,8 +1232,6 @@ public class DataExchangeController extends AbstractSchemaConnController ConnectionFactory connectionFactory = new DataSourceConnectionFactory(new SchemaDataSource(schema)); - String exportChannelId = getDataExchangeChannelId(dataExchangeId); - Locale locale = getLocale(request); Set subDataExchanges = new HashSet<>(); @@ -1275,9 +1247,8 @@ public class DataExchangeController extends AbstractSchemaConnController JsonDataExport csvDataExport = new JsonDataExport(connectionFactory, exportForm.getDataFormat(), exportOption, query, writerFactory, (tableNames == null ? null : tableNames[i])); - MessageSubTextDataExportListener listener = new MessageSubTextDataExportListener( - this.messageChannel, exportChannelId, getMessageSource(), getLocale(request), - subDataExchangeIds[i]); + MessageSubTextDataExportListener listener = new MessageSubTextDataExportListener(this.messageChannel, + dataExchangeId, getMessageSource(), getLocale(request), subDataExchangeIds[i]); listener.setLogFile(getTempSubDataExchangeLogFile(logDirectory, subDataExchangeIds[i])); listener.setSendExchangingMessageInterval( evalSendDataExchangingMessageInterval(subDataExchangeIds.length, csvDataExport)); @@ -1288,7 +1259,7 @@ public class DataExchangeController extends AbstractSchemaConnController } BatchDataExchange batchDataExchange = buildBatchDataExchange(connectionFactory, subDataExchanges, - exportChannelId, locale); + dataExchangeId, locale); this.dataExchangeService.exchange(batchDataExchange); @@ -1353,6 +1324,21 @@ public class DataExchangeController extends AbstractSchemaConnController } } + @RequestMapping(value = "/{schemaId}/message", produces = CONTENT_TYPE_JSON) + @ResponseBody + public List message(HttpServletRequest request, HttpServletResponse response, + org.springframework.ui.Model springModel, @PathVariable("schemaId") String schemaId, + @RequestParam("dataExchangeId") String dataExchangeId, + @RequestParam(value = "messageCount", required = false) Integer messageCount) throws Throwable + { + if (messageCount == null) + messageCount = 50; + if (messageCount < 1) + messageCount = 1; + + return this.messageChannel.pull(dataExchangeId, messageCount); + } + protected String[] setParamInitSqlsAttribute(HttpServletRequest request, org.springframework.ui.Model springModel) { String[] initSqls = request.getParameterValues("initSqls"); @@ -1530,8 +1516,8 @@ public class DataExchangeController extends AbstractSchemaConnController { BatchDataExchange batchDataExchange = new SimpleBatchDataExchange(connectionFactory, subDataExchanges); - MessageBatchDataExchangeListener listener = new MessageBatchDataExchangeListener(this.messageChannel, - channel, getMessageSource(), locale); + MessageBatchDataExchangeListener listener = new MessageBatchDataExchangeListener(this.messageChannel, channel, + getMessageSource(), locale); batchDataExchange.setListener(listener); return batchDataExchange; @@ -1637,7 +1623,7 @@ public class DataExchangeController extends AbstractSchemaConnController /** * 计算导入/导出中消息发送间隔。 *

- * 如果发送频率过快,当数据交换很多时会出现cometd卡死的情况。 + * 如果发送频率过快,当数据交换很多时会出现卡死的情况。 *

* * @param total @@ -1782,17 +1768,6 @@ public class DataExchangeController extends AbstractSchemaConnController return FileUtil.getDirectory(this.tempDirectory, "dataExchange", true); } - /** - * 获取指定数据交换操作ID对应的cometd通道ID。 - * - * @param dataExchangeId - * @return - */ - protected String getDataExchangeChannelId(String dataExchangeId) - { - return "/dataexchange/channel/" + dataExchangeId; - } - public static class DataImportFileInfo extends FileInfo { private static final long serialVersionUID = 1L; diff --git a/datagear-web/src/main/java/org/datagear/web/controller/SqlpadController.java b/datagear-web/src/main/java/org/datagear/web/controller/SqlpadController.java index 093b88d1..59a1e630 100644 --- a/datagear-web/src/main/java/org/datagear/web/controller/SqlpadController.java +++ b/datagear-web/src/main/java/org/datagear/web/controller/SqlpadController.java @@ -37,7 +37,6 @@ import org.datagear.web.sqlpad.SqlpadExecutionService.CommitMode; import org.datagear.web.sqlpad.SqlpadExecutionService.ExceptionHandleMode; import org.datagear.web.sqlpad.SqlpadExecutionService.SqlCommand; import org.datagear.web.sqlpad.SqlpadExecutionSubmit; -import org.datagear.web.util.MessageChannel; import org.datagear.web.util.OperationMessage; import org.datagear.web.util.WebUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -65,9 +64,6 @@ public class SqlpadController extends AbstractSchemaConnController @Autowired private SqlSelectManager sqlSelectManager; - @Autowired - private MessageChannel messageChannel; - @Autowired private SqlpadExecutionService sqlpadExecutionService; @@ -94,16 +90,6 @@ public class SqlpadController extends AbstractSchemaConnController this.sqlSelectManager = sqlSelectManager; } - public MessageChannel getMessageChannel() - { - return messageChannel; - } - - public void setMessageChannel(MessageChannel messageChannel) - { - this.messageChannel = messageChannel; - } - public SqlpadExecutionService getSqlpadExecutionService() { return sqlpadExecutionService; @@ -165,10 +151,8 @@ public class SqlpadController extends AbstractSchemaConnController initSql = ""; String sqlpadId = generateSqlpadId(request, response); - String sqlpadChannelId = this.sqlpadExecutionService.getSqlpadChannelId(sqlpadId); springModel.addAttribute("sqlpadId", sqlpadId); - springModel.addAttribute("sqlpadChannelId", sqlpadChannelId); springModel.addAttribute("sqlResultRowMapper", buildDefaultLOBRowMapper()); springModel.addAttribute("initSql", initSql); @@ -208,13 +192,6 @@ public class SqlpadController extends AbstractSchemaConnController if (exceptionHandleMode == null) exceptionHandleMode = ExceptionHandleMode.ABORT; - if (overTimeThreashold == null) - overTimeThreashold = 10; - else if (overTimeThreashold < 1) - overTimeThreashold = 1; - else if (overTimeThreashold > 60) - overTimeThreashold = 60; - if (resultsetFetchSize == null) resultsetFetchSize = DEFAULT_SQL_RESULTSET_FETCH_SIZE; @@ -245,9 +222,15 @@ public class SqlpadController extends AbstractSchemaConnController @ResponseBody public List message(HttpServletRequest request, HttpServletResponse response, org.springframework.ui.Model springModel, @PathVariable("schemaId") String schemaId, - @RequestParam("sqlpadChannelId") String sqlpadChannelId) throws Throwable + @RequestParam("sqlpadId") String sqlpadId, + @RequestParam(value = "messageCount", required = false) Integer messageCount) throws Throwable { - return this.messageChannel.pull(sqlpadChannelId, 10); + if (messageCount == null) + messageCount = 50; + if (messageCount < 1) + messageCount = 1; + + return this.sqlpadExecutionService.message(sqlpadId, messageCount); } @RequestMapping(value = "/{schemaId}/select", produces = CONTENT_TYPE_JSON) diff --git a/datagear-web/src/main/java/org/datagear/web/sqlpad/SqlpadExecutionService.java b/datagear-web/src/main/java/org/datagear/web/sqlpad/SqlpadExecutionService.java index f2a3d619..c70e9f3a 100644 --- a/datagear-web/src/main/java/org/datagear/web/sqlpad/SqlpadExecutionService.java +++ b/datagear-web/src/main/java/org/datagear/web/sqlpad/SqlpadExecutionService.java @@ -47,8 +47,6 @@ public class SqlpadExecutionService extends PersistenceSupport private MessageSource messageSource; - private MessageChannel messageChannel; - private SqlHistoryService sqlHistoryService; private SqlSelectManager sqlSelectManager; @@ -57,6 +55,9 @@ public class SqlpadExecutionService extends PersistenceSupport private SchemaConnectionSupport schemaConnectionSupport = new SchemaConnectionSupport(); + private MessageChannel _messageChannel = new MessageChannel( + SqlpadExecutionSubmit.MAX_PAUSE_OVER_TIME_THREASHOLD_MINUTES * 60); + private ExecutorService _executorService = Executors.newCachedThreadPool(); private ConcurrentMap _sqlpadExecutionRunnableMap = new ConcurrentHashMap<>(); @@ -67,13 +68,11 @@ public class SqlpadExecutionService extends PersistenceSupport } public SqlpadExecutionService(ConnectionSource connectionSource, MessageSource messageSource, - MessageChannel messageChannel, SqlHistoryService sqlHistoryService, - SqlSelectManager sqlSelectManager) + SqlHistoryService sqlHistoryService, SqlSelectManager sqlSelectManager) { super(); this.connectionSource = connectionSource; this.messageSource = messageSource; - this.messageChannel = messageChannel; this.sqlHistoryService = sqlHistoryService; this.sqlSelectManager = sqlSelectManager; } @@ -98,14 +97,9 @@ public class SqlpadExecutionService extends PersistenceSupport this.messageSource = messageSource; } - public MessageChannel getMessageChannel() + protected MessageChannel getMessageChannel() { - return messageChannel; - } - - public void setMessageChannel(MessageChannel messageChannel) - { - this.messageChannel = messageChannel; + return this._messageChannel; } public SqlHistoryService getSqlHistoryService() @@ -156,9 +150,7 @@ public class SqlpadExecutionService extends PersistenceSupport */ public boolean submit(SqlpadExecutionSubmit submit) { - String sqlpadChannelId = getSqlpadChannelId(submit.getSqlpadId()); - - SqlpadExecutionRunnable sqlpadExecutionRunnable = new SqlpadExecutionRunnable(submit, sqlpadChannelId); + SqlpadExecutionRunnable sqlpadExecutionRunnable = new SqlpadExecutionRunnable(submit); SqlpadExecutionRunnable old = this._sqlpadExecutionRunnableMap.putIfAbsent(submit.getSqlpadId(), sqlpadExecutionRunnable); @@ -190,6 +182,19 @@ public class SqlpadExecutionService extends PersistenceSupport return true; } + /** + * 获取反馈消息。 + * + * @param + * @param sqlpadId + * @param count + * @return + */ + public List message(String sqlpadId, int count) + { + return this._messageChannel.pull(sqlpadId, count); + } + /** * 关闭。 */ @@ -198,17 +203,6 @@ public class SqlpadExecutionService extends PersistenceSupport this._executorService.shutdown(); } - /** - * 获取指定SQL工作台ID对应的cometd通道ID。 - * - * @param sqlpadId - * @return - */ - public String getSqlpadChannelId(String sqlpadId) - { - return "/sqlpad/channel/" + sqlpadId; - } - /** * 获取指定{@linkplain Schema}的{@linkplain Connection}。 * @@ -229,7 +223,7 @@ public class SqlpadExecutionService extends PersistenceSupport */ protected void sendStartMessage(String channel, int sqlCount) { - this.messageChannel.push(channel, new StartMessageData(sqlCount)); + this._messageChannel.push(channel, new StartMessageData(sqlCount)); } /** @@ -241,7 +235,7 @@ public class SqlpadExecutionService extends PersistenceSupport */ protected void sendSqlSuccessMessage(String channel, SqlStatement sqlStatement, int sqlStatementIndex) { - this.messageChannel.push(channel, + this._messageChannel.push(channel, new SqlSuccessMessageData(sqlStatement, sqlStatementIndex, SqlResultType.NONE)); } @@ -260,7 +254,7 @@ public class SqlpadExecutionService extends PersistenceSupport SqlResultType.UPDATE_COUNT); sqlSuccessMessageData.setUpdateCount(updateCount); - this.messageChannel.push(channel, sqlSuccessMessageData); + this._messageChannel.push(channel, sqlSuccessMessageData); } /** @@ -278,7 +272,7 @@ public class SqlpadExecutionService extends PersistenceSupport SqlResultType.RESULT_SET); sqlSuccessMessageData.setSqlSelectResult(sqlSelectResult); - this.messageChannel.push(channel, sqlSuccessMessageData); + this._messageChannel.push(channel, sqlSuccessMessageData); } /** @@ -295,7 +289,7 @@ public class SqlpadExecutionService extends PersistenceSupport { SQLExceptionMessageData messageData = new SQLExceptionMessageData(sqlStatement, sqlStatementIndex, content); - this.messageChannel.push(channel, messageData); + this._messageChannel.push(channel, messageData); } /** @@ -311,7 +305,7 @@ public class SqlpadExecutionService extends PersistenceSupport { SQLExceptionMessageData messageData = new SQLExceptionMessageData(sqlStatement, sqlStatementIndex, content); - this.messageChannel.push(channel, messageData); + this._messageChannel.push(channel, messageData); } /** @@ -328,7 +322,7 @@ public class SqlpadExecutionService extends PersistenceSupport if (trace) messageData.setDetailTrace(t); - this.messageChannel.push(channel, messageData); + this._messageChannel.push(channel, messageData); } /** @@ -340,7 +334,7 @@ public class SqlpadExecutionService extends PersistenceSupport protected void sendExceptionMessage(String channel, String content) { ExceptionMessageData messageData = new ExceptionMessageData(content); - this.messageChannel.push(channel, messageData); + this._messageChannel.push(channel, messageData); } /** @@ -352,7 +346,7 @@ public class SqlpadExecutionService extends PersistenceSupport */ protected void sendSqlCommandMessage(String channel, SqlCommand sqlCommand, String content) { - this.messageChannel.push(channel, new SqlCommandMessageData(sqlCommand, content)); + this._messageChannel.push(channel, new SqlCommandMessageData(sqlCommand, content)); } /** @@ -369,7 +363,7 @@ public class SqlpadExecutionService extends PersistenceSupport SqlCommandMessageData sqlCommandMessageData = new SqlCommandMessageData(sqlCommand, content); sqlCommandMessageData.setSqlExecutionStat(sqlExecutionStat); - this.messageChannel.push(channel, sqlCommandMessageData); + this._messageChannel.push(channel, sqlCommandMessageData); } /** @@ -380,7 +374,7 @@ public class SqlpadExecutionService extends PersistenceSupport */ protected void sendTextMessage(String channel, String text) { - this.messageChannel.push(channel, new TextMessageData(text)); + this._messageChannel.push(channel, new TextMessageData(text)); } /** @@ -395,7 +389,7 @@ public class SqlpadExecutionService extends PersistenceSupport TextMessageData textMessageData = new TextMessageData(text); textMessageData.setCssClass(cssClass); - this.messageChannel.push(channel, textMessageData); + this._messageChannel.push(channel, textMessageData); } /** @@ -412,7 +406,7 @@ public class SqlpadExecutionService extends PersistenceSupport textMessageData.setCssClass(cssClass); textMessageData.setSqlExecutionStat(sqlExecutionStat); - this.messageChannel.push(channel, textMessageData); + this._messageChannel.push(channel, textMessageData); } /** @@ -425,7 +419,7 @@ public class SqlpadExecutionService extends PersistenceSupport */ protected void sendFinishMessage(String channel) { - this.messageChannel.push(channel, new FinishMessageData()); + this._messageChannel.push(channel, new FinishMessageData()); } /** @@ -442,7 +436,7 @@ public class SqlpadExecutionService extends PersistenceSupport FinishMessageData finishMessageData = new FinishMessageData(); finishMessageData.setSqlExecutionStat(sqlExecutionStat); - this.messageChannel.push(channel, finishMessageData); + this._messageChannel.push(channel, finishMessageData); } /** @@ -476,8 +470,6 @@ public class SqlpadExecutionService extends PersistenceSupport */ protected class SqlpadExecutionRunnable extends SqlpadExecutionSubmit implements Runnable { - private String sqlpadChannelId; - /** 发送给此Runnable的SQL命令 */ private volatile SqlCommand sqlCommand; @@ -486,20 +478,9 @@ public class SqlpadExecutionService extends PersistenceSupport super(); } - public SqlpadExecutionRunnable(SqlpadExecutionSubmit submit, String sqlpadChannelId) + public SqlpadExecutionRunnable(SqlpadExecutionSubmit submit) { super(submit); - this.sqlpadChannelId = sqlpadChannelId; - } - - public String getSqlpadChannelId() - { - return sqlpadChannelId; - } - - public void setSqlpadChannelId(String sqlpadChannelId) - { - this.sqlpadChannelId = sqlpadChannelId; } public SqlCommand getSqlCommand() @@ -518,7 +499,7 @@ public class SqlpadExecutionService extends PersistenceSupport Connection cn = null; Statement st = null; - sendStartMessage(this.sqlpadChannelId, getSqlStatements().size()); + sendStartMessage(getSqlpadId(), getSqlStatements().size()); try { @@ -529,10 +510,10 @@ public class SqlpadExecutionService extends PersistenceSupport } catch (Throwable t) { - sendExceptionMessage(sqlpadChannelId, t, - getMessage(getLocale(), "sqlpad.executionConnectionException"), false); + sendExceptionMessage(getSqlpadId(), t, getMessage(getLocale(), "sqlpad.executionConnectionException"), + false); - sendFinishMessage(this.sqlpadChannelId); + sendFinishMessage(getSqlpadId()); _sqlpadExecutionRunnableMap.remove(getSqlpadId()); @@ -558,7 +539,7 @@ public class SqlpadExecutionService extends PersistenceSupport if (!SqlpadExecutionService.this.sqlPermissionChecker.hasPermission(getUser(), getSchema(), sqlStatement)) { - sendSqlExceptionMessage(sqlpadChannelId, sqlStatement, i, + sendSqlExceptionMessage(getSqlpadId(), sqlStatement, i, getMessage(getLocale(), "sqlpad.executionSQLPermissionDenied")); sqlExecutionStat.increaseExceptionCount(); @@ -576,7 +557,7 @@ public class SqlpadExecutionService extends PersistenceSupport { sqlExecutionStat.increaseExceptionCount(); - sendSqlExceptionMessage(sqlpadChannelId, sqlStatement, i, e, + sendSqlExceptionMessage(getSqlpadId(), sqlStatement, i, e, getMessage(getLocale(), "sqlpad.executionSQLException", e.getMessage())); if (ExceptionHandleMode.IGNORE.equals(getExceptionHandleMode())) @@ -607,8 +588,7 @@ public class SqlpadExecutionService extends PersistenceSupport } catch (Throwable t) { - sendExceptionMessage(sqlpadChannelId, t, - getMessage(getLocale(), "sqlpad.executionErrorOccure"), true); + sendExceptionMessage(getSqlpadId(), t, getMessage(getLocale(), "sqlpad.executionErrorOccure"), true); } finally { @@ -617,7 +597,7 @@ public class SqlpadExecutionService extends PersistenceSupport sqlExecutionStat.setTaskDuration(System.currentTimeMillis() - startTime); - sendFinishMessage(this.sqlpadChannelId, sqlExecutionStat); + sendFinishMessage(getSqlpadId(), sqlExecutionStat); _sqlpadExecutionRunnableMap.remove(getSqlpadId()); } @@ -660,8 +640,7 @@ public class SqlpadExecutionService extends PersistenceSupport // 暂停超时 if (SqlCommand.PAUSE.equals(this.sqlCommand)) { - sendTextMessage(this.sqlpadChannelId, - getMessage(getLocale(), "sqlpad.pauseOverTime")); + sendTextMessage(getSqlpadId(), getMessage(getLocale(), "sqlpad.pauseOverTime")); this.sqlCommand = SqlCommand.RESUME; } @@ -732,7 +711,7 @@ public class SqlpadExecutionService extends PersistenceSupport { if (!sendWatingMessage) { - sendTextMessage(this.sqlpadChannelId, + sendTextMessage(getSqlpadId(), getMessage(getLocale(), "sqlpad.waitingForCommitOrRollback", getOverTimeThreashold()), "message-content-highlight", sqlExecutionStat); @@ -745,8 +724,7 @@ public class SqlpadExecutionService extends PersistenceSupport // 等待超时 if (!SqlCommand.COMMIT.equals(this.sqlCommand) && !SqlCommand.ROLLBACK.equals(this.sqlCommand)) { - sendTextMessage(this.sqlpadChannelId, - getMessage(getLocale(), "sqlpad.waitOverTime")); + sendTextMessage(getSqlpadId(), getMessage(getLocale(), "sqlpad.waitOverTime")); this.sqlCommand = (sqlExecutionStat.getExceptionCount() > 0 ? SqlCommand.ROLLBACK : SqlCommand.COMMIT); } @@ -800,8 +778,7 @@ public class SqlpadExecutionService extends PersistenceSupport SqlSelectResult sqlSelectResult = SqlpadExecutionService.this.sqlSelectManager.select(cn, sql, rs, 1, getResultsetFetchSize(), getResultsetRowMapper()); - sendSqlSuccessMessage(this.sqlpadChannelId, sqlStatement, - sqlStatementIndex, sqlSelectResult); + sendSqlSuccessMessage(getSqlpadId(), sqlStatement, sqlStatementIndex, sqlSelectResult); } else { @@ -810,14 +787,12 @@ public class SqlpadExecutionService extends PersistenceSupport // 更新操作 if (updateCount > -1) { - sendSqlSuccessMessage(this.sqlpadChannelId, sqlStatement, - sqlStatementIndex, updateCount); + sendSqlSuccessMessage(getSqlpadId(), sqlStatement, sqlStatementIndex, updateCount); } // 其他操作 else { - sendSqlSuccessMessage(this.sqlpadChannelId, sqlStatement, - sqlStatementIndex); + sendSqlSuccessMessage(getSqlpadId(), sqlStatement, sqlStatementIndex); } } } @@ -832,7 +807,7 @@ public class SqlpadExecutionService extends PersistenceSupport { String messageKey = "sqlpad.SqlCommand." + sqlCommand.toString() + ".ok"; - SqlpadExecutionService.this.sendSqlCommandMessage(this.sqlpadChannelId, sqlCommand, + SqlpadExecutionService.this.sendSqlCommandMessage(getSqlpadId(), sqlCommand, getMessage(getLocale(), messageKey, messageArgs)); } diff --git a/datagear-web/src/main/java/org/datagear/web/sqlpad/SqlpadExecutionSubmit.java b/datagear-web/src/main/java/org/datagear/web/sqlpad/SqlpadExecutionSubmit.java index 91c48df3..dc4d9ec5 100644 --- a/datagear-web/src/main/java/org/datagear/web/sqlpad/SqlpadExecutionSubmit.java +++ b/datagear-web/src/main/java/org/datagear/web/sqlpad/SqlpadExecutionSubmit.java @@ -26,6 +26,9 @@ import org.datagear.web.sqlpad.SqlpadExecutionService.ExceptionHandleMode; */ public class SqlpadExecutionSubmit { + /** 暂停允许最大分钟数 */ + public static final int MAX_PAUSE_OVER_TIME_THREASHOLD_MINUTES = 60; + private User user; private Schema schema; @@ -40,6 +43,7 @@ public class SqlpadExecutionSubmit private ExceptionHandleMode exceptionHandleMode; + /** 暂停超时过期分钟数 */ private int overTimeThreashold; private int resultsetFetchSize; @@ -61,7 +65,7 @@ public class SqlpadExecutionSubmit public SqlpadExecutionSubmit(User user, Schema schema, String sqlpadId, File sqlpadFileDirectory, List sqlStatements, CommitMode commitMode, ExceptionHandleMode exceptionHandleMode, - int overTimeThreashold, int resultsetFetchSize, RowMapper resultsetRowMapper, Locale locale) + Integer overTimeThreashold, int resultsetFetchSize, RowMapper resultsetRowMapper, Locale locale) { super(); this.user = user; @@ -71,7 +75,7 @@ public class SqlpadExecutionSubmit this.sqlStatements = sqlStatements; this.commitMode = commitMode; this.exceptionHandleMode = exceptionHandleMode; - this.overTimeThreashold = overTimeThreashold; + setOverTimeThreashold(overTimeThreashold); this.resultsetFetchSize = resultsetFetchSize; this.resultsetRowMapper = resultsetRowMapper; this.locale = locale; @@ -152,8 +156,15 @@ public class SqlpadExecutionSubmit return overTimeThreashold; } - public void setOverTimeThreashold(int overTimeThreashold) + public void setOverTimeThreashold(Integer overTimeThreashold) { + if (overTimeThreashold == null) + overTimeThreashold = 10; + else if (overTimeThreashold < 1) + overTimeThreashold = 1; + else if (overTimeThreashold > MAX_PAUSE_OVER_TIME_THREASHOLD_MINUTES) + overTimeThreashold = MAX_PAUSE_OVER_TIME_THREASHOLD_MINUTES; + this.overTimeThreashold = overTimeThreashold; } diff --git a/datagear-web/src/main/resources/org/datagear/web/static/script/datagear-util.js b/datagear-web/src/main/resources/org/datagear/web/static/script/datagear-util.js index 079d9bc4..6b0f8e55 100644 --- a/datagear-web/src/main/resources/org/datagear/web/static/script/datagear-util.js +++ b/datagear-web/src/main/resources/org/datagear/web/static/script/datagear-util.js @@ -3444,71 +3444,161 @@ { this.url = url; this.messageHandler = messageHandler; + this._status = ""; this.options = $.extend( { - interval: 50, - data: undefined + //轮询间隔 + interval: 100, + //挂起状态时的轮询间隔 + suspendInterval: 3000, + //当连续接收空消息这些秒数后,自动进入挂起状态,-1 表示不自动挂起 + autoSuspendExpireSeconds: 10, + //自动挂起状态时的轮询间隔 + autoSuspendInterval: 1500, + //ajax设置项 + ajaxOptions: {} }, options); }; $.TaskClient.prototype = { - //开始接收消息 + //开始轮询接收消息 start: function() { - if(this._status == "run") + if(this.isActive()) return false; - this._status = "run"; - + this._status = "active.run"; this._receiveAndHandleMessage(); + + return true; }, - isStart: function() + //挂起,进入慢轮询状态 + suspend: function() { - return (this._status == "run"); + if(!this.isActive()) + return false; + + this._status = "active.suspend"; + + return true; }, - isFinish: function() + //唤醒,从慢轮询状态恢复 + resume: function() { - return (this._status == "finish"); + if(!this.isSuspend()) + return false; + + this.stop(); + this.start(); + + return true; + }, + + //停止轮询接收消息,停止后可重新start + stop: function() + { + if(!this.isActive()) + return false; + + this._status = "stop"; + if(this._timeoutId) + { + clearTimeout(this._timeoutId); + this._timeoutId = ""; + } + + return true; + }, + + isActive: function() + { + return (this._status && this._status.indexOf("active") == 0); + }, + + isSuspend: function() + { + return (this._status == "active.suspend"); }, _receiveAndHandleMessage: function() { - if(this._status == "finish") + if(!this.isActive()) return; var taskClient = this; - $.ajax({ - type : "POST", - url : this.url, - data : this.options.data, - success : function(messages) - { - if(messages == null) - messages = []; - else if(!$.isArray(messages)) - messages = [ messages ]; - - var isFinish = false; - - for(var i=0; i -1) + { + if(messages.length > 0) + { + autoSuspend = false; + taskClient._firstEmptyTime = null; + } + else + { + if(taskClient._firstEmptyTime + && (new Date().getTime() - taskClient._firstEmptyTime) + >= taskClient.options.autoSuspendExpireSeconds*1000) + { + autoSuspend = true; + } + + if(!taskClient._prevMessagesEmpty) + taskClient._firstEmptyTime = new Date().getTime(); + } + + taskClient._prevMessagesEmpty = (messages.length == 0); + } + + if(taskClient.isActive()) + { + var interval = (taskClient.isSuspend() || autoSuspend ? + taskClient.options.suspendInterval : taskClient.options.interval); + + if(autoSuspend) + interval = taskClient.options.autoSuspendInterval; + + taskClient._timeoutId = setTimeout(function() + { + taskClient._receiveAndHandleMessage(); + }, + interval); + } + } + }); + + $.ajax(ajaxOptions); } }; }) diff --git a/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/export_csv.ftl b/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/export_csv.ftl index e04a0b48..de24e27b 100644 --- a/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/export_csv.ftl +++ b/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/export_csv.ftl @@ -100,7 +100,6 @@ Schema schema 数据库,不允许为null <#include "../include/page_js_obj.ftl"> <#include "../include/page_obj_grid.ftl"> -<#include "../include/page_obj_cometd.ftl"> <#include "../include/page_obj_format_time.ftl" > <#include "include/dataExchange_js.ftl" > <#include "include/dataExport_js.ftl" > @@ -113,7 +112,6 @@ Schema schema 数据库,不允许为null return po.toExportFileNameSuper(query, ".csv"); }; - po.cometdInitIfNot(); po.initDataExportSteps(); po.initDataExchangeUIs(); po.initDataExportUIs(); diff --git a/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/export_excel.ftl b/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/export_excel.ftl index a2fc236b..216ee2e6 100644 --- a/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/export_excel.ftl +++ b/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/export_excel.ftl @@ -102,7 +102,6 @@ Schema schema 数据库,不允许为null <#include "../include/page_js_obj.ftl"> <#include "../include/page_obj_grid.ftl"> -<#include "../include/page_obj_cometd.ftl"> <#include "../include/page_obj_format_time.ftl" > <#include "include/dataExchange_js.ftl" > <#include "include/dataExport_js.ftl" > @@ -115,7 +114,6 @@ Schema schema 数据库,不允许为null return po.toExportFileNameSuper(query, ".xlsx"); }; - po.cometdInitIfNot(); po.initDataExportSteps(); po.initDataExchangeUIs(); po.initDataExportUIs(); diff --git a/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/export_json.ftl b/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/export_json.ftl index f49e65d4..da2b1428 100644 --- a/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/export_json.ftl +++ b/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/export_json.ftl @@ -111,7 +111,6 @@ Schema schema 数据库,不允许为null <#include "../include/page_js_obj.ftl"> <#include "../include/page_obj_grid.ftl"> -<#include "../include/page_obj_cometd.ftl"> <#include "../include/page_obj_format_time.ftl" > <#include "include/dataExchange_js.ftl" > <#include "include/dataExport_js.ftl" > @@ -158,7 +157,6 @@ Schema schema 数据库,不允许为null width : "20%" }); - po.cometdInitIfNot(); po.initDataExportSteps(); po.initDataExchangeUIs(); po.initDataExportUIs(); diff --git a/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/export_sql.ftl b/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/export_sql.ftl index 050d6bb0..8eae6b25 100644 --- a/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/export_sql.ftl +++ b/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/export_sql.ftl @@ -111,7 +111,6 @@ Schema schema 数据库,不允许为null <#include "../include/page_js_obj.ftl"> <#include "../include/page_obj_grid.ftl"> -<#include "../include/page_obj_cometd.ftl"> <#include "../include/page_obj_format_time.ftl" > <#include "include/dataExchange_js.ftl" > <#include "include/dataExport_js.ftl" > @@ -148,7 +147,6 @@ Schema schema 数据库,不允许为null }); po.dataExportTableColumns[0].width = "30%"; - po.cometdInitIfNot(); po.initDataExportSteps(); po.initDataExchangeUIs(); po.initDataExportUIs(); diff --git a/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/import_csv.ftl b/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/import_csv.ftl index 1cadb262..ae593aaf 100644 --- a/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/import_csv.ftl +++ b/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/import_csv.ftl @@ -121,7 +121,6 @@ Schema schema 数据库,不允许为null <#include "../include/page_js_obj.ftl"> <#include "../include/page_obj_grid.ftl"> -<#include "../include/page_obj_cometd.ftl"> <#include "../include/page_obj_format_time.ftl" > <#include "include/dataExchange_js.ftl" > <#include "include/dataImport_js.ftl" > @@ -157,7 +156,6 @@ Schema schema 数据库,不允许为null }); po.dataImportTableColumns[1].width = "25%"; - po.cometdInitIfNot(); po.initDataImportSteps(); po.initDataExchangeUIs(); po.initDataImportUIs(); diff --git a/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/import_excel.ftl b/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/import_excel.ftl index 8e915120..9b7ee848 100644 --- a/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/import_excel.ftl +++ b/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/import_excel.ftl @@ -123,7 +123,6 @@ Schema schema 数据库,不允许为null <#include "../include/page_js_obj.ftl"> <#include "../include/page_obj_grid.ftl"> -<#include "../include/page_obj_cometd.ftl"> <#include "../include/page_obj_format_time.ftl" > <#include "include/dataExchange_js.ftl" > <#include "include/dataImport_js.ftl" > @@ -159,7 +158,6 @@ Schema schema 数据库,不允许为null }); po.dataImportTableColumns[1].width = "25%"; - po.cometdInitIfNot(); po.initDataImportSteps(); po.initDataExchangeUIs(); po.initDataImportUIs(); diff --git a/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/import_json.ftl b/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/import_json.ftl index e1bbe650..9d83158f 100644 --- a/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/import_json.ftl +++ b/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/import_json.ftl @@ -132,7 +132,6 @@ Schema schema 数据库,不允许为null <#include "../include/page_js_obj.ftl"> <#include "../include/page_obj_grid.ftl"> -<#include "../include/page_obj_cometd.ftl"> <#include "../include/page_obj_format_time.ftl" > <#include "include/dataExchange_js.ftl" > <#include "include/dataImport_js.ftl" > @@ -202,7 +201,6 @@ Schema schema 数据库,不允许为null }); po.dataImportTableColumns[2].width = "auto"; - po.cometdInitIfNot(); po.initDataImportSteps(); po.initDataExchangeUIs(); po.initDataImportUIs(); diff --git a/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/import_sql.ftl b/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/import_sql.ftl index ee1220e3..1f365c5b 100644 --- a/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/import_sql.ftl +++ b/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/import_sql.ftl @@ -97,7 +97,6 @@ Schema schema 数据库,不允许为null <#include "../include/page_js_obj.ftl"> <#include "../include/page_obj_grid.ftl"> -<#include "../include/page_obj_cometd.ftl"> <#include "../include/page_obj_format_time.ftl" > <#include "include/dataExchange_js.ftl" > <#include "include/dataImport_js.ftl" > @@ -106,7 +105,6 @@ Schema schema 数据库,不允许为null { po.dependentNumberInputPlaceholder = "<@spring.message code='dataImport.dependentNumber.none' />"; - po.cometdInitIfNot(); po.initDataImportSteps(); po.initDataExchangeUIs(); po.initDataImportUIs(); diff --git a/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/include/dataExchange_js.ftl b/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/include/dataExchange_js.ftl index 48191278..f0e7ef0b 100644 --- a/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/include/dataExchange_js.ftl +++ b/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/include/dataExchange_js.ftl @@ -50,6 +50,16 @@ po.subDataExchangeStatusColumnIndex 子数据交换表格中状态列索引 return po.nextSubDataExchangeIdSeq; }; + + po.dataExchangeTaskClient = new $.TaskClient("${contextPath}/dataexchange/"+po.schemaId+"/message", + function(message) + { + return po.handleDataExchangeMessage(message); + }, + { + data: { dataExchangeId: po.dataExchangeId } + } + ); po.calTableHeight = function() { @@ -228,9 +238,9 @@ po.subDataExchangeStatusColumnIndex 子数据交换表格中状态列索引 return true; }; - po.handleDataExchangeCometdMessage = function(message) + po.handleDataExchangeMessage = function(message) { - message = message.data; + var isFinish = false; var type = (message ? message.type : ""); if("Start" == type) @@ -251,15 +261,18 @@ po.subDataExchangeStatusColumnIndex 子数据交换表格中状态列索引 } else if("Finish" == type) { + isFinish = true; po.refreshSubDataExchangeStatus(); po.setDataExchangeProgress(100, message.duration); po.updateDataExchangePageStatus("finish"); } else - po.handleSubDataExchangeCometdMessage(message); + po.handleSubDataExchangeMessage(message); + + return isFinish; }; - po.handleSubDataExchangeCometdMessage = function(message) + po.handleSubDataExchangeMessage = function(message) { var subDataExchangeId = message.subDataExchangeId; diff --git a/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/include/dataExport_js.ftl b/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/include/dataExport_js.ftl index 9e99e9f6..04a4de76 100644 --- a/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/include/dataExport_js.ftl +++ b/datagear-web/src/main/resources/org/datagear/web/templates/dataexchange/include/dataExport_js.ftl @@ -2,7 +2,6 @@ 导出公用片段 String dataExchangeId 数据交换ID -String dataExchangeChannelId 数据交换cometd通道ID 依赖: dataExchange_js.ftl @@ -11,8 +10,6 @@ dataExchange_js.ftl diff --git a/datagear-web/src/main/resources/org/datagear/web/templates/sqlpad/sqlpad.ftl b/datagear-web/src/main/resources/org/datagear/web/templates/sqlpad/sqlpad.ftl index 29d847cc..904b5a24 100644 --- a/datagear-web/src/main/resources/org/datagear/web/templates/sqlpad/sqlpad.ftl +++ b/datagear-web/src/main/resources/org/datagear/web/templates/sqlpad/sqlpad.ftl @@ -170,7 +170,6 @@ Schema schema 数据库,不允许为null <#include "../include/page_js_obj.ftl"> <#include "../include/page_obj_tabs.ftl" > -<#include "../include/page_obj_cometd.ftl"> <#include "../include/page_obj_format_time.ftl" > <#include "../include/page_obj_data_permission.ftl"> <#include "../include/page_obj_data_permission_ds_table.ftl"> @@ -180,7 +179,6 @@ Schema schema 数据库,不允许为null { po.schemaId = "${schema.id}"; po.sqlpadId = "${sqlpadId}"; - po.sqlpadChannelId = "${sqlpadChannelId}"; po.sqlResultReadActualBinaryRows = parseInt("${sqlResultRowMapper.readActualBinaryRows}"); po.sqlResultBinaryPlaceholder = "${sqlResultRowMapper.binaryPlaceholder?js_string}"; @@ -197,7 +195,7 @@ Schema schema 数据库,不允许为null return po.handleMessage(message); }, { - data: { sqlpadChannelId: po.sqlpadChannelId } + data: { sqlpadId: po.sqlpadId } } ); @@ -309,6 +307,7 @@ Schema schema 数据库,不允许为null }, error : function() { + po.sqlpadTaskClient.stop(); po.updateExecuteSqlButtonState(po.element("#executeSqlButton"), "init"); } }); @@ -543,6 +542,12 @@ Schema schema 数据库,不允许为null po.resultMessageElement.scrollTop(po.resultMessageElement.prop("scrollHeight")); } + //如果在暂停,则应挂起(比如暂停时执行命令);否则,应唤醒(比如暂停超时) + if(po.isExecutionStatePaused()) + po.sqlpadTaskClient.suspend(); + else + po.sqlpadTaskClient.resume(); + return isFinish; }; @@ -890,6 +895,13 @@ Schema schema 数据库,不允许为null return resultsetFetchSize; }; + po.isExecutionStatePaused = function() + { + var executionState = po.element("#executeSqlButton").attr("execution-state"); + + return (executionState == "paused"); + }; + po.element("#executeSqlButton").click(function() { var $this = $(this); @@ -902,6 +914,7 @@ Schema schema 数据库,不允许为null } else if(executionState == "paused") { + po.sqlpadTaskClient.resume(); po.sendSqlCommand("RESUME", $this); } else @@ -923,6 +936,9 @@ Schema schema 数据库,不允许为null if(!sql) return; + if(po.sqlpadTaskClient.isActive()) + return; + var settingForm = po.element("#settingForm"); var commitMode = po.element("input[name='sqlCommitMode']:checked", settingForm).val(); @@ -941,21 +957,42 @@ Schema schema 数据库,不允许为null po.element("#stopSqlButton").click(function() { - po.sendSqlCommand("STOP", $(this)); + if(po.sqlpadTaskClient.isActive()) + { + //如果挂起,则应唤醒接收命令响应 + if(po.sqlpadTaskClient.isSuspend()) + po.sqlpadTaskClient.resume(); + + po.sendSqlCommand("STOP", $(this)); + } po.sqlEditor.focus(); }); - + po.element("#commitSqlButton").click(function() { - po.sendSqlCommand("COMMIT", $(this)); + if(po.sqlpadTaskClient.isActive()) + { + //如果挂起,则应唤醒接收命令响应 + if(po.sqlpadTaskClient.isSuspend()) + po.sqlpadTaskClient.resume(); + + po.sendSqlCommand("COMMIT", $(this)); + } po.sqlEditor.focus(); }); po.element("#rollbackSqlButton").click(function() { - po.sendSqlCommand("ROLLBACK", $(this)); + if(po.sqlpadTaskClient.isActive()) + { + //如果挂起,则应唤醒接收命令响应 + if(po.sqlpadTaskClient.isSuspend()) + po.sqlpadTaskClient.resume(); + + po.sendSqlCommand("ROLLBACK", $(this)); + } po.sqlEditor.focus(); }); diff --git a/pom.xml b/pom.xml index 6ccb12dd..fcc087c5 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,6 @@ 1.4 3.17 3.17 - 4.0.9 5.0.1 3.3.1 1.3.1