From 78d4977f9f3982679b6eec90c01232834cb5fd70 Mon Sep 17 00:00:00 2001 From: h45hc47 Date: Mon, 22 Jun 2026 16:09:53 +0300 Subject: [PATCH] feat(event): run native queue and event plugin simultaneously --- common/src/main/resources/reference.conf | 1 + .../common/logsfilter/EventPluginLoader.java | 64 +++++++++++++------ .../java/org/tron/core/config/args/Args.java | 21 +++--- .../services/event/HistoryEventService.java | 10 ++- framework/src/main/resources/config.conf | 2 +- .../core/event/EventPluginLoaderTest.java | 8 ++- 6 files changed, 73 insertions(+), 33 deletions(-) diff --git a/common/src/main/resources/reference.conf b/common/src/main/resources/reference.conf index 688e1590788..ae0dfda08a3 100644 --- a/common/src/main/resources/reference.conf +++ b/common/src/main/resources/reference.conf @@ -758,6 +758,7 @@ event.subscribe = { enable = false native = { + # start the native ZMQ queue; if `path` is also set, the event plugin runs alongside it useNativeQueue = true bindport = 5555 sendqueuelength = 1000 diff --git a/framework/src/main/java/org/tron/common/logsfilter/EventPluginLoader.java b/framework/src/main/java/org/tron/common/logsfilter/EventPluginLoader.java index 7061b2e9d57..92abc6d9b85 100644 --- a/framework/src/main/java/org/tron/common/logsfilter/EventPluginLoader.java +++ b/framework/src/main/java/org/tron/common/logsfilter/EventPluginLoader.java @@ -78,6 +78,9 @@ public class EventPluginLoader { @Getter private boolean useNativeQueue = false; + @Getter + private boolean useEventPlugin = false; + public static EventPluginLoader getInstance() { if (Objects.isNull(instance)) { synchronized (EventPluginLoader.class) { @@ -234,12 +237,25 @@ public boolean start(EventPluginConfig config) { this.triggerConfigList = config.getTriggerConfigList(); useNativeQueue = config.isUseNativeQueue(); + useEventPlugin = StringUtils.hasText(config.getPluginPath()); + + if (!useNativeQueue && !useEventPlugin) { + logger.error("no event output configured: native queue is off and plugin path is empty"); + return false; + } + + // Launch the plugin first so eventListeners are populated before any topic is + // registered. launchNativeQueue() also iterates the trigger config and (when the + // plugin is active) calls setPluginTopic(), which needs eventListeners to exist. + if (useEventPlugin && !launchEventPlugin(config)) { + return false; + } - if (config.isUseNativeQueue()) { - return launchNativeQueue(config); + if (useNativeQueue && !launchNativeQueue(config)) { + return false; } - return launchEventPlugin(config); + return true; } private void setPluginConfig() { @@ -271,7 +287,7 @@ private void setSingleTriggerConfig(TriggerConfig triggerConfig) { blockLogTriggerSolidified = false; } - if (!useNativeQueue) { + if (useEventPlugin) { setPluginTopic(Trigger.BLOCK_TRIGGER, triggerConfig.getTopic()); } @@ -291,7 +307,7 @@ private void setSingleTriggerConfig(TriggerConfig triggerConfig) { transactionLogTriggerSolidified = false; } - if (!useNativeQueue) { + if (useEventPlugin) { setPluginTopic(Trigger.TRANSACTION_TRIGGER, triggerConfig.getTopic()); } @@ -303,7 +319,7 @@ private void setSingleTriggerConfig(TriggerConfig triggerConfig) { contractEventTriggerEnable = false; } - if (!useNativeQueue) { + if (useEventPlugin) { setPluginTopic(Trigger.CONTRACTEVENT_TRIGGER, triggerConfig.getTopic()); } @@ -319,7 +335,7 @@ private void setSingleTriggerConfig(TriggerConfig triggerConfig) { contractLogTriggerRedundancy = false; } - if (!useNativeQueue) { + if (useEventPlugin) { setPluginTopic(Trigger.CONTRACTLOG_TRIGGER, triggerConfig.getTopic()); } } else if (EventPluginConfig.SOLIDITY_TRIGGER_NAME @@ -329,7 +345,7 @@ private void setSingleTriggerConfig(TriggerConfig triggerConfig) { } else { solidityTriggerEnable = false; } - if (!useNativeQueue) { + if (useEventPlugin) { setPluginTopic(Trigger.SOLIDITY_TRIGGER, triggerConfig.getTopic()); } } else if (EventPluginConfig.SOLIDITY_EVENT_NAME @@ -340,7 +356,7 @@ private void setSingleTriggerConfig(TriggerConfig triggerConfig) { solidityEventTriggerEnable = false; } - if (!useNativeQueue) { + if (useEventPlugin) { setPluginTopic(Trigger.SOLIDITY_EVENT_TRIGGER, triggerConfig.getTopic()); } } else if (EventPluginConfig.SOLIDITY_LOG_NAME @@ -354,7 +370,7 @@ private void setSingleTriggerConfig(TriggerConfig triggerConfig) { solidityLogTriggerEnable = false; solidityLogTriggerRedundancy = false; } - if (!useNativeQueue) { + if (useEventPlugin) { setPluginTopic(Trigger.SOLIDITY_LOG_TRIGGER, triggerConfig.getTopic()); } } @@ -364,7 +380,8 @@ public void postSolidityTrigger(SolidityTrigger trigger) { if (useNativeQueue) { NativeMessageQueue.getInstance() .publishTrigger(toJsonString(trigger), trigger.getTriggerName()); - } else { + } + if (useEventPlugin) { eventListeners.forEach(listener -> listener.handleSolidityTrigger(toJsonString(trigger))); } @@ -427,6 +444,9 @@ public synchronized boolean isContractLogTriggerRedundancy() { } private void setPluginTopic(int eventType, String topic) { + if (Objects.isNull(eventListeners)) { + return; + } eventListeners.forEach(listener -> listener.setTopic(eventType, topic)); } @@ -485,7 +505,8 @@ public void postBlockTrigger(BlockLogTrigger trigger) { if (useNativeQueue) { NativeMessageQueue.getInstance() .publishTrigger(toJsonString(trigger), trigger.getTriggerName()); - } else { + } + if (useEventPlugin) { eventListeners.forEach(listener -> listener.handleBlockEvent(toJsonString(trigger))); } @@ -495,7 +516,8 @@ public void postSolidityLogTrigger(ContractLogTrigger trigger) { if (useNativeQueue) { NativeMessageQueue.getInstance() .publishTrigger(toJsonString(trigger), trigger.getTriggerName()); - } else { + } + if (useEventPlugin) { eventListeners.forEach(listener -> listener.handleSolidityLogTrigger(toJsonString(trigger))); } @@ -505,7 +527,8 @@ public void postSolidityEventTrigger(ContractEventTrigger trigger) { if (useNativeQueue) { NativeMessageQueue.getInstance() .publishTrigger(toJsonString(trigger), trigger.getTriggerName()); - } else { + } + if (useEventPlugin) { eventListeners.forEach(listener -> listener.handleSolidityEventTrigger(toJsonString(trigger))); } @@ -515,7 +538,8 @@ public void postTransactionTrigger(TransactionLogTrigger trigger) { if (useNativeQueue) { NativeMessageQueue.getInstance() .publishTrigger(toJsonString(trigger), trigger.getTriggerName()); - } else { + } + if (useEventPlugin) { eventListeners.forEach(listener -> listener.handleTransactionTrigger(toJsonString(trigger))); } } @@ -524,7 +548,8 @@ public void postContractLogTrigger(ContractLogTrigger trigger) { if (useNativeQueue) { NativeMessageQueue.getInstance() .publishTrigger(toJsonString(trigger), trigger.getTriggerName()); - } else { + } + if (useEventPlugin) { eventListeners.forEach(listener -> listener.handleContractLogTrigger(toJsonString(trigger))); } @@ -534,14 +559,17 @@ public void postContractEventTrigger(ContractEventTrigger trigger) { if (useNativeQueue) { NativeMessageQueue.getInstance() .publishTrigger(toJsonString(trigger), trigger.getTriggerName()); - } else { + } + if (useEventPlugin) { eventListeners.forEach(listener -> listener.handleContractEventTrigger(toJsonString(trigger))); } } public boolean isBusy() { - if (useNativeQueue) { + // Back-pressure only applies to the plugin's async queue. The native ZMQ queue + // has its own bounded send queue, so when the plugin is not active we never block. + if (!useEventPlugin) { return false; } int queueSize = 0; diff --git a/framework/src/main/java/org/tron/core/config/args/Args.java b/framework/src/main/java/org/tron/core/config/args/Args.java index 2d6660f9a6a..4eab33e0c01 100644 --- a/framework/src/main/java/org/tron/core/config/args/Args.java +++ b/framework/src/main/java/org/tron/core/config/args/Args.java @@ -372,16 +372,17 @@ private static void applyEventConfig(EventConfig ec) { epc.setBindPort(nq.getBindport()); epc.setSendQueueLength(nq.getSendqueuelength()); - if (!nq.isUseNativeQueue()) { - if (StringUtils.isNotEmpty(ec.getPath())) { - epc.setPluginPath(ec.getPath().trim()); - } - if (StringUtils.isNotEmpty(ec.getServer())) { - epc.setServerAddress(ec.getServer().trim()); - } - if (StringUtils.isNotEmpty(ec.getDbconfig())) { - epc.setDbConfig(ec.getDbconfig().trim()); - } + // Plugin settings are always copied so the event plugin (e.g. MongoDB) can run + // alongside the native ZMQ queue. The native queue and the plugin are activated + // independently in EventPluginLoader (useNativeQueue + non-empty pluginPath). + if (StringUtils.isNotEmpty(ec.getPath())) { + epc.setPluginPath(ec.getPath().trim()); + } + if (StringUtils.isNotEmpty(ec.getServer())) { + epc.setServerAddress(ec.getServer().trim()); + } + if (StringUtils.isNotEmpty(ec.getDbconfig())) { + epc.setDbConfig(ec.getDbconfig().trim()); } // topics diff --git a/framework/src/main/java/org/tron/core/services/event/HistoryEventService.java b/framework/src/main/java/org/tron/core/services/event/HistoryEventService.java index 2eccb9fa2a9..72418f4b92e 100644 --- a/framework/src/main/java/org/tron/core/services/event/HistoryEventService.java +++ b/framework/src/main/java/org/tron/core/services/event/HistoryEventService.java @@ -67,12 +67,16 @@ private void syncEvent() { if (thread.isInterrupted() || isClosed) { throw new InterruptedException(); } - if (instance.isUseNativeQueue()) { - Thread.sleep(20); - } else if (instance.isBusy()) { + // isBusy() returns false unless the event plugin is active, so native-only + // mode keeps its original 20ms pacing. In combined mode we still honor the + // plugin's back-pressure so its async queue is not flooded during sync. + if (instance.isBusy()) { Thread.sleep(100); continue; } + if (instance.isUseNativeQueue()) { + Thread.sleep(20); + } BlockEvent blockEvent = blockEventGet.getBlockEvent(tmp); realtimeEventService.flush(blockEvent, false); solidEventService.flush(blockEvent); diff --git a/framework/src/main/resources/config.conf b/framework/src/main/resources/config.conf index d00f334f4ce..7e22f8c5d73 100644 --- a/framework/src/main/resources/config.conf +++ b/framework/src/main/resources/config.conf @@ -803,7 +803,7 @@ committee = { event.subscribe = { enable = false // enable event subscribe, replaces deprecated CLI flag --es native = { - useNativeQueue = true // if true, use native message queue, else use event plugin. + useNativeQueue = true // start the native ZMQ queue. If `path` below is also set, the event plugin runs alongside it (both receive every trigger). bindport = 5555 // bind port sendqueuelength = 1000 //max length of send queue } diff --git a/framework/src/test/java/org/tron/core/event/EventPluginLoaderTest.java b/framework/src/test/java/org/tron/core/event/EventPluginLoaderTest.java index 658d42f38d9..e9d4d49e809 100644 --- a/framework/src/test/java/org/tron/core/event/EventPluginLoaderTest.java +++ b/framework/src/test/java/org/tron/core/event/EventPluginLoaderTest.java @@ -17,11 +17,17 @@ public class EventPluginLoaderTest { public void testIsBusy() { EventPluginLoader eventPluginLoader = EventPluginLoader.getInstance(); + + // When the event plugin is not active, the node is never "busy" regardless of the + // native queue: back-pressure only throttles the plugin's async queue. + ReflectUtils.setFieldValue(eventPluginLoader, "useEventPlugin", false); ReflectUtils.setFieldValue(eventPluginLoader, "useNativeQueue", true); boolean flag = eventPluginLoader.isBusy(); Assert.assertFalse(flag); - ReflectUtils.setFieldValue(eventPluginLoader, "useNativeQueue", false); + // When the plugin is active (with or without the native queue), busy-ness is driven + // by the plugin queue size. + ReflectUtils.setFieldValue(eventPluginLoader, "useEventPlugin", true); IPluginEventListener p1 = mock(IPluginEventListener.class); List list = new ArrayList<>();