Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,7 @@ event.subscribe = {
enable = false

native = {
# start the native ZMQ queue; if `path` is also set, the event plugin runs alongside it
useNativeQueue = true
bindport = 5555
sendqueuelength = 1000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public class EventPluginLoader {
@Getter
private boolean useNativeQueue = false;

@Getter
private boolean useEventPlugin = false;

public static EventPluginLoader getInstance() {
if (Objects.isNull(instance)) {
synchronized (EventPluginLoader.class) {
Expand Down Expand Up @@ -234,12 +237,25 @@ public boolean start(EventPluginConfig config) {
this.triggerConfigList = config.getTriggerConfigList();

useNativeQueue = config.isUseNativeQueue();
useEventPlugin = StringUtils.hasText(config.getPluginPath());

if (!useNativeQueue && !useEventPlugin) {
logger.error("no event output configured: native queue is off and plugin path is empty");
return false;
}

// Launch the plugin first so eventListeners are populated before any topic is
// registered. launchNativeQueue() also iterates the trigger config and (when the
// plugin is active) calls setPluginTopic(), which needs eventListeners to exist.
if (useEventPlugin && !launchEventPlugin(config)) {
return false;
}

if (config.isUseNativeQueue()) {
return launchNativeQueue(config);
if (useNativeQueue && !launchNativeQueue(config)) {
return false;
}

return launchEventPlugin(config);
return true;
}

private void setPluginConfig() {
Expand Down Expand Up @@ -271,7 +287,7 @@ private void setSingleTriggerConfig(TriggerConfig triggerConfig) {
blockLogTriggerSolidified = false;
}

if (!useNativeQueue) {
if (useEventPlugin) {
setPluginTopic(Trigger.BLOCK_TRIGGER, triggerConfig.getTopic());
}

Expand All @@ -291,7 +307,7 @@ private void setSingleTriggerConfig(TriggerConfig triggerConfig) {
transactionLogTriggerSolidified = false;
}

if (!useNativeQueue) {
if (useEventPlugin) {
setPluginTopic(Trigger.TRANSACTION_TRIGGER, triggerConfig.getTopic());
}

Expand All @@ -303,7 +319,7 @@ private void setSingleTriggerConfig(TriggerConfig triggerConfig) {
contractEventTriggerEnable = false;
}

if (!useNativeQueue) {
if (useEventPlugin) {
setPluginTopic(Trigger.CONTRACTEVENT_TRIGGER, triggerConfig.getTopic());
}

Expand All @@ -319,7 +335,7 @@ private void setSingleTriggerConfig(TriggerConfig triggerConfig) {
contractLogTriggerRedundancy = false;
}

if (!useNativeQueue) {
if (useEventPlugin) {
setPluginTopic(Trigger.CONTRACTLOG_TRIGGER, triggerConfig.getTopic());
}
} else if (EventPluginConfig.SOLIDITY_TRIGGER_NAME
Expand All @@ -329,7 +345,7 @@ private void setSingleTriggerConfig(TriggerConfig triggerConfig) {
} else {
solidityTriggerEnable = false;
}
if (!useNativeQueue) {
if (useEventPlugin) {
setPluginTopic(Trigger.SOLIDITY_TRIGGER, triggerConfig.getTopic());
}
} else if (EventPluginConfig.SOLIDITY_EVENT_NAME
Expand All @@ -340,7 +356,7 @@ private void setSingleTriggerConfig(TriggerConfig triggerConfig) {
solidityEventTriggerEnable = false;
}

if (!useNativeQueue) {
if (useEventPlugin) {
setPluginTopic(Trigger.SOLIDITY_EVENT_TRIGGER, triggerConfig.getTopic());
}
} else if (EventPluginConfig.SOLIDITY_LOG_NAME
Expand All @@ -354,7 +370,7 @@ private void setSingleTriggerConfig(TriggerConfig triggerConfig) {
solidityLogTriggerEnable = false;
solidityLogTriggerRedundancy = false;
}
if (!useNativeQueue) {
if (useEventPlugin) {
setPluginTopic(Trigger.SOLIDITY_LOG_TRIGGER, triggerConfig.getTopic());
}
}
Expand All @@ -364,7 +380,8 @@ public void postSolidityTrigger(SolidityTrigger trigger) {
if (useNativeQueue) {
NativeMessageQueue.getInstance()
.publishTrigger(toJsonString(trigger), trigger.getTriggerName());
} else {
}
if (useEventPlugin) {
eventListeners.forEach(listener ->
listener.handleSolidityTrigger(toJsonString(trigger)));
}
Expand Down Expand Up @@ -427,6 +444,9 @@ public synchronized boolean isContractLogTriggerRedundancy() {
}

private void setPluginTopic(int eventType, String topic) {
if (Objects.isNull(eventListeners)) {
return;
}
eventListeners.forEach(listener -> listener.setTopic(eventType, topic));
}

Expand Down Expand Up @@ -485,7 +505,8 @@ public void postBlockTrigger(BlockLogTrigger trigger) {
if (useNativeQueue) {
NativeMessageQueue.getInstance()
.publishTrigger(toJsonString(trigger), trigger.getTriggerName());
} else {
}
if (useEventPlugin) {
eventListeners.forEach(listener ->
listener.handleBlockEvent(toJsonString(trigger)));
}
Expand All @@ -495,7 +516,8 @@ public void postSolidityLogTrigger(ContractLogTrigger trigger) {
if (useNativeQueue) {
NativeMessageQueue.getInstance()
.publishTrigger(toJsonString(trigger), trigger.getTriggerName());
} else {
}
if (useEventPlugin) {
eventListeners.forEach(listener ->
listener.handleSolidityLogTrigger(toJsonString(trigger)));
}
Expand All @@ -505,7 +527,8 @@ public void postSolidityEventTrigger(ContractEventTrigger trigger) {
if (useNativeQueue) {
NativeMessageQueue.getInstance()
.publishTrigger(toJsonString(trigger), trigger.getTriggerName());
} else {
}
if (useEventPlugin) {
eventListeners.forEach(listener ->
listener.handleSolidityEventTrigger(toJsonString(trigger)));
}
Expand All @@ -515,7 +538,8 @@ public void postTransactionTrigger(TransactionLogTrigger trigger) {
if (useNativeQueue) {
NativeMessageQueue.getInstance()
.publishTrigger(toJsonString(trigger), trigger.getTriggerName());
} else {
}
if (useEventPlugin) {
eventListeners.forEach(listener -> listener.handleTransactionTrigger(toJsonString(trigger)));
}
}
Expand All @@ -524,7 +548,8 @@ public void postContractLogTrigger(ContractLogTrigger trigger) {
if (useNativeQueue) {
NativeMessageQueue.getInstance()
.publishTrigger(toJsonString(trigger), trigger.getTriggerName());
} else {
}
if (useEventPlugin) {
eventListeners.forEach(listener ->
listener.handleContractLogTrigger(toJsonString(trigger)));
}
Expand All @@ -534,14 +559,17 @@ public void postContractEventTrigger(ContractEventTrigger trigger) {
if (useNativeQueue) {
NativeMessageQueue.getInstance()
.publishTrigger(toJsonString(trigger), trigger.getTriggerName());
} else {
}
if (useEventPlugin) {
eventListeners.forEach(listener ->
listener.handleContractEventTrigger(toJsonString(trigger)));
}
}

public boolean isBusy() {
if (useNativeQueue) {
// Back-pressure only applies to the plugin's async queue. The native ZMQ queue
// has its own bounded send queue, so when the plugin is not active we never block.
if (!useEventPlugin) {
return false;
}
int queueSize = 0;
Expand Down
21 changes: 11 additions & 10 deletions framework/src/main/java/org/tron/core/config/args/Args.java
Original file line number Diff line number Diff line change
Expand Up @@ -372,16 +372,17 @@ private static void applyEventConfig(EventConfig ec) {
epc.setBindPort(nq.getBindport());
epc.setSendQueueLength(nq.getSendqueuelength());

if (!nq.isUseNativeQueue()) {
if (StringUtils.isNotEmpty(ec.getPath())) {
epc.setPluginPath(ec.getPath().trim());
}
if (StringUtils.isNotEmpty(ec.getServer())) {
epc.setServerAddress(ec.getServer().trim());
}
if (StringUtils.isNotEmpty(ec.getDbconfig())) {
epc.setDbConfig(ec.getDbconfig().trim());
}
// Plugin settings are always copied so the event plugin (e.g. MongoDB) can run
// alongside the native ZMQ queue. The native queue and the plugin are activated
// independently in EventPluginLoader (useNativeQueue + non-empty pluginPath).
if (StringUtils.isNotEmpty(ec.getPath())) {
epc.setPluginPath(ec.getPath().trim());
}
if (StringUtils.isNotEmpty(ec.getServer())) {
epc.setServerAddress(ec.getServer().trim());
}
if (StringUtils.isNotEmpty(ec.getDbconfig())) {
epc.setDbConfig(ec.getDbconfig().trim());
}

// topics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,16 @@ private void syncEvent() {
if (thread.isInterrupted() || isClosed) {
throw new InterruptedException();
}
if (instance.isUseNativeQueue()) {
Thread.sleep(20);
} else if (instance.isBusy()) {
// isBusy() returns false unless the event plugin is active, so native-only
// mode keeps its original 20ms pacing. In combined mode we still honor the
// plugin's back-pressure so its async queue is not flooded during sync.
if (instance.isBusy()) {
Thread.sleep(100);
continue;
}
if (instance.isUseNativeQueue()) {
Thread.sleep(20);
}
BlockEvent blockEvent = blockEventGet.getBlockEvent(tmp);
realtimeEventService.flush(blockEvent, false);
solidEventService.flush(blockEvent);
Expand Down
2 changes: 1 addition & 1 deletion framework/src/main/resources/config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ committee = {
event.subscribe = {
enable = false // enable event subscribe, replaces deprecated CLI flag --es
native = {
useNativeQueue = true // if true, use native message queue, else use event plugin.
useNativeQueue = true // start the native ZMQ queue. If `path` below is also set, the event plugin runs alongside it (both receive every trigger).
bindport = 5555 // bind port
sendqueuelength = 1000 //max length of send queue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@ public class EventPluginLoaderTest {
public void testIsBusy() {

EventPluginLoader eventPluginLoader = EventPluginLoader.getInstance();

// When the event plugin is not active, the node is never "busy" regardless of the
// native queue: back-pressure only throttles the plugin's async queue.
ReflectUtils.setFieldValue(eventPluginLoader, "useEventPlugin", false);
ReflectUtils.setFieldValue(eventPluginLoader, "useNativeQueue", true);
boolean flag = eventPluginLoader.isBusy();
Assert.assertFalse(flag);

ReflectUtils.setFieldValue(eventPluginLoader, "useNativeQueue", false);
// When the plugin is active (with or without the native queue), busy-ness is driven
// by the plugin queue size.
ReflectUtils.setFieldValue(eventPluginLoader, "useEventPlugin", true);

IPluginEventListener p1 = mock(IPluginEventListener.class);
List<IPluginEventListener> list = new ArrayList<>();
Expand Down
Loading