diff --git a/common/src/main/java/org/tron/common/parameter/CommonParameter.java b/common/src/main/java/org/tron/common/parameter/CommonParameter.java index 3fe1e878ffb..8bbc52e6d87 100644 --- a/common/src/main/java/org/tron/common/parameter/CommonParameter.java +++ b/common/src/main/java/org/tron/common/parameter/CommonParameter.java @@ -411,6 +411,9 @@ public class CommonParameter { @Setter public double rateLimiterDisconnect; // clearParam: 1.0 @Getter + @Setter + public boolean rateLimiterApiNonBlocking = false; + @Getter public RocksDbSettings rocksDBCustomSettings; @Getter public GenesisBlock genesisBlock; diff --git a/common/src/main/java/org/tron/core/config/args/RateLimiterConfig.java b/common/src/main/java/org/tron/core/config/args/RateLimiterConfig.java index eed5ef1898b..5eab6f6d92d 100644 --- a/common/src/main/java/org/tron/core/config/args/RateLimiterConfig.java +++ b/common/src/main/java/org/tron/core/config/args/RateLimiterConfig.java @@ -21,6 +21,7 @@ public class RateLimiterConfig { private P2pRateLimitConfig p2p = new P2pRateLimitConfig(); private List http = new ArrayList<>(); private List rpc = new ArrayList<>(); + private boolean apiNonBlocking = false; @Getter @Setter diff --git a/common/src/main/resources/reference.conf b/common/src/main/resources/reference.conf index 688e1590788..5e8ee2018cb 100644 --- a/common/src/main/resources/reference.conf +++ b/common/src/main/resources/reference.conf @@ -451,6 +451,7 @@ rate.limiter = { global.qps = 50000 global.ip.qps = 10000 global.api.qps = 1000 + apiNonBlocking = false } seed.node = { diff --git a/common/src/test/java/org/tron/core/config/args/RateLimiterConfigTest.java b/common/src/test/java/org/tron/core/config/args/RateLimiterConfigTest.java index 7b4d8a87d45..c3b827a8ba4 100644 --- a/common/src/test/java/org/tron/core/config/args/RateLimiterConfigTest.java +++ b/common/src/test/java/org/tron/core/config/args/RateLimiterConfigTest.java @@ -1,6 +1,7 @@ package org.tron.core.config.args; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import com.typesafe.config.Config; @@ -29,6 +30,7 @@ public void testDefaults() { assertEquals(1.0, rl.getP2p().getDisconnect(), 0.001); assertTrue(rl.getHttp().isEmpty()); assertTrue(rl.getRpc().isEmpty()); + assertFalse(rl.isApiNonBlocking()); } @Test @@ -40,7 +42,8 @@ public void testFromConfig() { + " http = [{ component = TestServlet, strategy = QpsRateLimiterAdapter," + " paramString = \"qps=10\" }]," + " rpc = [{ component = TestRpc, strategy = GlobalPreemptibleAdapter," - + " paramString = \"permit=1\" }]" + + " paramString = \"permit=1\" }]," + + " apiNonBlocking = true" + "}"); RateLimiterConfig rl = RateLimiterConfig.fromConfig(config); assertEquals(100, rl.getGlobal().getQps()); @@ -50,5 +53,6 @@ public void testFromConfig() { assertEquals("TestServlet", rl.getHttp().get(0).getComponent()); assertEquals(1, rl.getRpc().size()); assertEquals("TestRpc", rl.getRpc().get(0).getComponent()); + assertTrue(rl.isApiNonBlocking()); } } diff --git a/framework/src/main/java/org/tron/core/config/args/Args.java b/framework/src/main/java/org/tron/core/config/args/Args.java index 2d6660f9a6a..f1f828b85b3 100644 --- a/framework/src/main/java/org/tron/core/config/args/Args.java +++ b/framework/src/main/java/org/tron/core/config/args/Args.java @@ -325,6 +325,7 @@ private static void applyRateLimiterConfig(RateLimiterConfig rl) { PARAMETER.rateLimiterSyncBlockChain = rl.getP2p().getSyncBlockChain(); PARAMETER.rateLimiterFetchInvData = rl.getP2p().getFetchInvData(); PARAMETER.rateLimiterDisconnect = rl.getP2p().getDisconnect(); + PARAMETER.rateLimiterApiNonBlocking = rl.isApiNonBlocking(); // HTTP/RPC rate limiter items: convert bean lists to business objects RateLimiterInitialization initialization = new RateLimiterInitialization(); diff --git a/framework/src/main/java/org/tron/core/services/http/RateLimiterServlet.java b/framework/src/main/java/org/tron/core/services/http/RateLimiterServlet.java index 3086cbb3619..f488c32df4c 100644 --- a/framework/src/main/java/org/tron/core/services/http/RateLimiterServlet.java +++ b/framework/src/main/java/org/tron/core/services/http/RateLimiterServlet.java @@ -107,9 +107,10 @@ protected void service(HttpServletRequest req, HttpServletResponse resp) IRateLimiter rateLimiter = container.get(KEY_PREFIX_HTTP, getClass().getSimpleName()); // Check per-endpoint first to avoid consuming global IP/QPS quota for requests - // that would be rejected by the per-endpoint limiter anyway. - boolean perEndpointAcquired = rateLimiter == null || rateLimiter.tryAcquire(runtimeData); - boolean acquireResource = perEndpointAcquired && GlobalRateLimiter.tryAcquire(runtimeData); + // that would be rejected by the per-endpoint limiter anyway. acquirePermit() + // chooses blocking or non-blocking semantics based on rate.limiter.apiNonBlocking. + boolean perEndpointAcquired = rateLimiter == null || rateLimiter.acquirePermit(runtimeData); + boolean acquireResource = perEndpointAcquired && GlobalRateLimiter.acquirePermit(runtimeData); String contextPath = req.getContextPath(); String url = Strings.isNullOrEmpty(req.getServletPath()) diff --git a/framework/src/main/java/org/tron/core/services/ratelimiter/GlobalRateLimiter.java b/framework/src/main/java/org/tron/core/services/ratelimiter/GlobalRateLimiter.java index 4b3043274d2..11c55e3a2c3 100644 --- a/framework/src/main/java/org/tron/core/services/ratelimiter/GlobalRateLimiter.java +++ b/framework/src/main/java/org/tron/core/services/ratelimiter/GlobalRateLimiter.java @@ -23,21 +23,43 @@ public class GlobalRateLimiter { public static boolean tryAcquire(RuntimeData runtimeData) { String ip = runtimeData.getRemoteAddr(); if (!Strings.isNullOrEmpty(ip)) { - RateLimiter r; - try { - // cache.get is atomic: only one loader executes per key under concurrent requests, - // preventing multiple RateLimiter instances from being created for the same IP. - r = cache.get(ip, () -> RateLimiter.create(IP_QPS)); - } catch (Exception e) { - logger.warn("Failed to load IP rate limiter for {}, denying request: {}", - ip, e.getMessage()); + RateLimiter r = loadIpLimiter(ip); + if (r == null || !r.tryAcquire()) { return false; } - if (!r.tryAcquire()) { + } + return rateLimiter.tryAcquire(); + } + + public static boolean acquire(RuntimeData runtimeData) { + String ip = runtimeData.getRemoteAddr(); + if (!Strings.isNullOrEmpty(ip)) { + RateLimiter r = loadIpLimiter(ip); + if (r == null) { return false; } + r.acquire(); + } + rateLimiter.acquire(); + return true; + } + + public static boolean acquirePermit(RuntimeData runtimeData) { + return Args.getInstance().isRateLimiterApiNonBlocking() + ? tryAcquire(runtimeData) + : acquire(runtimeData); + } + + private static RateLimiter loadIpLimiter(String ip) { + try { + // cache.get is atomic: only one loader executes per key under concurrent requests, + // preventing multiple RateLimiter instances from being created for the same IP. + return cache.get(ip, () -> RateLimiter.create(IP_QPS)); + } catch (Exception e) { + logger.warn("Failed to load IP rate limiter for {}, denying request: {}", + ip, e.getMessage()); + return null; } - return rateLimiter.tryAcquire(); } } diff --git a/framework/src/main/java/org/tron/core/services/ratelimiter/RateLimiterInterceptor.java b/framework/src/main/java/org/tron/core/services/ratelimiter/RateLimiterInterceptor.java index a07cf955828..85e94f2e768 100644 --- a/framework/src/main/java/org/tron/core/services/ratelimiter/RateLimiterInterceptor.java +++ b/framework/src/main/java/org/tron/core/services/ratelimiter/RateLimiterInterceptor.java @@ -108,9 +108,10 @@ public Listener interceptCall(ServerCall call, RuntimeData runtimeData = new RuntimeData(call); // Check per-endpoint first to avoid consuming global IP/QPS quota for requests - // that would be rejected by the per-endpoint limiter anyway. - boolean perEndpointAcquired = rateLimiter == null || rateLimiter.tryAcquire(runtimeData); - boolean acquireResource = perEndpointAcquired && GlobalRateLimiter.tryAcquire(runtimeData); + // that would be rejected by the per-endpoint limiter anyway. acquirePermit() + // chooses blocking or non-blocking semantics based on rate.limiter.apiNonBlocking. + boolean perEndpointAcquired = rateLimiter == null || rateLimiter.acquirePermit(runtimeData); + boolean acquireResource = perEndpointAcquired && GlobalRateLimiter.acquirePermit(runtimeData); if (!acquireResource) { // Release the per-endpoint permit when global rejected, to avoid semaphore leak. diff --git a/framework/src/main/java/org/tron/core/services/ratelimiter/adapter/DefaultBaseQqsAdapter.java b/framework/src/main/java/org/tron/core/services/ratelimiter/adapter/DefaultBaseQqsAdapter.java index 8f5b5a487bf..63d4cc77587 100644 --- a/framework/src/main/java/org/tron/core/services/ratelimiter/adapter/DefaultBaseQqsAdapter.java +++ b/framework/src/main/java/org/tron/core/services/ratelimiter/adapter/DefaultBaseQqsAdapter.java @@ -15,4 +15,9 @@ public DefaultBaseQqsAdapter(String paramString) { public boolean tryAcquire(RuntimeData data) { return strategy.tryAcquire(); } + + @Override + public boolean acquire(RuntimeData data) { + return strategy.acquire(); + } } \ No newline at end of file diff --git a/framework/src/main/java/org/tron/core/services/ratelimiter/adapter/GlobalPreemptibleAdapter.java b/framework/src/main/java/org/tron/core/services/ratelimiter/adapter/GlobalPreemptibleAdapter.java index 4adc142ed28..eb85baa8b41 100644 --- a/framework/src/main/java/org/tron/core/services/ratelimiter/adapter/GlobalPreemptibleAdapter.java +++ b/framework/src/main/java/org/tron/core/services/ratelimiter/adapter/GlobalPreemptibleAdapter.java @@ -21,4 +21,8 @@ public boolean tryAcquire(RuntimeData data) { return strategy.tryAcquire(); } + @Override + public boolean acquire(RuntimeData data) { + return strategy.acquire(); + } } \ No newline at end of file diff --git a/framework/src/main/java/org/tron/core/services/ratelimiter/adapter/IPQPSRateLimiterAdapter.java b/framework/src/main/java/org/tron/core/services/ratelimiter/adapter/IPQPSRateLimiterAdapter.java index c6fb089063a..0ebd21149a7 100644 --- a/framework/src/main/java/org/tron/core/services/ratelimiter/adapter/IPQPSRateLimiterAdapter.java +++ b/framework/src/main/java/org/tron/core/services/ratelimiter/adapter/IPQPSRateLimiterAdapter.java @@ -16,4 +16,9 @@ public boolean tryAcquire(RuntimeData data) { return strategy.tryAcquire(data.getRemoteAddr()); } + @Override + public boolean acquire(RuntimeData data) { + return strategy.acquire(data.getRemoteAddr()); + } + } \ No newline at end of file diff --git a/framework/src/main/java/org/tron/core/services/ratelimiter/adapter/IRateLimiter.java b/framework/src/main/java/org/tron/core/services/ratelimiter/adapter/IRateLimiter.java index 46ed8beee92..29f7b61b6a5 100644 --- a/framework/src/main/java/org/tron/core/services/ratelimiter/adapter/IRateLimiter.java +++ b/framework/src/main/java/org/tron/core/services/ratelimiter/adapter/IRateLimiter.java @@ -1,9 +1,17 @@ package org.tron.core.services.ratelimiter.adapter; +import org.tron.core.config.args.Args; import org.tron.core.services.ratelimiter.RuntimeData; public interface IRateLimiter { boolean tryAcquire(RuntimeData data); + boolean acquire(RuntimeData data); + + default boolean acquirePermit(RuntimeData data) { + return Args.getInstance().isRateLimiterApiNonBlocking() + ? tryAcquire(data) + : acquire(data); + } } diff --git a/framework/src/main/java/org/tron/core/services/ratelimiter/adapter/QpsRateLimiterAdapter.java b/framework/src/main/java/org/tron/core/services/ratelimiter/adapter/QpsRateLimiterAdapter.java index 846a5eb1c4e..62074eac885 100644 --- a/framework/src/main/java/org/tron/core/services/ratelimiter/adapter/QpsRateLimiterAdapter.java +++ b/framework/src/main/java/org/tron/core/services/ratelimiter/adapter/QpsRateLimiterAdapter.java @@ -16,4 +16,9 @@ public boolean tryAcquire(RuntimeData data) { return strategy.tryAcquire(); } + @Override + public boolean acquire(RuntimeData data) { + return strategy.acquire(); + } + } \ No newline at end of file diff --git a/framework/src/main/java/org/tron/core/services/ratelimiter/strategy/GlobalPreemptibleStrategy.java b/framework/src/main/java/org/tron/core/services/ratelimiter/strategy/GlobalPreemptibleStrategy.java index 0a29183d762..e7b7f560b29 100644 --- a/framework/src/main/java/org/tron/core/services/ratelimiter/strategy/GlobalPreemptibleStrategy.java +++ b/framework/src/main/java/org/tron/core/services/ratelimiter/strategy/GlobalPreemptibleStrategy.java @@ -3,11 +3,15 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class GlobalPreemptibleStrategy extends Strategy { public static final String STRATEGY_PARAM_PERMIT = "permit"; public static final int DEFAULT_PERMIT_NUM = 1; + public static final int DEFAULT_ACQUIRE_TIMEOUT = 2; private Semaphore sp; public GlobalPreemptibleStrategy(String paramString) { @@ -23,15 +27,25 @@ protected Map defaultParam() { return map; } - // Non-blocking: immediately rejects if no permit is available. - // Intentional change from the previous tryAcquire(2, TimeUnit.SECONDS) behaviour: - // blocking the caller for up to 2 s ties up Netty IO / gRPC executor threads and - // masks overload rather than shedding it. All rate-limiting in this stack is now - // non-blocking to keep the thread model consistent with GlobalRateLimiter. + // Non-blocking: immediately rejects if no permit is available. Used when the + // apiNonBlocking switch is on, to shed overload instead of tying up Netty IO / + // gRPC executor threads while waiting for a permit. public boolean tryAcquire() { return sp.tryAcquire(); } + public boolean acquire() { + try { + return sp.tryAcquire(DEFAULT_ACQUIRE_TIMEOUT, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // Restore the interrupt flag and reject — caller must not release a permit + // it never acquired. + logger.error("acquire permit with error: {}", e.getMessage()); + Thread.currentThread().interrupt(); + return false; + } + } + public void release() { sp.release(); } diff --git a/framework/src/main/java/org/tron/core/services/ratelimiter/strategy/IPQpsStrategy.java b/framework/src/main/java/org/tron/core/services/ratelimiter/strategy/IPQpsStrategy.java index 6589c90fe1d..7ffd1f04eb7 100644 --- a/framework/src/main/java/org/tron/core/services/ratelimiter/strategy/IPQpsStrategy.java +++ b/framework/src/main/java/org/tron/core/services/ratelimiter/strategy/IPQpsStrategy.java @@ -22,17 +22,29 @@ public IPQpsStrategy(String paramString) { } public boolean tryAcquire(String ip) { - RateLimiter limiter; + RateLimiter limiter = loadLimiter(ip); + return limiter != null && limiter.tryAcquire(); + } + + public boolean acquire(String ip) { + RateLimiter limiter = loadLimiter(ip); + if (limiter == null) { + return false; + } + limiter.acquire(); + return true; + } + + private RateLimiter loadLimiter(String ip) { try { // cache.get is atomic: only one loader executes per key under concurrent requests, // preventing multiple RateLimiter instances from being created for the same IP. - limiter = ipLimiter.get(ip, this::newRateLimiter); + return ipLimiter.get(ip, this::newRateLimiter); } catch (Exception e) { logger.warn("Failed to load IP rate limiter for {}, denying request: {}", ip, e.getMessage()); - return false; + return null; } - return limiter.tryAcquire(); } private RateLimiter newRateLimiter() { diff --git a/framework/src/main/java/org/tron/core/services/ratelimiter/strategy/QpsStrategy.java b/framework/src/main/java/org/tron/core/services/ratelimiter/strategy/QpsStrategy.java index 7e0466448b3..9116af1b7da 100644 --- a/framework/src/main/java/org/tron/core/services/ratelimiter/strategy/QpsStrategy.java +++ b/framework/src/main/java/org/tron/core/services/ratelimiter/strategy/QpsStrategy.java @@ -29,4 +29,9 @@ protected Map defaultParam() { public boolean tryAcquire() { return rateLimiter.tryAcquire(); } + + public boolean acquire() { + rateLimiter.acquire(); + return true; + } } \ No newline at end of file diff --git a/framework/src/main/resources/config.conf b/framework/src/main/resources/config.conf index d00f334f4ce..d6d3ab236a6 100644 --- a/framework/src/main/resources/config.conf +++ b/framework/src/main/resources/config.conf @@ -415,7 +415,7 @@ node { ## rate limiter config rate.limiter = { - # Every api could only set a specific rate limit strategy. Three non-blocking strategy are supported: + # Every api could only set a specific rate limit strategy. Three strategy are supported: # GlobalPreemptibleAdapter: The number of preemptible resource or maximum concurrent requests globally. # QpsRateLimiterAdapter: qps is the average request count in one second supported by the server, it could be a Double or a Integer. # IPQPSRateLimiterAdapter: similar to the QpsRateLimiterAdapter, qps could be a Double or a Integer. @@ -473,6 +473,9 @@ rate.limiter = { global.qps = 50000 # IP-based global qps, default 10000 global.ip.qps = 10000 + # If true, API rate limiters reject immediately on overload (non-blocking). + # If false (default), callers wait for a permit (blocking, the legacy behaviour). + apiNonBlocking = false } diff --git a/framework/src/test/java/org/tron/core/services/http/RateLimiterServletTest.java b/framework/src/test/java/org/tron/core/services/http/RateLimiterServletTest.java index 1ae341696eb..8cca558d151 100644 --- a/framework/src/test/java/org/tron/core/services/http/RateLimiterServletTest.java +++ b/framework/src/test/java/org/tron/core/services/http/RateLimiterServletTest.java @@ -167,14 +167,14 @@ public void testBuildsEachWhitelistedAdapter() { @Test public void testPerEndpointRejectedDoesNotConsumeGlobalQuota() throws Exception { IPreemptibleRateLimiter perEndpoint = Mockito.mock(IPreemptibleRateLimiter.class); - when(perEndpoint.tryAcquire(any(RuntimeData.class))).thenReturn(false); + when(perEndpoint.acquirePermit(any(RuntimeData.class))).thenReturn(false); container.add(KEY_HTTP, "TestServlet", perEndpoint); try (MockedStatic globalMock = mockStatic(GlobalRateLimiter.class)) { servlet.service(request, response); - globalMock.verify(() -> GlobalRateLimiter.tryAcquire(any()), never()); - // tryAcquire returned false — no permit was taken, nothing to release + globalMock.verify(() -> GlobalRateLimiter.acquirePermit(any()), never()); + // acquirePermit returned false — no permit was taken, nothing to release verify(perEndpoint, never()).release(); } } @@ -186,13 +186,13 @@ public void testPerEndpointRejectedDoesNotConsumeGlobalQuota() throws Exception @Test public void testNonPreemptiblePerEndpointRejectedDoesNotConsumeGlobal() throws Exception { IRateLimiter perEndpoint = Mockito.mock(IRateLimiter.class); - when(perEndpoint.tryAcquire(any(RuntimeData.class))).thenReturn(false); + when(perEndpoint.acquirePermit(any(RuntimeData.class))).thenReturn(false); container.add(KEY_HTTP, "TestServlet", perEndpoint); try (MockedStatic globalMock = mockStatic(GlobalRateLimiter.class)) { servlet.service(request, response); - globalMock.verify(() -> GlobalRateLimiter.tryAcquire(any()), never()); + globalMock.verify(() -> GlobalRateLimiter.acquirePermit(any()), never()); } } @@ -203,11 +203,11 @@ public void testNonPreemptiblePerEndpointRejectedDoesNotConsumeGlobal() throws E @Test public void testGlobalRejectedReleasesPreemptiblePermit() throws Exception { IPreemptibleRateLimiter perEndpoint = Mockito.mock(IPreemptibleRateLimiter.class); - when(perEndpoint.tryAcquire(any(RuntimeData.class))).thenReturn(true); + when(perEndpoint.acquirePermit(any(RuntimeData.class))).thenReturn(true); container.add(KEY_HTTP, "TestServlet", perEndpoint); try (MockedStatic globalMock = mockStatic(GlobalRateLimiter.class)) { - globalMock.when(() -> GlobalRateLimiter.tryAcquire(any())).thenReturn(false); + globalMock.when(() -> GlobalRateLimiter.acquirePermit(any())).thenReturn(false); servlet.service(request, response); @@ -223,11 +223,11 @@ public void testGlobalRejectedReleasesPreemptiblePermit() throws Exception { @Test public void testBothPassPermitReleasedAfterRequest() throws Exception { IPreemptibleRateLimiter perEndpoint = Mockito.mock(IPreemptibleRateLimiter.class); - when(perEndpoint.tryAcquire(any(RuntimeData.class))).thenReturn(true); + when(perEndpoint.acquirePermit(any(RuntimeData.class))).thenReturn(true); container.add(KEY_HTTP, "TestServlet", perEndpoint); try (MockedStatic globalMock = mockStatic(GlobalRateLimiter.class)) { - globalMock.when(() -> GlobalRateLimiter.tryAcquire(any())).thenReturn(true); + globalMock.when(() -> GlobalRateLimiter.acquirePermit(any())).thenReturn(true); servlet.service(request, response); @@ -243,11 +243,11 @@ public void testBothPassPermitReleasedAfterRequest() throws Exception { public void testNullRateLimiterConsultsOnlyGlobal() throws Exception { // No entry added to container — container.get() returns null try (MockedStatic globalMock = mockStatic(GlobalRateLimiter.class)) { - globalMock.when(() -> GlobalRateLimiter.tryAcquire(any())).thenReturn(true); + globalMock.when(() -> GlobalRateLimiter.acquirePermit(any())).thenReturn(true); servlet.service(request, response); - globalMock.verify(() -> GlobalRateLimiter.tryAcquire(any()), times(1)); + globalMock.verify(() -> GlobalRateLimiter.acquirePermit(any()), times(1)); } } } diff --git a/framework/src/test/java/org/tron/core/services/ratelimiter/GlobalRateLimiterTest.java b/framework/src/test/java/org/tron/core/services/ratelimiter/GlobalRateLimiterTest.java index c34d49d9009..8ea0f908899 100644 --- a/framework/src/test/java/org/tron/core/services/ratelimiter/GlobalRateLimiterTest.java +++ b/framework/src/test/java/org/tron/core/services/ratelimiter/GlobalRateLimiterTest.java @@ -1,5 +1,10 @@ package org.tron.core.services.ratelimiter; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; + import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.util.concurrent.RateLimiter; @@ -9,6 +14,9 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import org.tron.common.TestConstants; import org.tron.core.config.args.Args; @@ -135,6 +143,104 @@ public void testPerIpLimitsAreIndependent() throws Exception { Assert.assertFalse(GlobalRateLimiter.tryAcquire(runtimeDataFor("2.2.2.2"))); } + /** + * acquire() must drain the IP limiter before the global limiter, mirroring + * tryAcquire(). A reversed order would let one chatty IP consume global + * quota even when its own per-IP budget is exhausted. + */ + @Test + public void testAcquireOrdersIpBeforeGlobal() throws Exception { + RateLimiter globalMock = Mockito.mock(RateLimiter.class); + RateLimiter ipMock = Mockito.mock(RateLimiter.class); + injectRateLimiter(globalMock); + Cache seeded = CacheBuilder.newBuilder() + .maximumSize(10).expireAfterWrite(1, TimeUnit.HOURS).build(); + seeded.put("10.0.0.1", ipMock); + injectCache(seeded); + + Assert.assertTrue(GlobalRateLimiter.acquire(runtimeDataFor("10.0.0.1"))); + + InOrder inOrder = Mockito.inOrder(ipMock, globalMock); + inOrder.verify(ipMock).acquire(); + inOrder.verify(globalMock).acquire(); + } + + /** + * If the IP limiter cannot be created (cache loader throws), acquire() + * returns false without consuming a global token — same fail-closed + * behaviour as tryAcquire(). + */ + @Test + public void testAcquireDoesNotConsumeGlobalWhenIpLoaderFails() throws Exception { + RateLimiter globalMock = Mockito.mock(RateLimiter.class); + injectRateLimiter(globalMock); + // RateLimiter.create(-1.0) throws IllegalArgumentException, so the + // cache loader fails and loadIpLimiter() returns null. + injectIpQps(-1.0); + injectCache(CacheBuilder.newBuilder() + .maximumSize(10).expireAfterWrite(1, TimeUnit.HOURS).build()); + + Assert.assertFalse(GlobalRateLimiter.acquire(runtimeDataFor("10.0.0.1"))); + + Mockito.verify(globalMock, never()).acquire(); + } + + /** + * acquirePermit dispatches based on rate.limiter.apiNonBlocking: + * switch on → only tryAcquire runs; switch off → only acquire runs. + * These tests pin that contract on the static dispatcher; the matching + * default-method contract for IRateLimiter is covered in AdaptorTest. + */ + @Test + public void testAcquirePermitDispatchesToTryAcquireWhenNonBlocking() throws Exception { + Args.getInstance().setRateLimiterApiNonBlocking(true); + RuntimeData rd = runtimeDataFor("10.0.0.1"); + + try (MockedStatic mock = mockStatic(GlobalRateLimiter.class)) { + mock.when(() -> GlobalRateLimiter.acquirePermit(any())).thenCallRealMethod(); + mock.when(() -> GlobalRateLimiter.tryAcquire(any())).thenReturn(true); + + Assert.assertTrue(GlobalRateLimiter.acquirePermit(rd)); + + mock.verify(() -> GlobalRateLimiter.tryAcquire(any()), times(1)); + mock.verify(() -> GlobalRateLimiter.acquire(any()), never()); + } + } + + @Test + public void testAcquirePermitDispatchesToAcquireWhenBlocking() throws Exception { + Args.getInstance().setRateLimiterApiNonBlocking(false); + RuntimeData rd = runtimeDataFor("10.0.0.1"); + + try (MockedStatic mock = mockStatic(GlobalRateLimiter.class)) { + mock.when(() -> GlobalRateLimiter.acquirePermit(any())).thenCallRealMethod(); + mock.when(() -> GlobalRateLimiter.acquire(any())).thenReturn(true); + + Assert.assertTrue(GlobalRateLimiter.acquirePermit(rd)); + + mock.verify(() -> GlobalRateLimiter.acquire(any()), times(1)); + mock.verify(() -> GlobalRateLimiter.tryAcquire(any()), never()); + } + } + + private static void injectRateLimiter(RateLimiter rl) throws Exception { + Field f = GlobalRateLimiter.class.getDeclaredField("rateLimiter"); + f.setAccessible(true); + f.set(null, rl); + } + + private static void injectCache(Cache cache) throws Exception { + Field f = GlobalRateLimiter.class.getDeclaredField("cache"); + f.setAccessible(true); + f.set(null, cache); + } + + private static void injectIpQps(double qps) throws Exception { + Field f = GlobalRateLimiter.class.getDeclaredField("IP_QPS"); + f.setAccessible(true); + f.set(null, qps); + } + @AfterClass public static void destroy() { Args.clearParam(); diff --git a/framework/src/test/java/org/tron/core/services/ratelimiter/RateLimiterInterceptorTest.java b/framework/src/test/java/org/tron/core/services/ratelimiter/RateLimiterInterceptorTest.java index 6cf02a25050..bbc365f3e0b 100644 --- a/framework/src/test/java/org/tron/core/services/ratelimiter/RateLimiterInterceptorTest.java +++ b/framework/src/test/java/org/tron/core/services/ratelimiter/RateLimiterInterceptorTest.java @@ -95,13 +95,13 @@ public void setUp() throws Exception { @Test public void testPerEndpointRejectedDoesNotConsumeGlobalQuota() { IPreemptibleRateLimiter perEndpoint = Mockito.mock(IPreemptibleRateLimiter.class); - when(perEndpoint.tryAcquire(any(RuntimeData.class))).thenReturn(false); + when(perEndpoint.acquirePermit(any(RuntimeData.class))).thenReturn(false); container.add(KEY_RPC, METHOD_NAME, perEndpoint); try (MockedStatic globalMock = mockStatic(GlobalRateLimiter.class)) { interceptor.interceptCall(call, headers, next); - globalMock.verify(() -> GlobalRateLimiter.tryAcquire(any()), never()); + globalMock.verify(() -> GlobalRateLimiter.acquirePermit(any()), never()); verify(perEndpoint, never()).release(); } } @@ -112,13 +112,13 @@ public void testPerEndpointRejectedDoesNotConsumeGlobalQuota() { @Test public void testNonPreemptiblePerEndpointRejectedDoesNotConsumeGlobal() { IRateLimiter perEndpoint = Mockito.mock(IRateLimiter.class); - when(perEndpoint.tryAcquire(any(RuntimeData.class))).thenReturn(false); + when(perEndpoint.acquirePermit(any(RuntimeData.class))).thenReturn(false); container.add(KEY_RPC, METHOD_NAME, perEndpoint); try (MockedStatic globalMock = mockStatic(GlobalRateLimiter.class)) { interceptor.interceptCall(call, headers, next); - globalMock.verify(() -> GlobalRateLimiter.tryAcquire(any()), never()); + globalMock.verify(() -> GlobalRateLimiter.acquirePermit(any()), never()); } } @@ -129,11 +129,11 @@ public void testNonPreemptiblePerEndpointRejectedDoesNotConsumeGlobal() { @Test public void testGlobalRejectedReleasesPreemptiblePermit() { IPreemptibleRateLimiter perEndpoint = Mockito.mock(IPreemptibleRateLimiter.class); - when(perEndpoint.tryAcquire(any(RuntimeData.class))).thenReturn(true); + when(perEndpoint.acquirePermit(any(RuntimeData.class))).thenReturn(true); container.add(KEY_RPC, METHOD_NAME, perEndpoint); try (MockedStatic globalMock = mockStatic(GlobalRateLimiter.class)) { - globalMock.when(() -> GlobalRateLimiter.tryAcquire(any())).thenReturn(false); + globalMock.when(() -> GlobalRateLimiter.acquirePermit(any())).thenReturn(false); interceptor.interceptCall(call, headers, next); @@ -153,12 +153,12 @@ public void testGlobalRejectedReleasesPreemptiblePermit() { @Test public void testStartCallExceptionReleasesPermitAndClosesCall() throws Exception { IPreemptibleRateLimiter perEndpoint = Mockito.mock(IPreemptibleRateLimiter.class); - when(perEndpoint.tryAcquire(any(RuntimeData.class))).thenReturn(true); + when(perEndpoint.acquirePermit(any(RuntimeData.class))).thenReturn(true); container.add(KEY_RPC, METHOD_NAME, perEndpoint); when(next.startCall(any(), any())).thenThrow(new RuntimeException("handler crash")); try (MockedStatic globalMock = mockStatic(GlobalRateLimiter.class)) { - globalMock.when(() -> GlobalRateLimiter.tryAcquire(any())).thenReturn(true); + globalMock.when(() -> GlobalRateLimiter.acquirePermit(any())).thenReturn(true); interceptor.interceptCall(call, headers, next); @@ -176,14 +176,14 @@ public void testStartCallExceptionReleasesPermitAndClosesCall() throws Exception @Test public void testListenerReleasesPermitOnComplete() throws Exception { IPreemptibleRateLimiter perEndpoint = Mockito.mock(IPreemptibleRateLimiter.class); - when(perEndpoint.tryAcquire(any(RuntimeData.class))).thenReturn(true); + when(perEndpoint.acquirePermit(any(RuntimeData.class))).thenReturn(true); container.add(KEY_RPC, METHOD_NAME, perEndpoint); ServerCall.Listener delegate = Mockito.mock(ServerCall.Listener.class); when(next.startCall(any(), any())).thenReturn(delegate); try (MockedStatic globalMock = mockStatic(GlobalRateLimiter.class)) { - globalMock.when(() -> GlobalRateLimiter.tryAcquire(any())).thenReturn(true); + globalMock.when(() -> GlobalRateLimiter.acquirePermit(any())).thenReturn(true); ServerCall.Listener listener = interceptor.interceptCall(call, headers, next); listener.onComplete(); @@ -199,14 +199,14 @@ public void testListenerReleasesPermitOnComplete() throws Exception { @Test public void testListenerReleasesPermitOnCancel() throws Exception { IPreemptibleRateLimiter perEndpoint = Mockito.mock(IPreemptibleRateLimiter.class); - when(perEndpoint.tryAcquire(any(RuntimeData.class))).thenReturn(true); + when(perEndpoint.acquirePermit(any(RuntimeData.class))).thenReturn(true); container.add(KEY_RPC, METHOD_NAME, perEndpoint); ServerCall.Listener delegate = Mockito.mock(ServerCall.Listener.class); when(next.startCall(any(), any())).thenReturn(delegate); try (MockedStatic globalMock = mockStatic(GlobalRateLimiter.class)) { - globalMock.when(() -> GlobalRateLimiter.tryAcquire(any())).thenReturn(true); + globalMock.when(() -> GlobalRateLimiter.acquirePermit(any())).thenReturn(true); ServerCall.Listener listener = interceptor.interceptCall(call, headers, next); listener.onCancel(); @@ -225,11 +225,11 @@ public void testNullRateLimiterConsultsOnlyGlobal() throws Exception { when(next.startCall(any(), any())).thenReturn(delegate); try (MockedStatic globalMock = mockStatic(GlobalRateLimiter.class)) { - globalMock.when(() -> GlobalRateLimiter.tryAcquire(any())).thenReturn(true); + globalMock.when(() -> GlobalRateLimiter.acquirePermit(any())).thenReturn(true); interceptor.interceptCall(call, headers, next); - globalMock.verify(() -> GlobalRateLimiter.tryAcquire(any()), times(1)); + globalMock.verify(() -> GlobalRateLimiter.acquirePermit(any()), times(1)); } } } diff --git a/framework/src/test/java/org/tron/core/services/ratelimiter/adaptor/AdaptorTest.java b/framework/src/test/java/org/tron/core/services/ratelimiter/adaptor/AdaptorTest.java index 69a6c688200..5ab85a42bbf 100644 --- a/framework/src/test/java/org/tron/core/services/ratelimiter/adaptor/AdaptorTest.java +++ b/framework/src/test/java/org/tron/core/services/ratelimiter/adaptor/AdaptorTest.java @@ -4,12 +4,18 @@ import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.tron.common.TestConstants; import org.tron.common.es.ExecutorServiceManager; import org.tron.common.utils.ReflectUtils; +import org.tron.core.config.args.Args; +import org.tron.core.services.ratelimiter.RuntimeData; import org.tron.core.services.ratelimiter.adapter.GlobalPreemptibleAdapter; import org.tron.core.services.ratelimiter.adapter.IPQPSRateLimiterAdapter; +import org.tron.core.services.ratelimiter.adapter.IRateLimiter; import org.tron.core.services.ratelimiter.adapter.QpsRateLimiterAdapter; import org.tron.core.services.ratelimiter.strategy.GlobalPreemptibleStrategy; import org.tron.core.services.ratelimiter.strategy.IPQpsStrategy; @@ -17,6 +23,61 @@ public class AdaptorTest { + @Before + public void setUp() { + Args.setParam(new String[0], TestConstants.TEST_CONF); + } + + @AfterClass + public static void tearDown() { + Args.clearParam(); + } + + /** + * IRateLimiter.acquirePermit is a default method that dispatches based on + * rate.limiter.apiNonBlocking. The two cases below pin that contract: with + * the switch on, only tryAcquire is invoked; with the switch off, only + * acquire is invoked. Breaking either direction is a behavioural regression. + */ + @Test + public void testAcquirePermitDispatchesToTryAcquireWhenNonBlocking() { + Args.getInstance().setRateLimiterApiNonBlocking(true); + CountingRateLimiter limiter = new CountingRateLimiter(); + + Assert.assertTrue(limiter.acquirePermit(null)); + + Assert.assertEquals(1, limiter.tryAcquireCount); + Assert.assertEquals(0, limiter.acquireCount); + } + + @Test + public void testAcquirePermitDispatchesToAcquireWhenBlocking() { + Args.getInstance().setRateLimiterApiNonBlocking(false); + CountingRateLimiter limiter = new CountingRateLimiter(); + + Assert.assertTrue(limiter.acquirePermit(null)); + + Assert.assertEquals(0, limiter.tryAcquireCount); + Assert.assertEquals(1, limiter.acquireCount); + } + + private static final class CountingRateLimiter implements IRateLimiter { + int tryAcquireCount; + int acquireCount; + + @Override + public boolean tryAcquire(RuntimeData data) { + tryAcquireCount++; + return true; + } + + @Override + public boolean acquire(RuntimeData data) { + acquireCount++; + return true; + } + } + @Test public void testStrategy() { String paramString1 = "qps=5 notExist=6";