diff --git a/.github/actions/common/plugin-change-check/action.yml b/.github/actions/common/plugin-change-check/action.yml index fdd12b5b82..e4e7f44ee8 100644 --- a/.github/actions/common/plugin-change-check/action.yml +++ b/.github/actions/common/plugin-change-check/action.yml @@ -437,6 +437,27 @@ runs: shell: bash run: | echo "rocketmqTagTransmissionChanged=${{ steps.changed-tag-transmission-rocketmq.outputs.changed }}" >> $GITHUB_ENV + - uses: ktamas77/has-changed-path@v1.0.3 + id: changed-tag-transmission-rabbitmq + with: + paths: sermant-plugins/sermant-tag-transmission/tag-transmission-common + sermant-plugins/sermant-tag-transmission/tag-transmission-crossthread-plugin + sermant-plugins/sermant-tag-transmission/tag-transmission-service + sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin + sermant-plugins/sermant-tag-transmission/config + sermant-integration-tests/tag-transmission-test/tag-transmission-integration-test + sermant-integration-tests/tag-transmission-test/tag-transmission-util-demo + sermant-integration-tests/tag-transmission-test/rabbitmq-consumer-demo + sermant-integration-tests/tag-transmission-test/rabbitmq-producer-demo + sermant-integration-tests/tag-transmission-test/midware-common-demo + sermant-integration-tests/tag-transmission-test/httpserver-common-demo + ./.github/workflows/tagtransmission_integration_test.yml + ./.github/actions/common/tag-transmission + ./.github/actions/scenarios/tag-transmission/rabbitmq + - name: env tag-transmission-rabbitmq + shell: bash + run: | + echo "rabbitMqTagTransmissionChanged=${{ steps.changed-tag-transmission-rabbitmq.outputs.changed }}" >> $GITHUB_ENV - uses: ktamas77/has-changed-path@v1.0.3 id: changed-tag-transmission-servicecomb with: @@ -1027,3 +1048,10 @@ runs: ${{ env.sermantFlowcontrolChanged }} == 'true'];then echo "enableXdsFlowControl=true" >> $GITHUB_ENV fi + + # ==========rabbit mq Tag Transmission is needed to test?========== + if [ ${{ env.rabbitMqTagTransmissionChanged }} == 'true' -o \ + ${{ env.sermantAgentCoreChanged }} == 'true' -o \ + ${{ steps.changed-common-action.outputs.changed }} == 'true' -o ${{ env.triggerPushEvent }} == 'true' ];then + echo "enableRabbitMqTagTransmissionAction=true" >> $GITHUB_ENV + fi diff --git a/.github/actions/scenarios/tag-transmission/rabbitmq/action.yml b/.github/actions/scenarios/tag-transmission/rabbitmq/action.yml new file mode 100644 index 0000000000..8afdfcb6da --- /dev/null +++ b/.github/actions/scenarios/tag-transmission/rabbitmq/action.yml @@ -0,0 +1,70 @@ +name: "Tag Transmission Plugin RabbitMQ Test" +description: "Auto test for tag transmission by rabbit mq" +runs: + using: composite + steps: + - name: entry + uses: ./.github/actions/common/entry + with: + log-dir: ./logs/tag-transmission/rabbitmq + - name: install rabbit mq + shell: bash + run: | + sudo apt-get update + sudo apt-get install -y apt-transport-https ca-certificates curl gnupg-agent software-properties-common + curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add - + sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" + sudo apt-get update + sudo apt-get install -y docker-ce docker-ce-cli containerd.io + docker run -d --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4-management + - name: package demos + shell: bash + run: | + mvn package -Drabbitmq-client.version=${{ matrix.rabbitmqVersion }} -DskipTests -Prabbitmq-test --file \ + sermant-integration-tests/tag-transmission-test/pom.xml + - name: start rabbitmq producer demo + shell: bash + run: | + nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=default -jar \ + -Dsermant_log_dir=${{ env.logDir }}/sermant \ + sermant-integration-tests/tag-transmission-test/rabbitmq-producer-demo/target/rabbitmq-producer-demo.jar > ${{ env.logDir }}/rabbitmq-producer.log 2>&1 & + - name: start rabbitmq consumer demo + shell: bash + run: | + nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=default -jar \ + -Dsermant_log_dir=${{ env.logDir }}/sermant \ + sermant-integration-tests/tag-transmission-test/rabbitmq-consumer-demo/target/rabbitmq-consumer-demo.jar > ${{ env.logDir }}/rabbitmq-consumer.log 2>&1 & + - name: waiting for services start + shell: bash + run: | + docker ps | grep rabbit + bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:15672 120 + ps -ef | grep java + bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9040/common/httpServer 120 + bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9057/rabbitMqProducer/checkRabbitMqProducerStatus 120 + bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9056/rabbitMqConsumer/queryRabbitMqTag 120 + - name: test rabbitmq + shell: bash + run: | + mvn test -Dtag.transmission.integration.test.type=RABBITMQ --file \ + sermant-integration-tests/tag-transmission-test/tag-transmission-integration-test/pom.xml + - name: test rabbitmq consecutive messages + shell: bash + run: | + mvn test -Dtag.transmission.integration.test.type=RABBITMQ-CONSECUTIVE-MESSAGE --file \ + sermant-integration-tests/tag-transmission-test/tag-transmission-integration-test/pom.xml + - name: exit + if: always() + uses: ./.github/actions/common/exit + with: + processor-keyword: httpserver-common + - name: if failure then upload error log + uses: actions/upload-artifact@v4 + if: ${{ failure() || cancelled() }} + with: + name: (${{ github.job }})-tag-transmission-rabbitmq-(${{ matrix.rabbitMqVersion }}-logs + path: | + ./*.log + ./logs/** + if-no-files-found: warn + retention-days: 2 diff --git a/.github/workflows/tagtransmission_integration_test.yml b/.github/workflows/tagtransmission_integration_test.yml index bf2af48760..cc2bd460d4 100644 --- a/.github/workflows/tagtransmission_integration_test.yml +++ b/.github/workflows/tagtransmission_integration_test.yml @@ -49,6 +49,7 @@ jobs: echo "enableJettyTagTransmissionAction=${{env.enableJettyTagTransmissionAction}}" >> $GITHUB_OUTPUT echo "enableTomcatTagTransmissionAction=${{env.enableTomcatTagTransmissionAction}}" >> $GITHUB_OUTPUT echo "enableJakartaServlet56TagTransmissionAction=${{env.enableJakartaServlet56TagTransmissionAction}}" >> $GITHUB_OUTPUT + echo "enableRabbitMqTagTransmissionAction=${{env.enableRabbitMqTagTransmissionAction}}" >> $GITHUB_OUTPUT outputs: enableHttpclientV3TagTransmissionAction: ${{ steps.set-outputs.outputs.enableHttpclientV3TagTransmissionAction }} enableHttpclientV4TagTransmissionAction: ${{ steps.set-outputs.outputs.enableHttpclientV4TagTransmissionAction }} @@ -65,6 +66,7 @@ jobs: enableJettyTagTransmissionAction: ${{ steps.set-outputs.outputs.enableJettyTagTransmissionAction }} enableTomcatTagTransmissionAction: ${{ steps.set-outputs.outputs.enableTomcatTagTransmissionAction }} enableJakartaServlet56TagTransmissionAction: ${{ steps.set-outputs.outputs.enableJakartaServlet56TagTransmissionAction }} + enableRabbitMqTagTransmissionAction: ${{ steps.set-outputs.outputs.enableRabbitMqTagTransmissionAction }} download-midwares-and-cache: name: download midwares and cache runs-on: ubuntu-latest @@ -611,3 +613,28 @@ jobs: uses: ./.github/actions/common/tag-transmission - name: tag transmission test for jakarta-servlet-java8 uses: ./.github/actions/scenarios/tag-transmission/jakarta-servlet-java8 + test-for-tag-transmission-rabbitmq: + name: Test for tag transmission rabbitmq + runs-on: ubuntu-latest + if: needs.set-execution-conditions.outputs.enableRabbitMqTagTransmissionAction=='true' + needs: [set-execution-conditions, build-agent-and-cache, download-midwares-and-cache] + strategy: + matrix: + include: + - rabbitMqVersion: "5.25.0" + - rabbitMqVersion: "5.21.0" + - rabbitMqVersion: "5.16.0" + - rabbitMqVersion: "5.9.0" + - rabbitMqVersion: "5.0.0" + fail-fast: false + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 100 + - name: set java version to environment + run: | + echo "javaVersion=8" >> $GITHUB_ENV + - name: common operations + uses: ./.github/actions/common/tag-transmission + - name: tag transmission test for rabbitMqVersion=${{ matrix.rabbitMqVersion }} + uses: ./.github/actions/scenarios/tag-transmission/rabbitmq diff --git a/.gitignore b/.gitignore index 186f107912..7667e9f2af 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,6 @@ node_modules/ node/ sermant-integration-tests/tag-transmission-test/grpc-api-demo/src/main/java/io/sermant/demo/tagtransmission/grpc/api sermant-*.tar.gz + +# os +.DS_Store diff --git a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/utils/CollectionUtils.java b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/utils/CollectionUtils.java index f8d156ab6d..80f6406794 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/utils/CollectionUtils.java +++ b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/utils/CollectionUtils.java @@ -37,4 +37,16 @@ private CollectionUtils() { public static boolean isEmpty(Collection collection) { return collection == null || collection.isEmpty(); } + + /** + * Get the first element of the collection + * + * @param collection collection + * @param type + * @param orElse when the collection is null or empty, return this + * @return first element of the collection, or orElse if the collection is null or empty + */ + public static T getFirst(Collection collection, T orElse) { + return isEmpty(collection) ? orElse : collection.iterator().next(); + } } diff --git a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/utils/StringUtils.java b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/utils/StringUtils.java index 8c394c132c..e179bd427a 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/utils/StringUtils.java +++ b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/utils/StringUtils.java @@ -183,4 +183,15 @@ public static String trim(String target) { public static String getString(Object object) { return object == null ? "" : object.toString(); } + + /** + * Get the object's toString information + * + * @param object object + * @param defaultValue default value + * @return string if object is null, default value otherwise + */ + public static String getString(Object object, String defaultValue) { + return object == null ? defaultValue : object.toString(); + } } diff --git a/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/utils/CollectionUtilsTest.java b/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/utils/CollectionUtilsTest.java new file mode 100644 index 0000000000..f1adb977bc --- /dev/null +++ b/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/utils/CollectionUtilsTest.java @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2025-2025 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.core.utils; + +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * Test for CollectionUtils + * + * @since 2025-07-05 + */ +public class CollectionUtilsTest { + + @Test + public void testGetFirst_whenCollectionIsEmpty() { + List emptyCollection = Collections.emptyList(); + String result = CollectionUtils.getFirst(emptyCollection, "default"); + Assertions.assertEquals("default", result); + } + + @Test + public void testGetFirst_whenCollectionIsNotEmpty() { + List collection = Arrays.asList("apple", "banana", "cherry"); + String result = CollectionUtils.getFirst(collection, "default"); + Assertions.assertEquals("apple", result); + } + + @Test + public void testGetFirst_whenCollectionIsNull() { + List nullCollection = null; + String result = CollectionUtils.getFirst(nullCollection, "default"); + Assertions.assertEquals("default", result); + } + + @Test + public void testGetFirst_whenCollectionHasMultipleElements() { + List collection = Arrays.asList(10, 20, 30); + Integer result = CollectionUtils.getFirst(collection, 0); + Assertions.assertEquals(10, result); + } +} diff --git a/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/utils/StringUtilsTest.java b/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/utils/StringUtilsTest.java new file mode 100644 index 0000000000..e48d09ecf9 --- /dev/null +++ b/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/utils/StringUtilsTest.java @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2025-2025 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.core.utils; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Test for StringUtils + * + * @since 2025-07-28 + */ +class StringUtilsTest { + @Test + public void testGetString() { + assertEquals("default", StringUtils.getString(null, "default")); + assertEquals("test", StringUtils.getString("test", "")); + } +} diff --git a/sermant-integration-tests/tag-transmission-test/midware-common-demo/src/main/java/io/sermant/demo/tagtransmission/midware/common/MessageConstant.java b/sermant-integration-tests/tag-transmission-test/midware-common-demo/src/main/java/io/sermant/demo/tagtransmission/midware/common/MessageConstant.java index 6febdf11cb..ec15d4b75f 100644 --- a/sermant-integration-tests/tag-transmission-test/midware-common-demo/src/main/java/io/sermant/demo/tagtransmission/midware/common/MessageConstant.java +++ b/sermant-integration-tests/tag-transmission-test/midware-common-demo/src/main/java/io/sermant/demo/tagtransmission/midware/common/MessageConstant.java @@ -73,6 +73,11 @@ public class MessageConstant { */ public static final int KAFKA_CONSUMER_TIMEOUT = 100; + /** + * rabbitmq message body + */ + public static final String MESSAGE_BODY_RABBITMQ = "hello inner rabbitmq:"; + private MessageConstant() { } } diff --git a/sermant-integration-tests/tag-transmission-test/pom.xml b/sermant-integration-tests/tag-transmission-test/pom.xml index aa29d40f5b..5108022995 100644 --- a/sermant-integration-tests/tag-transmission-test/pom.xml +++ b/sermant-integration-tests/tag-transmission-test/pom.xml @@ -33,6 +33,7 @@ 2.6.12 1.52.1 3.21.7 + 5.20.0 @@ -119,6 +120,15 @@ tag-transmission-util-demo + + rabbitmq-test + + rabbitmq-consumer-demo + rabbitmq-producer-demo + midware-common-demo + tag-transmission-util-demo + + kafka-test diff --git a/sermant-integration-tests/tag-transmission-test/rabbitmq-consumer-demo/pom.xml b/sermant-integration-tests/tag-transmission-test/rabbitmq-consumer-demo/pom.xml new file mode 100644 index 0000000000..d283ee8772 --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/rabbitmq-consumer-demo/pom.xml @@ -0,0 +1,49 @@ + + + + tag-transmission-test + io.sermant.tagtransmission + 1.0.0 + + 4.0.0 + + rabbitmq-consumer-demo + + + 8 + 8 + + + + + io.sermant.tagtransmission + midware-common-demo + + + io.sermant.tagtransmission + tag-transmission-util-demo + + + org.springframework.boot + spring-boot-starter-web + + + com.rabbitmq + amqp-client + ${rabbitmq-client.version} + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/sermant-integration-tests/tag-transmission-test/rabbitmq-consumer-demo/src/main/java/io/sermant/demo/tagtransmission/rabbitmq/consumer/RabbitMqConsumer.java b/sermant-integration-tests/tag-transmission-test/rabbitmq-consumer-demo/src/main/java/io/sermant/demo/tagtransmission/rabbitmq/consumer/RabbitMqConsumer.java new file mode 100644 index 0000000000..c9062addfb --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/rabbitmq-consumer-demo/src/main/java/io/sermant/demo/tagtransmission/rabbitmq/consumer/RabbitMqConsumer.java @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2025-2025 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.tagtransmission.rabbitmq.consumer; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; + +import io.sermant.demo.tagtransmission.rabbitmq.consumer.controller.RabbitMqConsumerController; +import io.sermant.demo.tagtransmission.util.HttpClientUtils; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +/** + * RabbitMQ consumer + * + * @since 2025-06-27 + */ +@Component +public class RabbitMqConsumer implements CommandLineRunner { + /** + * Store traffic tags returned by HTTP server called by consumer + */ + public static final Map RABBITMQ_TAG_MAP = new HashMap<>(); + + @Value("${common.server.url}") + private String commonServerUrl; + + @Value("${rabbitmq.address}") + private String rabbitMqAddress; + + @Value("${rabbitmq.port}") + private int rabbitMqPort; + + @Value("${rabbitmq.queue}") + private String queueName; + + @Override + public void run(String[] args) throws IOException, TimeoutException { + consumeData(); + } + + /** + * Consume data from RabbitMQ + * + * @throws IOException when connection fails + * @throws TimeoutException when operation times out + */ + private void consumeData() throws IOException, TimeoutException { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(rabbitMqAddress); + factory.setPort(rabbitMqPort); + Connection connection = factory.newConnection(); + Channel channel = connection.createChannel(); + channel.queueDeclare(queueName, false, false, false, null); + + DeliverCallback deliverCallback = (consumerTag, delivery) -> { + RABBITMQ_TAG_MAP.put(RabbitMqConsumerController.RABBITMQ_TAG, + HttpClientUtils.doHttpUrlConnectionGet(commonServerUrl)); + }; + + channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); + } +} diff --git a/sermant-integration-tests/tag-transmission-test/rabbitmq-consumer-demo/src/main/java/io/sermant/demo/tagtransmission/rabbitmq/consumer/RabbitMqConsumerApplication.java b/sermant-integration-tests/tag-transmission-test/rabbitmq-consumer-demo/src/main/java/io/sermant/demo/tagtransmission/rabbitmq/consumer/RabbitMqConsumerApplication.java new file mode 100644 index 0000000000..25499759d5 --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/rabbitmq-consumer-demo/src/main/java/io/sermant/demo/tagtransmission/rabbitmq/consumer/RabbitMqConsumerApplication.java @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2025-2025 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.tagtransmission.rabbitmq.consumer; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * SpringBoot startup class + * + * @since 2025-06-27 + */ +@SpringBootApplication +public class RabbitMqConsumerApplication { + /** + * Main method to start the application + * + * @param args process startup parameters + */ + public static void main(String[] args) { + SpringApplication.run(RabbitMqConsumerApplication.class, args); + } +} diff --git a/sermant-integration-tests/tag-transmission-test/rabbitmq-consumer-demo/src/main/java/io/sermant/demo/tagtransmission/rabbitmq/consumer/controller/RabbitMqConsumerController.java b/sermant-integration-tests/tag-transmission-test/rabbitmq-consumer-demo/src/main/java/io/sermant/demo/tagtransmission/rabbitmq/consumer/controller/RabbitMqConsumerController.java new file mode 100644 index 0000000000..31ae700b29 --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/rabbitmq-consumer-demo/src/main/java/io/sermant/demo/tagtransmission/rabbitmq/consumer/controller/RabbitMqConsumerController.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2025-2025 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.tagtransmission.rabbitmq.consumer.controller; + +import io.sermant.demo.tagtransmission.rabbitmq.consumer.RabbitMqConsumer; + +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +/** + * RabbitMQ message middleware consumer controller + * + * @since 2025-06-27 + */ +@RestController +@RequestMapping(value = "rabbitMqConsumer") +public class RabbitMqConsumerController { + + /** + * Tag key + */ + public static final String RABBITMQ_TAG = "rabbitmqTag"; + + /** + * Query traffic tag transmission returned by RabbitMQ consumer after consuming messages + * + * @return traffic tag string + */ + @RequestMapping(value = "queryRabbitMqTag", method = RequestMethod.GET, produces = MediaType.TEXT_PLAIN_VALUE) + public String queryRabbitMqTag() { + String trafficTag = RabbitMqConsumer.RABBITMQ_TAG_MAP.get(RABBITMQ_TAG); + + // Remove traffic tag to avoid interfering with next test query + RabbitMqConsumer.RABBITMQ_TAG_MAP.remove(RABBITMQ_TAG); + return trafficTag; + } +} diff --git a/sermant-integration-tests/tag-transmission-test/rabbitmq-consumer-demo/src/main/resources/application.properties b/sermant-integration-tests/tag-transmission-test/rabbitmq-consumer-demo/src/main/resources/application.properties new file mode 100644 index 0000000000..eea33096ad --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/rabbitmq-consumer-demo/src/main/resources/application.properties @@ -0,0 +1,5 @@ +common.server.url=http://127.0.0.1:9040/common/httpServer +rabbitmq.address=127.0.0.1 +rabbitmq.port=5672 +rabbitmq.queue=traffic_tag_test +server.port=9056 diff --git a/sermant-integration-tests/tag-transmission-test/rabbitmq-producer-demo/pom.xml b/sermant-integration-tests/tag-transmission-test/rabbitmq-producer-demo/pom.xml new file mode 100644 index 0000000000..1246d8f0b9 --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/rabbitmq-producer-demo/pom.xml @@ -0,0 +1,48 @@ + + + + tag-transmission-test + io.sermant.tagtransmission + 1.0.0 + + 4.0.0 + + rabbitmq-producer-demo + + + 8 + 8 + + + + + io.sermant.tagtransmission + midware-common-demo + + + io.sermant.tagtransmission + tag-transmission-util-demo + + + org.springframework.boot + spring-boot-starter-web + + + com.rabbitmq + amqp-client + ${rabbitmq-client.version} + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/sermant-integration-tests/tag-transmission-test/rabbitmq-producer-demo/src/main/java/io/sermant/demo/tagtransmission/rabbitmq/producer/RabbitMqProducerApplication.java b/sermant-integration-tests/tag-transmission-test/rabbitmq-producer-demo/src/main/java/io/sermant/demo/tagtransmission/rabbitmq/producer/RabbitMqProducerApplication.java new file mode 100644 index 0000000000..7dcc15f368 --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/rabbitmq-producer-demo/src/main/java/io/sermant/demo/tagtransmission/rabbitmq/producer/RabbitMqProducerApplication.java @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2025-2025 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.tagtransmission.rabbitmq.producer; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * SpringBoot startup class + * + * @since 2025-06-27 + */ +@SpringBootApplication +public class RabbitMqProducerApplication { + /** + * Main method to start the application + * + * @param args process startup parameters + */ + public static void main(String[] args) { + SpringApplication.run(RabbitMqProducerApplication.class, args); + } +} diff --git a/sermant-integration-tests/tag-transmission-test/rabbitmq-producer-demo/src/main/java/io/sermant/demo/tagtransmission/rabbitmq/producer/controller/RabbitMqProducerController.java b/sermant-integration-tests/tag-transmission-test/rabbitmq-producer-demo/src/main/java/io/sermant/demo/tagtransmission/rabbitmq/producer/controller/RabbitMqProducerController.java new file mode 100644 index 0000000000..9a28e06027 --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/rabbitmq-producer-demo/src/main/java/io/sermant/demo/tagtransmission/rabbitmq/producer/controller/RabbitMqProducerController.java @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2025-2025 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.tagtransmission.rabbitmq.producer.controller; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +import io.sermant.demo.tagtransmission.midware.common.MessageConstant; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.concurrent.TimeoutException; + +/** + * RabbitMQ message middleware producer controller for producing messages + * + * @since 2025-06-27 + */ +@RestController +@RequestMapping(value = "rabbitMqProducer") +public class RabbitMqProducerController { + @Value("${rabbitmq.address}") + private String rabbitMqAddress; + + @Value("${rabbitmq.port}") + private int rabbitMqPort; + + @Value("${rabbitmq.queue}") + private String queueName; + + /** + * Check if producer process is started normally + * + * @return status string + */ + @RequestMapping(value = "checkRabbitMqProducerStatus", method = RequestMethod.GET, + produces = MediaType.TEXT_PLAIN_VALUE) + public String checkRabbitMqProducerStatus() { + return "ok"; + } + + /** + * Produce a message via RabbitMQ + * + * @return success message + * @throws IOException when connection fails + * @throws TimeoutException when operation times out + */ + @RequestMapping(value = "testRabbitMqProducer", method = RequestMethod.GET, produces = MediaType.TEXT_PLAIN_VALUE) + public String testRabbitMqProducer() throws IOException, TimeoutException { + produceRabbitMqData(); + return "rabbitmq-produce-message-success"; + } + + /** + * Produce RabbitMQ data + * + * @throws IOException when connection fails + * @throws TimeoutException when operation times out + */ + private void produceRabbitMqData() throws IOException, TimeoutException { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(rabbitMqAddress); + factory.setPort(rabbitMqPort); + try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { + channel.queueDeclare(queueName, false, false, false, null); + String messageBody = buildMessageBody(MessageConstant.MESSAGE_BODY_RABBITMQ); + channel.basicPublish("", queueName, null, messageBody.getBytes(StandardCharsets.UTF_8)); + } + } + + /** + * Build message body with timestamp + * + * @param body base message body + * @return complete message body with timestamp + */ + private String buildMessageBody(String body) { + DateTimeFormatter dtf = DateTimeFormatter.ofPattern(MessageConstant.TIME_FORMAT); + String messageBody = body + dtf.format(LocalDateTime.now()); + return messageBody; + } +} diff --git a/sermant-integration-tests/tag-transmission-test/rabbitmq-producer-demo/src/main/resources/application.properties b/sermant-integration-tests/tag-transmission-test/rabbitmq-producer-demo/src/main/resources/application.properties new file mode 100644 index 0000000000..d0d0e3162c --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/rabbitmq-producer-demo/src/main/resources/application.properties @@ -0,0 +1,4 @@ +server.port=9057 +rabbitmq.address=127.0.0.1 +rabbitmq.port=5672 +rabbitmq.queue=traffic_tag_test diff --git a/sermant-integration-tests/tag-transmission-test/tag-transmission-integration-test/src/test/java/io/sermant/demo/tagtransmission/integration/TagTransmissionTest.java b/sermant-integration-tests/tag-transmission-test/tag-transmission-integration-test/src/test/java/io/sermant/demo/tagtransmission/integration/TagTransmissionTest.java index 39181c56f2..d65ac5549c 100644 --- a/sermant-integration-tests/tag-transmission-test/tag-transmission-integration-test/src/test/java/io/sermant/demo/tagtransmission/integration/TagTransmissionTest.java +++ b/sermant-integration-tests/tag-transmission-test/tag-transmission-integration-test/src/test/java/io/sermant/demo/tagtransmission/integration/TagTransmissionTest.java @@ -31,6 +31,7 @@ import org.junit.jupiter.api.condition.EnabledIfSystemProperty; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -213,6 +214,50 @@ public void testKafka() throws InterruptedException { checkTagTransmission("http://127.0.0.1:9053/kafkaConsumer/queryKafkaTag", EXACT_TAG_MAP, "kafka", "id"); } + /** + * Test RabbitMQ traffic tag transmission + */ + @Test + @EnabledIfSystemProperty(named = "tag.transmission.integration.test.type", matches = "RABBITMQ") + public void testRabbitmq() throws InterruptedException { + // Check before sending message to prevent misuse of previous traffic tags + Optional checkTagOptional = RequestUtils.get("http://127.0.0.1:9056/rabbitMqConsumer/queryRabbitMqTag", EXACT_TAG_MAP); + if (checkTagOptional.isPresent() && !checkTagOptional.get().equals("")) { + Assertions.assertTrue(false, "invalid tag for rabbitmq"); + } + + // Produce message + RequestUtils.get("http://127.0.0.1:9057/rabbitMqProducer/testRabbitMqProducer", EXACT_TAG_MAP); + + // Sleep 5 seconds, wait for consumer to consume + Thread.sleep(5000); + checkTagTransmission("http://127.0.0.1:9056/rabbitMqConsumer/queryRabbitMqTag", EXACT_TAG_MAP, "rabbitmq"); + } + + /** + * Test RabbitMQ traffic tag transmission with consecutive messages + */ + @Test + @EnabledIfSystemProperty(named = "tag.transmission.integration.test.type", matches = "RABBITMQ-CONSECUTIVE-MESSAGE") + public void testRabbitmqConsecutiveMessages() throws InterruptedException { + // Check before sending message to prevent misuse of previous traffic tags + Optional checkTagOptional = RequestUtils.get("http://127.0.0.1:9056/rabbitMqConsumer/queryRabbitMqTag", EXACT_TAG_MAP); + if (checkTagOptional.isPresent() && !checkTagOptional.get().equals("")) { + Assertions.assertTrue(false, "invalid tag for rabbitmq"); + } + + for (int i = 0; i < 10; i++) { + Map tagMap = Collections.singletonMap("id", String.valueOf(i)); + + // Produce message + RequestUtils.get("http://127.0.0.1:9057/rabbitMqProducer/testRabbitMqProducer", tagMap); + + // Sleep 5 seconds, wait for consumer to consume + Thread.sleep(5000); + checkTagTransmission("http://127.0.0.1:9056/rabbitMqConsumer/queryRabbitMqTag", tagMap, "rabbitmq"); + } + } + /** * 测试流量标签前缀匹配、后缀匹配和动态配置,调用tomcat-demo模块的接口完成测试 */ diff --git a/sermant-plugins/sermant-tag-transmission/pom.xml b/sermant-plugins/sermant-tag-transmission/pom.xml index 04bdb786e0..195c57b0c0 100644 --- a/sermant-plugins/sermant-tag-transmission/pom.xml +++ b/sermant-plugins/sermant-tag-transmission/pom.xml @@ -57,6 +57,7 @@ tag-transmission-feign11.x-plugin tag-transmission-jakarta-servlet-plugin tag-transmission-okhttp34-plugin + tag-transmission-rabbitmq5.x-plugin @@ -83,6 +84,7 @@ tag-transmission-feign11.x-plugin tag-transmission-jakarta-servlet-plugin tag-transmission-okhttp34-plugin + tag-transmission-rabbitmq5.x-plugin @@ -109,6 +111,7 @@ tag-transmission-feign11.x-plugin tag-transmission-jakarta-servlet-plugin tag-transmission-okhttp34-plugin + tag-transmission-rabbitmq5.x-plugin diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/pom.xml b/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/pom.xml new file mode 100644 index 0000000000..689b9df541 --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/pom.xml @@ -0,0 +1,91 @@ + + + + + sermant-tag-transmission + io.sermant + 1.0.0 + + 4.0.0 + + tag-transmission-rabbitmq5.x-plugin + + + 8 + 8 + false + plugin + 5.20.0 + 2.0.9 + 3.27.3 + + + + + io.sermant + sermant-agentcore-core + provided + + + io.sermant + tag-transmission-common + + + com.rabbitmq + amqp-client + ${rabbitmq-java-client.version} + provided + + + junit + junit + test + + + org.assertj + assertj-core + ${assertj.version} + test + + + org.mockito + mockito-core + test + + + org.mockito + mockito-inline + test + + + org.powermock + powermock-core + ${powermock.version} + test + + + org.powermock + powermock-api-mockito2 + ${powermock.version} + test + + + org.powermock + powermock-module-junit4 + ${powermock.version} + test + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + + + diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/main/java/io/sermant/tag/transmission/rabbitmqv5/declarers/RabbitmqConsumerDeclarer.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/main/java/io/sermant/tag/transmission/rabbitmqv5/declarers/RabbitmqConsumerDeclarer.java new file mode 100644 index 0000000000..b1c1506580 --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/main/java/io/sermant/tag/transmission/rabbitmqv5/declarers/RabbitmqConsumerDeclarer.java @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2025-2025 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.tag.transmission.rabbitmqv5.declarers; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.tag.transmission.rabbitmqv5.interceptor.RabbitmqConsumerInterceptor; + +/** + * RabbitMQ enhanced Consumer declarer for Traffic Label Transmission,supports RabbitMQ5.x + * + * @since 2025-06-27 + */ +public class RabbitmqConsumerDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "com.rabbitmq.client.impl.ChannelN"; + + private static final String METHOD_NAME = "basicConsume"; + + private static final String BOOLEAN_TYPE = "boolean"; + + private static final String[] METHOD_PARAM_TYPES = { + "java.lang.String", + BOOLEAN_TYPE, + "java.lang.String", + BOOLEAN_TYPE, + BOOLEAN_TYPE, + "java.util.Map", + "com.rabbitmq.client.Consumer" + }; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME) + .and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)), new RabbitmqConsumerInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/main/java/io/sermant/tag/transmission/rabbitmqv5/declarers/RabbitmqProducerSendDeclarer.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/main/java/io/sermant/tag/transmission/rabbitmqv5/declarers/RabbitmqProducerSendDeclarer.java new file mode 100644 index 0000000000..c75d484d56 --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/main/java/io/sermant/tag/transmission/rabbitmqv5/declarers/RabbitmqProducerSendDeclarer.java @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2025-2025 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.tag.transmission.rabbitmqv5.declarers; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.tag.transmission.rabbitmqv5.interceptor.RabbitmqProducerInterceptor; + +/** + * RabbitMQ enhanced producer declarer for Traffic Label Transmission,supports RabbitMQ5.x + * + * @since 2025-06-27 + */ +public class RabbitmqProducerSendDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "com.rabbitmq.client.impl.ChannelN"; + + private static final String METHOD_NAME = "basicPublish"; + + private static final String[] METHOD_PARAM_TYPES = { + "java.lang.String", + "java.lang.String", + "boolean", + "boolean", + "com.rabbitmq.client.AMQP$BasicProperties", + "byte[]" + }; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME) + .and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)), new RabbitmqProducerInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/main/java/io/sermant/tag/transmission/rabbitmqv5/interceptor/RabbitmqConsumerInterceptor.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/main/java/io/sermant/tag/transmission/rabbitmqv5/interceptor/RabbitmqConsumerInterceptor.java new file mode 100644 index 0000000000..bc0204d975 --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/main/java/io/sermant/tag/transmission/rabbitmqv5/interceptor/RabbitmqConsumerInterceptor.java @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2025-2025 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.tag.transmission.rabbitmqv5.interceptor; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Consumer; + +import io.sermant.core.common.LoggerFactory; +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.utils.StringUtils; +import io.sermant.core.utils.tag.TrafficTag; +import io.sermant.core.utils.tag.TrafficUtils; +import io.sermant.tag.transmission.config.strategy.TagKeyMatcher; +import io.sermant.tag.transmission.interceptors.AbstractServerInterceptor; +import io.sermant.tag.transmission.rabbitmqv5.wrapper.ConsumerWrapper; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * RabbitMQ consumer interceptor for transparent transmission of traffic tags, supports RabbitMQ5.x + * + * @since 2025-06-27 + */ +public class RabbitmqConsumerInterceptor extends AbstractServerInterceptor { + + private static final Logger LOGGER = LoggerFactory.getLogger(); + private static final int CONSUMER_INDEX = 6; + + @Override + public ExecuteContext doBefore(ExecuteContext context) { + Object[] arguments = context.getArguments(); + + // wrapped consumer + Consumer consumeCallback = (Consumer) arguments[CONSUMER_INDEX]; + arguments[CONSUMER_INDEX] = new ConsumerWrapper(consumeCallback, this::updateTrafficTag); + return context; + } + + @Override + public ExecuteContext doAfter(ExecuteContext context) { + return context; + } + + /** + * Parse traffic tag from properties + * + * @param properties RabbitMQ consumer traffic tag carrier + * @return traffic tag map + */ + @Override + protected Map> extractTrafficTagFromCarrier(AMQP.BasicProperties properties) { + return Optional.ofNullable(properties) + .map(AMQP.BasicProperties::getHeaders) + .orElse(Collections.emptyMap()) + .entrySet() + .stream() + .filter(entry -> TagKeyMatcher.isMatch(entry.getKey())) + .peek(entry -> LOGGER.log(Level.FINE, "Traffic tag {0} have been extracted from rabbitmq.", entry)) + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> Collections.singletonList(StringUtils.getString(entry.getValue(), null)) + )); + } + + /** + * Update traffic tag + * + * @param properties RabbitMQ properties + */ + protected void updateTrafficTag(AMQP.BasicProperties properties) { + Map> tagMap = extractTrafficTagFromCarrier(properties); + TrafficUtils.setTrafficTag(new TrafficTag(tagMap)); + } +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/main/java/io/sermant/tag/transmission/rabbitmqv5/interceptor/RabbitmqProducerInterceptor.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/main/java/io/sermant/tag/transmission/rabbitmqv5/interceptor/RabbitmqProducerInterceptor.java new file mode 100644 index 0000000000..fca2e97e2e --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/main/java/io/sermant/tag/transmission/rabbitmqv5/interceptor/RabbitmqProducerInterceptor.java @@ -0,0 +1,99 @@ +/* + * Copyright (C) 2025-2025 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.tag.transmission.rabbitmqv5.interceptor; + +import com.rabbitmq.client.AMQP; + +import io.sermant.core.common.LoggerFactory; +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.utils.CollectionUtils; +import io.sermant.core.utils.ReflectUtils; +import io.sermant.core.utils.tag.TrafficUtils; +import io.sermant.tag.transmission.config.strategy.TagKeyMatcher; +import io.sermant.tag.transmission.interceptors.AbstractClientInterceptor; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Producer interceptor for RabbitMQ traffic tag transparent transmission, supporting RabbitMQ5.x + * + * @since 2025-06-27 + */ +public class RabbitmqProducerInterceptor extends AbstractClientInterceptor { + + private static final Logger LOGGER = LoggerFactory.getLogger(); + private static final int PROPERTY_INDEX = 4; + + @Override + public ExecuteContext doBefore(ExecuteContext context) { + Object[] arguments = context.getArguments(); + + AMQP.BasicProperties properties = initPropertiesIfNecessary(arguments); + injectTrafficTag2Carrier(properties); + + arguments[PROPERTY_INDEX] = properties; + return context; + } + + /** + * init properties field if null + * + * @param arguments args + * @return message properties + */ + protected AMQP.BasicProperties initPropertiesIfNecessary(Object[] arguments) { + AMQP.BasicProperties properties = (AMQP.BasicProperties) arguments[PROPERTY_INDEX]; + if (Objects.isNull(properties)) { + return new AMQP.BasicProperties.Builder().build(); + } + return properties; + } + + /** + * Add traffic tag to AMQP.BasicProperties + * + * @param properties RabbitMQ tag transfer carrier + */ + @Override + protected void injectTrafficTag2Carrier(AMQP.BasicProperties properties) { + Map originHeaders = Optional.of(properties) + .map(AMQP.BasicProperties::getHeaders) + .orElse(Collections.emptyMap()); + + Map newHeaders = new HashMap<>(originHeaders); + TrafficUtils.getTrafficTag().getTag().entrySet() + .stream() + .filter(entry -> TagKeyMatcher.isMatch(entry.getKey())) + .filter(entry -> !originHeaders.containsKey(entry.getKey())) + .peek(entry -> LOGGER.log(Level.FINE, "Traffic tag {0} have been filtered.", entry)) + .forEach(entry -> newHeaders.put(entry.getKey(), CollectionUtils.getFirst(entry.getValue(), null))); + + // reflection set field value + ReflectUtils.setFieldValue(properties, "headers", newHeaders); + } + + @Override + public ExecuteContext doAfter(ExecuteContext context) { + return context; + } +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/main/java/io/sermant/tag/transmission/rabbitmqv5/wrapper/ConsumerWrapper.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/main/java/io/sermant/tag/transmission/rabbitmqv5/wrapper/ConsumerWrapper.java new file mode 100644 index 0000000000..1bc9824b18 --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/main/java/io/sermant/tag/transmission/rabbitmqv5/wrapper/ConsumerWrapper.java @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2025-2025 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.tag.transmission.rabbitmqv5.wrapper; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.ShutdownSignalException; + +import java.io.IOException; + +/** + * consumer wrapper + * + * @since 2025-07-15 + */ +public class ConsumerWrapper implements Consumer { + + private final Consumer delegate; + private final java.util.function.Consumer updateTagFunction; + + /** + * constructor + * + * @param delegate consumer delegate + * @param updateTagFunction update tag function + */ + public ConsumerWrapper(Consumer delegate, java.util.function.Consumer updateTagFunction) { + this.delegate = delegate; + this.updateTagFunction = updateTagFunction; + } + + /** + * Called when a basic.deliver is received for this consumer. + */ + @Override + public void handleDelivery(final String consumerTag, + final Envelope envelope, + final AMQP.BasicProperties properties, + final byte[] body) throws IOException { + // update tag then proxy + updateTagFunction.accept(properties); + this.delegate.handleDelivery(consumerTag, envelope, properties, body); + } + + @Override + public void handleConsumeOk(final String consumerTag) { + this.delegate.handleConsumeOk(consumerTag); + } + + @Override + public void handleCancelOk(final String consumerTag) { + this.delegate.handleCancelOk(consumerTag); + } + + @Override + public void handleCancel(final String consumerTag) throws IOException { + this.delegate.handleCancel(consumerTag); + } + + @Override + public void handleShutdownSignal(final String consumerTag, final ShutdownSignalException sig) { + this.delegate.handleShutdownSignal(consumerTag, sig); + } + + @Override + public void handleRecoverOk(final String consumerTag) { + this.delegate.handleRecoverOk(consumerTag); + } +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer b/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer new file mode 100644 index 0000000000..d0040a1234 --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer @@ -0,0 +1,2 @@ +io.sermant.tag.transmission.rabbitmqv5.declarers.RabbitmqConsumerDeclarer +io.sermant.tag.transmission.rabbitmqv5.declarers.RabbitmqProducerSendDeclarer diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/test/java/io/sermant/tag/transmission/rabbitmqv5/interceptor/BaseInterceptorTest.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/test/java/io/sermant/tag/transmission/rabbitmqv5/interceptor/BaseInterceptorTest.java new file mode 100644 index 0000000000..6d4a46969d --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/test/java/io/sermant/tag/transmission/rabbitmqv5/interceptor/BaseInterceptorTest.java @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2025-2025 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.tag.transmission.rabbitmqv5.interceptor; + +import io.sermant.core.plugin.config.PluginConfigManager; +import io.sermant.core.utils.tag.TrafficUtils; +import io.sermant.tag.transmission.config.TagTransmissionConfig; + +import org.junit.After; +import org.junit.Before; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Base test class for RabbitMQ interceptors + * + * @since 2025-07-05 + */ +public class BaseInterceptorTest { + public final TagTransmissionConfig tagTransmissionConfig = new TagTransmissionConfig(); + + public MockedStatic pluginConfigManagerMockedStatic; + + public BaseInterceptorTest() { + pluginConfigManagerMockedStatic = Mockito.mockStatic(PluginConfigManager.class); + pluginConfigManagerMockedStatic.when(() -> PluginConfigManager.getPluginConfig(TagTransmissionConfig.class)) + .thenReturn(tagTransmissionConfig); + } + + @Before + public void before() { + tagTransmissionConfig.setEnabled(true); + List tagKeys = new ArrayList<>(); + tagKeys.add("id"); + tagKeys.add("name"); + Map> matchRule = new HashMap<>(); + matchRule.put("exact", tagKeys); + tagTransmissionConfig.setMatchRule(matchRule); + TrafficUtils.removeTrafficTag(); + } + + @After + public void after() { + TrafficUtils.removeTrafficTag(); + pluginConfigManagerMockedStatic.close(); + } +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/test/java/io/sermant/tag/transmission/rabbitmqv5/interceptor/RabbitmqConsumerInterceptorTest.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/test/java/io/sermant/tag/transmission/rabbitmqv5/interceptor/RabbitmqConsumerInterceptorTest.java new file mode 100644 index 0000000000..854ab931b3 --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/test/java/io/sermant/tag/transmission/rabbitmqv5/interceptor/RabbitmqConsumerInterceptorTest.java @@ -0,0 +1,205 @@ +/* + * Copyright (C) 2025-2025 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.tag.transmission.rabbitmqv5.interceptor; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.Envelope; +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.utils.tag.TrafficTag; +import io.sermant.core.utils.tag.TrafficUtils; +import io.sermant.tag.transmission.rabbitmqv5.wrapper.ConsumerWrapper; + +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +/** + * Test for RabbitMQ consumer interceptor + * + * @since 2025-07-05 + */ +public class RabbitmqConsumerInterceptorTest extends BaseInterceptorTest { + + private final RabbitmqConsumerInterceptor interceptor; + private final Object[] arguments; + + public RabbitmqConsumerInterceptorTest() { + interceptor = new RabbitmqConsumerInterceptor(); + arguments = new Object[7]; + } + + @Test + public void testDoBeforeWithValidConsumer() { + Consumer mockConsumer = mock(Consumer.class); + arguments[6] = mockConsumer; + + ExecuteContext context = buildContext(new Object(), arguments); + ExecuteContext result = interceptor.doBefore(context); + + assertThat(result).isEqualTo(context); + assertThat(arguments[6]).isInstanceOf(ConsumerWrapper.class); + } + + @Test + public void testDoAfter() { + ExecuteContext context = buildContext(new Object(), null); + + ExecuteContext result = interceptor.doAfter(context); + + assertThat(result).isEqualTo(context); + } + + @Test + public void testExtractSingleTag() throws IOException { + Map headers = Collections.singletonMap("id", "testId001"); + AMQP.BasicProperties properties = createProperties(headers); + Consumer mockConsumer = mock(Consumer.class); + arguments[6] = mockConsumer; + ExecuteContext context = buildContext(new Object(), arguments); + interceptor.doBefore(context); + ConsumerWrapper wrapper = (ConsumerWrapper) arguments[6]; + + wrapper.handleDelivery("consumerTag", mock(Envelope.class), properties, new byte[0]); + + TrafficTag trafficTag = TrafficUtils.getTrafficTag(); + assertThat(trafficTag).isNotNull(); + assertThat(trafficTag.getTag()).containsEntry("id", Collections.singletonList("testId001")); + } + + @Test + public void testExtractMultipleTags() throws IOException { + Map headers = new HashMap<>(); + headers.put("id", "testId001"); + headers.put("name", "testName001"); + AMQP.BasicProperties properties = createProperties(headers); + Consumer mockConsumer = mock(Consumer.class); + arguments[6] = mockConsumer; + ExecuteContext context = buildContext(new Object(), arguments); + interceptor.doBefore(context); + ConsumerWrapper wrapper = (ConsumerWrapper) arguments[6]; + + wrapper.handleDelivery("consumerTag", mock(Envelope.class), properties, new byte[0]); + + TrafficTag trafficTag = TrafficUtils.getTrafficTag(); + assertThat(trafficTag).isNotNull(); + assertThat(trafficTag.getTag()).containsEntry("id", Collections.singletonList("testId001")); + assertThat(trafficTag.getTag()).containsEntry("name", Collections.singletonList("testName001")); + } + + @Test + public void testExtractDifferentTags() throws IOException { + Map headers = Collections.singletonMap("address", "address001"); + AMQP.BasicProperties properties = createProperties(headers); + Consumer mockConsumer = mock(Consumer.class); + arguments[6] = mockConsumer; + TrafficUtils.setTrafficTag(new TrafficTag(new HashMap<>())); + ExecuteContext context = buildContext(new Object(), arguments); + interceptor.doBefore(context); + ConsumerWrapper wrapper = (ConsumerWrapper) arguments[6]; + + wrapper.handleDelivery("consumerTag", mock(Envelope.class), properties, new byte[0]); + + TrafficTag trafficTag = TrafficUtils.getTrafficTag(); + assertThat(trafficTag).isNotNull(); + assertThat(trafficTag.getTag()).isEmpty(); + } + + @Test + public void testExtractMixedTags() throws IOException { + Map headers = new HashMap<>(); + headers.put("name", "testName001"); + headers.put("address", "address001"); + AMQP.BasicProperties properties = createProperties(headers); + Consumer mockConsumer = mock(Consumer.class); + arguments[6] = mockConsumer; + ExecuteContext context = buildContext(new Object(), arguments); + interceptor.doBefore(context); + ConsumerWrapper wrapper = (ConsumerWrapper) arguments[6]; + + wrapper.handleDelivery("consumerTag", mock(Envelope.class), properties, new byte[0]); + + TrafficTag trafficTag = TrafficUtils.getTrafficTag(); + assertThat(trafficTag).isNotNull(); + assertThat(trafficTag.getTag()).containsEntry("name", Collections.singletonList("testName001")); + assertThat(trafficTag.getTag()).doesNotContainKey("address"); + } + + @Test + public void testExtractNullValue() throws IOException { + Map headers = Collections.singletonMap("id", null); + AMQP.BasicProperties properties = createProperties(headers); + Consumer mockConsumer = mock(Consumer.class); + arguments[6] = mockConsumer; + ExecuteContext context = buildContext(new Object(), arguments); + interceptor.doBefore(context); + ConsumerWrapper wrapper = (ConsumerWrapper) arguments[6]; + + wrapper.handleDelivery("consumerTag", mock(Envelope.class), properties, new byte[0]); + + TrafficTag trafficTag = TrafficUtils.getTrafficTag(); + assertThat(trafficTag).isNotNull(); + assertThat(trafficTag.getTag()).containsEntry("id", Collections.singletonList(null)); + } + + @Test + public void testExtractWithNullProperties() throws IOException { + Consumer mockConsumer = mock(Consumer.class); + arguments[6] = mockConsumer; + ExecuteContext context = buildContext(new Object(), arguments); + interceptor.doBefore(context); + ConsumerWrapper wrapper = (ConsumerWrapper) arguments[6]; + + wrapper.handleDelivery("consumerTag", mock(Envelope.class), null, new byte[0]); + + TrafficTag trafficTag = TrafficUtils.getTrafficTag(); + assertThat(trafficTag).isNotNull(); + assertThat(trafficTag.getTag()).isEmpty(); + } + + @Test + public void testExtractWithNullHeaders() throws IOException { + AMQP.BasicProperties properties = createProperties(null); + Consumer mockConsumer = mock(Consumer.class); + arguments[6] = mockConsumer; + ExecuteContext context = buildContext(new Object(), arguments); + interceptor.doBefore(context); + ConsumerWrapper wrapper = (ConsumerWrapper) arguments[6]; + + wrapper.handleDelivery("consumerTag", mock(Envelope.class), properties, new byte[0]); + + TrafficTag trafficTag = TrafficUtils.getTrafficTag(); + assertThat(trafficTag).isNotNull(); + assertThat(trafficTag.getTag()).isEmpty(); + } + + private AMQP.BasicProperties createProperties(Map headers) { + return new AMQP.BasicProperties.Builder() + .headers(headers) + .build(); + } + + protected ExecuteContext buildContext(Object object, Object[] arguments) { + return ExecuteContext.forMemberMethod(object, null, arguments, null, null); + } +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/test/java/io/sermant/tag/transmission/rabbitmqv5/interceptor/RabbitmqProducerInterceptorTest.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/test/java/io/sermant/tag/transmission/rabbitmqv5/interceptor/RabbitmqProducerInterceptorTest.java new file mode 100644 index 0000000000..9993236926 --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-rabbitmq5.x-plugin/src/test/java/io/sermant/tag/transmission/rabbitmqv5/interceptor/RabbitmqProducerInterceptorTest.java @@ -0,0 +1,185 @@ +/* + * Copyright (C) 2025-2025 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.tag.transmission.rabbitmqv5.interceptor; + +import com.rabbitmq.client.AMQP; +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.utils.tag.TrafficUtils; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * RabbitMQ5.x tag transmission test + * + * @since 2025-07-05 + */ +public class RabbitmqProducerInterceptorTest extends BaseInterceptorTest { + private final RabbitmqProducerInterceptor interceptor = new RabbitmqProducerInterceptor(); + + @Test + public void testBeforeWithNoHeadersAndNoTags() { + Map originHeaders = Collections.emptyMap(); + Map> tags = Collections.emptyMap(); + ExecuteContext context = buildContext(originHeaders); + TrafficUtils.updateTrafficTag(tags); + + interceptor.before(context); + + AMQP.BasicProperties properties = (AMQP.BasicProperties) context.getArguments()[4]; + assertThat(properties.getHeaders()).isEmpty(); + } + + @Test + public void testBeforeWithHeadersButNoTags() { + Map originHeaders = Collections.singletonMap("defaultKey", "defaultValue"); + Map> tags = Collections.emptyMap(); + ExecuteContext context = buildContext(originHeaders); + TrafficUtils.updateTrafficTag(tags); + + interceptor.before(context); + + AMQP.BasicProperties properties = (AMQP.BasicProperties) context.getArguments()[4]; + assertThat(properties.getHeaders()).containsEntry("defaultKey", "defaultValue"); + } + + @Test + public void testBeforeWithHeadersAndTags() { + Map originHeaders = Collections.singletonMap("defaultKey", "defaultValue"); + Map> tags = Collections.singletonMap("id", Arrays.asList("testId001", "testId002")); + ExecuteContext context = buildContext(originHeaders); + TrafficUtils.updateTrafficTag(tags); + + interceptor.before(context); + + AMQP.BasicProperties properties = (AMQP.BasicProperties) context.getArguments()[4]; + assertThat(properties.getHeaders().size()).isEqualTo(2); + assertThat(properties.getHeaders()).containsEntry("defaultKey", "defaultValue"); + assertThat(properties.getHeaders()).containsEntry("id", "testId001"); + } + + @Test + public void testBeforeWithTagTransmissionConfigDisabled() { + Map originHeaders = Collections.emptyMap(); + Map> tags = Collections.singletonMap("id", Arrays.asList("testId001", "testId002")); + tagTransmissionConfig.setEnabled(false); + ExecuteContext context = buildContext(originHeaders); + TrafficUtils.updateTrafficTag(tags); + + interceptor.before(context); + + AMQP.BasicProperties properties = (AMQP.BasicProperties) context.getArguments()[4]; + assertThat(properties.getHeaders().size()).isEqualTo(0); + tagTransmissionConfig.setEnabled(true); + } + + @Test + public void testBeforeWithExistingTagHeader() { + Map originHeaders = Collections.singletonMap("id", "existingValue"); + Map> tags = Collections.singletonMap("id", Collections.singletonList("tagValue")); + ExecuteContext context = buildContext(originHeaders); + TrafficUtils.updateTrafficTag(tags); + + interceptor.before(context); + + AMQP.BasicProperties properties = (AMQP.BasicProperties) context.getArguments()[4]; + assertThat(properties.getHeaders()).containsEntry("id", "existingValue"); + } + + @Test + public void testTagNotMatch() { + Map originHeaders = Collections.singletonMap("id", "existingValue"); + Map> tags = Collections.singletonMap("notExistTag", Collections.singletonList("tagValue")); + ExecuteContext context = buildContext(originHeaders); + TrafficUtils.updateTrafficTag(tags); + + interceptor.before(context); + + AMQP.BasicProperties properties = (AMQP.BasicProperties) context.getArguments()[4]; + assertThat(properties.getHeaders()).containsEntry("id", "existingValue"); + assertThat(properties.getHeaders()).doesNotContainKey("notExistTag"); + } + + @Test + public void testBeforeWithBoundaryTag() { + Map originHeaders = Collections.emptyMap(); + Map> tags = Collections.singletonMap("id", Collections.emptyList()); + ExecuteContext context = buildContext(originHeaders); + TrafficUtils.updateTrafficTag(tags); + + interceptor.before(context); + + AMQP.BasicProperties properties = (AMQP.BasicProperties) context.getArguments()[4]; + assertThat(properties.getHeaders().get("id")).isNull(); + } + + @Test + public void testBeforeWithMultipleTags() { + Map originHeaders = Collections.emptyMap(); + Map> tags = new HashMap<>(); + tags.put("id", Arrays.asList("testId001", "testId002")); + tags.put("name", Collections.singletonList("testName001")); + ExecuteContext context = buildContext(originHeaders); + TrafficUtils.updateTrafficTag(tags); + + interceptor.before(context); + + AMQP.BasicProperties properties = (AMQP.BasicProperties) context.getArguments()[4]; + assertThat(properties.getHeaders()).containsEntry("id", "testId001"); + assertThat(properties.getHeaders()).containsEntry("name", "testName001"); + } + + @Test + public void testBeforeWithNullProperty() { + ExecuteContext context = ExecuteContext.forMemberMethod(new Object(), null, new Object[5], null, null); + Map> tags = Collections.singletonMap("id", Collections.singletonList("testId001")); + TrafficUtils.updateTrafficTag(tags); + + interceptor.before(context); + + AMQP.BasicProperties properties = (AMQP.BasicProperties) context.getArguments()[4]; + assertThat(properties.getHeaders()).containsEntry("id", "testId001"); + } + + @Test + public void testDoAfter() { + ExecuteContext context = ExecuteContext.forMemberMethod(new Object(), null, null, null, null); + + ExecuteContext resContext = interceptor.doAfter(context); + + assertThat(resContext.getObject()).isEqualTo(context.getObject()); + } + + private ExecuteContext buildContext(Map originHeaders) { + Object[] arguments = new Object[5]; + arguments[4] = createProperties(originHeaders); + return ExecuteContext.forMemberMethod(new Object(), null, arguments, null, null); + } + + private AMQP.BasicProperties createProperties(Map headers) { + return new AMQP.BasicProperties.Builder() + .headers(headers) + .build(); + } +}