Seamless Transaction ID Propagation in Spring WebFlux Applications

Introduction

In Spring WebFlux applications, ensuring consistent transaction ID propagation across asynchronous execution chains is essential for effective logging and tracing. However, the reactive, non-blocking nature of WebFlux introduces challenges due to multiple threads handling requests concurrently. In this guide, we'll explore how to seamlessly propagate transaction IDs using the Reactor Hooks library and ContextPropagation in Spring WebFlux applications.

  1. Understanding the Challenge: In traditional servlet-based applications, the Mapped Diagnostic Context (MDC) is thread-local, making it straightforward to propagate transaction IDs across synchronous execution chains. However, in reactive WebFlux applications, multiple threads handle requests concurrently, requiring a mechanism to propagate transaction IDs across reactive operators.
  2. Leveraging Reactor Hooks for Automatic Context Propagation: The Reactor Hooks library provides a solution for automatic context propagation in reactive applications. By enabling automatic context propagation, transaction IDs can be automatically propagated across reactive operators such as flatMap, zip, and Flux.
  3. Implementing Transaction ID Propagation in Web Filters: Create a WebFilter to intercept incoming requests and extract the transaction ID from the request header. After processing the request, set the transaction ID in the reactor context using Hooks.onEachOperator.

Sample Implementation:


import lombok.Setter;
import org.slf4j.MDC;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;

import io.micrometer.context.ContextRegistry;
import reactor.util.context.Context;

@Setter
@Component
public class CustomLogger extends LayoutBase<ILoggingEvent> implements WebFilter {

    private static final String TRANSACTION_ID_KEY = "transaction_id";
    private static final int STACK_TRACE_LIMIT = 100;
    private String hostname;

    @EventListener(ApplicationReadyEvent.class)
    public void register() {
        // auto reactive context propagation
        Hooks.enableAutomaticContextPropagation();
        // register TRANSACTION_ID thread local accessor
        registerMDC();
    }

    private static void registerMDC() {
        ContextRegistry.getInstance()
                .registerThreadLocalAccessor(TRANSACTION_ID_KEY,
                        () -> MDC.get(TRANSACTION_ID_KEY),
                        value -> MDC.put(TRANSACTION_ID_KEY, value),
                        () -> MDC.remove(TRANSACTION_ID_KEY));
    }

    @Override
    public String doLayout(ILoggingEvent event) {
        StringBuilder customLog = new StringBuilder();
        CachingDateFormatter cachingDateFormatter = new CachingDateFormatter(ISO8601_PATTERN);
        customLog.append(cachingDateFormatter.format(event.getTimeStamp())/* - event.getLoggerContextVO().getBirthTime())*/);
        customLog.append(" ");
        customLog.append(event.getLevel());
        if (!event.getMDCPropertyMap().isEmpty()) {
            customLog.append(" [hostName=");
            customLog.append(hostname);
            customLog.append(", threadName=");
            customLog.append(Thread.currentThread().getName());
            customLog.append(", transactionId=");
            customLog.append(event.getMDCPropertyMap().get(TRANSACTION_ID_KEY));
            customLog.append("]");
        }
        customLog.append(" ");
        customLog.append(event.getLoggerName());
        customLog.append(" : ");
        customLog.append(event.getFormattedMessage().replaceAll("\n|\r|\t|%0A|%0D|%0a|%0d", "_"));
        customLog.append(LINE_SEPARATOR);
        if (null != event.getThrowableProxy()) {
            customLog.append(CAUSED_BY);
            customLog.append(event.getThrowableProxy().getClassName());
            customLog.append(COLON_CHAR);
            customLog.append(event.getThrowableProxy().getMessage());
            customLog.append(LINE_SEPARATOR);
            customLog.append(getStackTrace(event.getThrowableProxy().getStackTraceElementProxyArray()));
        }
        return customLog.toString();
    }

    private String getStackTrace(StackTraceElementProxy[] stackTraceElementProxies) {
        StringBuilder stackTrace = new StringBuilder();
        Arrays.stream(stackTraceElementProxies).limit(STACK_TRACE_LIMIT).filter(Objects::nonNull)
                .forEach(stackTraceElementProxy -> stackTrace
                        .append(stackTraceElementProxy.getSTEAsString())
                        .append(LINE_SEPARATOR));
        return stackTrace.toString();
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        String transactionId = exchange.getRequest().getHeaders().getFirst(TRANSACTION_ID_KEY);
        if (StringUtils.hasText(transactionId)) {
            MDC.put(TRANSACTION_ID_KEY, transactionId); // for none reactive context
            return chain.filter(exchange)
                    // for reactive context, will automatic propagation by Hooks
                    .contextWrite(Context.of(Map.of(TRANSACTION_ID_KEY, transactionId)))
                    // clean up the TRANSACTION_ID_KEY from MDC map
                    .doFinally(signalType -> MDC.remove(TRANSACTION_ID_KEY));
        }
        return chain.filter(exchange);
    }
}

In this implmentation, we set up the hook on @EventListener(ApplicationReadyEvent.class) and register the Context into thread local for the transaction id. Then in the WebFilter doFilter method, we get the transaction id from request header and set it into Mono context via contextWrite. by enable the auto context propagation hook and register the thread local accessor for context registry, the context with transaction id will be duplicated into each reactive block. With the custom doLayout SL4J logback implementation, any logs will print out the transaction no matter which thread it is running on.

2024-04-12 01:49:44,973 INFO [hostName=CAMC02F617BMD6P, threadName=lettuce-nioEventLoop-5-1, transactionId=1234234234]
2024-04-12 01:49:45,134 INFO [hostName=CAMC02F617BMD6P, threadName=lettuce-nioEventLoop-5-2, transactionId=1234234234] 
2024-04-12 01:49:45,150 INFO [hostName=CAMC02F617BMD6P, threadName=lettuce-nioEventLoop-5-3, transactionId=1234234234] 
2024-04-12 01:49:45,659 INFO [hostName=CAMC02F617BMD6P, threadName=reactor-http-nio-3, transactionId=1234234234] threadName=reactor-http-nio-4, transactionId=1234234234] 

Conclusion

By leveraging Reactor Hooks for automatic context propagation and integrating with ContextPropagation in Spring WebFlux applications, developers can seamlessly propagate transaction IDs across asynchronous execution chains. This ensures consistent logging and tracing capabilities in reactive, non-blocking environments, enabling effective monitoring and debugging of Spring WebFlux applications.

reactor-core/docs/asciidoc/faq.adoc at main · reactor/reactor-core
Non-Blocking Reactive Foundation for the JVM. Contribute to reactor/reactor-core development by creating an account on GitHub.

Subscribe to Post, Code and Quiet Time.

Don’t miss out on the latest issues. Sign up now to get access to the library of members-only issues.
jamie@example.com
Subscribe