Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/common/plugin-change-check/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,6 @@ runs:
# ==========xds service is needed to test?==========
if [ ${{ env.sermantAgentCoreXdsServiceChanged }} == 'true' -o \
${{ steps.changed-common-action.outputs.changed }} == 'true' -o ${{ env.triggerPushEvent }} == 'true' -o \
${{ env.sermantFlowcontrolChanged }} == 'true'];then
${{ env.sermantFlowcontrolChanged }} == 'true' ];then
echo "enableXdsFlowControl=true" >> $GITHUB_ENV
fi
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,15 @@ public void testRetry() {

// Test meet the matching rules, retry will not be triggered
result = doGet(buildGateWayErrorUrl(HttpClientType.HTTP_CLIENT, "v1"));
Assertions.assertEquals("3", result.getData());
result = doGet(buildGateWayErrorUrl(HttpClientType.OK_HTTP2, "v1"));
Assertions.assertEquals("3", result.getData());
Assertions.assertEquals("2", result.getData());
result = doGet(buildGateWayErrorUrl(HttpClientType.OK_HTTP2, "v1"));
Assertions.assertEquals("4", result.getData());
result = doGet(buildGateWayErrorUrl(HttpClientType.OK_HTTP2, "v1"));
Assertions.assertEquals("3", result.getData());
result = doGet(buildGateWayErrorUrl(HttpClientType.HTTP_URL_CONNECTION, "v1"));
Assertions.assertEquals("4", result.getData());
result = doGet(buildGateWayErrorUrl(HttpClientType.OK_HTTP3, "v1"));
Assertions.assertEquals("5", result.getData());
result = doGet(buildGateWayErrorUrl(HttpClientType.OK_HTTP3, "v1"));
Assertions.assertTrue("4".equals(result.getData()) || "6".equals(result.getData()));
resetRequestCount();

// Test the retry be triggered
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,16 +245,15 @@ protected Optional<ServiceInstance> chooseServiceInstanceForXds() {
if (CollectionUtils.isEmpty(serviceInstanceSet)) {
return Optional.empty();
}
removeCircuitBreakerInstance(scenarioInfo, serviceInstanceSet);
if (RetryContext.INSTANCE.isPolicyNeedRetry()) {
removeRetriedServiceInstance(serviceInstanceSet);
}
removeCircuitBreakerInstance(scenarioInfo, serviceInstanceSet);
return Optional.ofNullable(chooseServiceInstanceByLoadBalancer(serviceInstanceSet, scenarioInfo));
}

private void removeRetriedServiceInstance(Set<ServiceInstance> serviceInstanceSet) {
RetryPolicy retryPolicy = RetryContext.INSTANCE.getRetryPolicy();
retryPolicy.retryMark();
Set<Object> retriedInstance = retryPolicy.getAllRetriedInstance();
Set<ServiceInstance> allInstance = new HashSet<>(serviceInstanceSet);
for (Object retryInstance : retriedInstance) {
Expand All @@ -271,7 +270,9 @@ private ServiceInstance chooseServiceInstanceByLoadBalancer(Set<ServiceInstance>
FlowControlScenario scenarioInfo) {
XdsLoadBalancer loadBalancer = XdsLoadBalancerFactory.getLoadBalancer(scenarioInfo.getServiceName(),
scenarioInfo.getClusterName());
return loadBalancer.selectInstance(new ArrayList<>(instanceSet));
ServiceInstance serviceInstance = loadBalancer.selectInstance(new ArrayList<>(instanceSet));
RetryContext.INSTANCE.updateRetriedServiceInstance(serviceInstance);
return serviceInstance;
}

private void removeCircuitBreakerInstance(FlowControlScenario scenarioInfo, Set<ServiceInstance> instanceSet) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ public class DispatcherServletInterceptor extends InterceptorSupporter {

private Function<Object, String> getRequestUri;

private Function<Object, String> getPathInfo;

private Function<Object, String> getMethod;

private Function<Object, Enumeration<String>> getHeaderNames;
Expand Down Expand Up @@ -89,11 +87,9 @@ private Optional<HttpRequestEntity> convertToHttpEntity(Object request) {
if (request == null) {
return Optional.empty();
}
String uri = getRequestUri.apply(request);
return Optional.of(new HttpRequestEntity.Builder()
.setRequestType(RequestType.SERVER)
.setPathInfo(getPathInfo.apply(request))
.setServletPath(uri)
.setApiPath(getRequestUri.apply(request))
.setHeaders(getHeaders(request))
.setMethod(getMethod.apply(request))
.setServiceName(getHeader.apply(request, ConfigConst.FLOW_REMOTE_SERVICE_NAME_HEADER_KEY))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't delete this.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Expand Down Expand Up @@ -232,7 +228,6 @@ private void initFunction() {
boolean canLoadLowVersion = canLoadLowVersion();
if (canLoadLowVersion) {
getRequestUri = obj -> ((HttpServletRequest) obj).getRequestURI();
getPathInfo = obj -> ((HttpServletRequest) obj).getPathInfo();
getMethod = obj -> ((HttpServletRequest) obj).getMethod();
getHeaderNames = obj -> ((HttpServletRequest) obj).getHeaderNames();
getHeader = (obj, key) -> ((HttpServletRequest) obj).getHeader(key);
Expand All @@ -246,7 +241,6 @@ private void initFunction() {
setStatus = (obj, code) -> ((HttpServletResponse) obj).setStatus(code);
} else {
getRequestUri = this::getRequestUri;
getPathInfo = this::getPathInfo;
getMethod = this::getMethod;
getHeaderNames = this::getHeaderNames;
getHeader = this::getHeader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@

package io.sermant.flowcontrol.retry.client;

import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import io.sermant.core.plugin.agent.matcher.ClassMatcher;
import io.sermant.core.plugin.agent.matcher.MethodMatcher;
import io.sermant.flowcontrol.AbstractXdsDeclarer;

/**
* For OKHTTP requests, obtain the instance list from the registry to block them
*
* @author zhp
* @since 2024-12-20
*/
public class OkHttp3ClientDeclarer extends AbstractPluginDeclarer {
public class OkHttp3ClientDeclarer extends AbstractXdsDeclarer {
/**
* The fully qualified name of the enhanced okhttp request
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@

package io.sermant.flowcontrol.retry.client;

import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import io.sermant.core.plugin.agent.matcher.ClassMatcher;
import io.sermant.core.plugin.agent.matcher.MethodMatcher;
import io.sermant.flowcontrol.AbstractXdsDeclarer;

/**
* For OKHTTP requests, modify the URL of request
*
* @author zhp
* @since 2024-12-20
*/
public class OkHttpClientInterceptorChainDeclarer extends AbstractPluginDeclarer {
public class OkHttpClientInterceptorChainDeclarer extends AbstractXdsDeclarer {
private static final String ENHANCE_CLASSES =
"com.squareup.okhttp.Call$ApplicationInterceptorChain";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Copyright (C) 2025-2025 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.flowcontrol.retry.client;


import io.sermant.core.plugin.agent.entity.ExecuteContext;
import io.sermant.core.plugin.config.PluginConfigManager;
import io.sermant.core.plugin.service.PluginServiceManager;
import io.sermant.core.service.ServiceManager;
import io.sermant.core.service.xds.XdsCoreService;
import io.sermant.core.service.xds.XdsFlowControlService;
import io.sermant.core.service.xds.entity.XdsRequestCircuitBreakers;
import io.sermant.flowcontrol.common.config.FlowControlConfig;
import io.sermant.flowcontrol.common.entity.FlowControlResponse;
import io.sermant.flowcontrol.common.entity.FlowControlResult;
import io.sermant.flowcontrol.common.entity.FlowControlScenario;
import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil;
import io.sermant.flowcontrol.common.xds.circuit.XdsCircuitBreakerManager;
import io.sermant.flowcontrol.inject.ErrorCloseableHttpResponse;
import io.sermant.flowcontrol.service.rest4j.XdsHttpFlowControlService;

import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpGet;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

import java.lang.reflect.Method;
import java.net.URI;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;

/**
* HttpClient4xIntercepto test
*
* @author zhp
* @since 2025-04-11
*/
public class HttpClient4xInterceptorTest {
private MockedStatic<PluginConfigManager> pluginConfigManagerMockedStatic;

private MockedStatic<PluginServiceManager> pluginServiceManagerMockedStatic;

private MockedStatic<ServiceManager> serviceManagerMockedStatic;

private XdsHttpFlowControlService xdsHttpFlowControlService;

private HttpGet httpRequest;

private Method method = this.getClass().getMethod("getResult", null);

private XdsFlowControlService xdsFlowControlService;

public HttpClient4xInterceptorTest() throws NoSuchMethodException {
}

@After
public void tearDown() {
pluginConfigManagerMockedStatic.close();
pluginServiceManagerMockedStatic.close();
serviceManagerMockedStatic.close();
}

/**
* preinitialization
*
* @throws Exception initialization failure thrown
*/
@Before
public void before() throws Exception {
pluginConfigManagerMockedStatic = Mockito
.mockStatic(PluginConfigManager.class);
pluginConfigManagerMockedStatic.when(() -> PluginConfigManager.getPluginConfig(FlowControlConfig.class))
.thenReturn(new FlowControlConfig());
pluginServiceManagerMockedStatic = Mockito.mockStatic(PluginServiceManager.class);
xdsHttpFlowControlService = Mockito.mock(XdsHttpFlowControlService.class);
pluginServiceManagerMockedStatic.when(() -> PluginServiceManager.getPluginService(XdsHttpFlowControlService.class))
.thenReturn(xdsHttpFlowControlService);
serviceManagerMockedStatic = Mockito.mockStatic(ServiceManager.class);
XdsCoreService xdsCoreService = Mockito.mock(XdsCoreService.class);
serviceManagerMockedStatic.when(() -> ServiceManager.getService(XdsCoreService.class)).thenReturn(xdsCoreService);
xdsFlowControlService = Mockito.mock(XdsFlowControlService.class);
Mockito.when(xdsCoreService.getXdsFlowControlService()).thenReturn(xdsFlowControlService);
method = Mockito.mock(Method.class);
httpRequest = new HttpGet();
}

@Test
public void test() throws NoSuchMethodException {
HttpClient4xInterceptor interceptor = new HttpClient4xInterceptor();
// test parameter type is incorrect
ExecuteContext executeContext = buildErrorContext();
interceptor.doBefore(executeContext);
Assert.assertNull(executeContext.getResult());

// test request URL does not meet the requirements
executeContext = buildContext();
String resultMsg = "success";
int code = HttpStatus.SC_BAD_REQUEST;
AtomicBoolean triggeredFlag = new AtomicBoolean(true);
doAnswer(invocation -> {
if (triggeredFlag.get()) {
Object[] args = invocation.getArguments();
FlowControlResult flowControlResult = (FlowControlResult) args[1];
flowControlResult.setSkip(true);
flowControlResult.setResponse(new FlowControlResponse(resultMsg, code));
}
return null;
}).when(xdsHttpFlowControlService).onBefore(any(), any());
interceptor.doBefore(executeContext);
Assert.assertNull(executeContext.getResult());

// test triggered flow control rules, abort the request
httpRequest.setURI(URI.create("http://provider:8080/path"));
interceptor.doBefore(executeContext);
Object result = executeContext.getResult();
Assert.assertTrue(result instanceof ErrorCloseableHttpResponse);
ErrorCloseableHttpResponse response = (ErrorCloseableHttpResponse) result;
Assert.assertEquals(code, response.getStatusLine().getStatusCode());
Assert.assertEquals(resultMsg, response.getStatusLine().getReasonPhrase());

// test trigger circuit breaker rules
triggeredFlag.set(false);
XdsRequestCircuitBreakers xdsRequestCircuitBreakers = new XdsRequestCircuitBreakers();
Mockito.when(xdsFlowControlService.getRequestCircuitBreakers(any(), any()))
.thenReturn(Optional.of(xdsRequestCircuitBreakers));
xdsRequestCircuitBreakers.setMaxRequests(1);
FlowControlScenario scenario = new FlowControlScenario();
scenario.setClusterName("test");
scenario.setServiceName("provider");
XdsCircuitBreakerManager.incrementActiveRequests(scenario.getServiceName(), scenario.getClusterName());
XdsThreadLocalUtil.setScenarioInfo(scenario);
interceptor.doBefore(executeContext);
result = executeContext.getResult();
Assert.assertTrue(result instanceof ErrorCloseableHttpResponse);
response = (ErrorCloseableHttpResponse) result;
Assert.assertEquals(HttpStatus.SC_INTERNAL_SERVER_ERROR, response.getStatusLine().getStatusCode());
Assert.assertEquals("CircuitBreaker has forced open and deny any requests",
response.getStatusLine().getReasonPhrase());
}

private ExecuteContext buildContext() throws NoSuchMethodException {
httpRequest.setURI(URI.create("http://127.0.0.1:8080/path"));
return ExecuteContext.forMemberMethod(new HttpClient4xInterceptorTest(), method, new Object[]{null, httpRequest}, null, null);
}

private ExecuteContext buildErrorContext() throws NoSuchMethodException {
return ExecuteContext.forMemberMethod(new HttpClient4xInterceptorTest(), method, new Object[]{null, ""}, null, null);
}

public String getResult(){
return "success";
}
}
Loading
Loading