| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | ||||
| 4 | 5 | 6 | 7 | 8 | 9 | 10 |
| 11 | 12 | 13 | 14 | 15 | 16 | 17 |
| 18 | 19 | 20 | 21 | 22 | 23 | 24 |
| 25 | 26 | 27 | 28 | 29 | 30 | 31 |
- 실시간 스트리밍 데이터
- APM 만들기
- 시간 윈도우
- 추적 데이터 마이닝 파이프라인
- s3
- 메모리 세그먼트
- nandtotetris
- 분산추적
- 스택머신
- 구문 분석
- 핵심 데이터 모델링
- InnoDB
- jack 문법
- vm머신
- ec2
- SpanId
- MySQL
- 밑바닥부터 만드는 컴퓨팅 시스템
- 텀블링 윈도우
- 리눅스
- 스트리밍 데이터 아키텍쳐
- 스트리밍 아키텍쳐
- 컴퓨터 아키텍쳐
- 도커
- 피벗 추적
- vm번역기
- Terraform
- apm
- OTEL
- 마운트
- Today
- Total
이것이 점프 투 공작소
분산추적에서 다른 프로세스, 외부 서비스간 전파는 어떻게 이루어질까 본문
회사에서 분산추적APM을 사용하며 가장 궁금했던 부분은,
'다른 프로세스, 외부 서비스간 이동이 발생이 있었는데, 어떻게 하나의 추적이라고 계속 인식하는거지?' 였습니다.
오픈소스인 OTel 자바 프로젝트를 통해 분산추적 구현의 중요한 챕터 중 하나인 Context 전파에 대해서 알아보려고 합니다.
https://github.com/open-telemetry/opentelemetry-java-instrumentation
GitHub - open-telemetry/opentelemetry-java-instrumentation: OpenTelemetry auto-instrumentation and instrumentation libraries for
OpenTelemetry auto-instrumentation and instrumentation libraries for Java - open-telemetry/opentelemetry-java-instrumentation
github.com
여기서 말하는 Context는 APM의 추적, Trace에서 계속해서 생성되는 Span들이 하나의 Trace안에서 순차적으로 이어지기 위해 필요한 정보를 의미합니다.
Span과 Trace에 대해서는 다른 포스팅에서 다루겠습니다.
APM에서 어떻게 프로세스, 스레드 변경을 탐지할까?
OTel의 추적은 LocalThread 기반으로 만들어져 있어, 기본적으로 동일 스레드 안에서만 APM의 Context를 공유합니다.
그렇기에 새로운 프로세스, 스레드에 대한 추적을 적용하려면 LocalThread -> NewThread간 Context를 공유해야하기에
에이전트는 새로운 스레드의 생성을 지속적으로 감시하고있습니다.
새롭게 스레드를 생성하는 Runnable, Callable, ExecutorService 등을 감지하면, 에이전트에서 내부적으로 Context.wrap()를 호출하여 새로운 스레드에 Context가 추가된 Context 객체로 바꿔치기 합니다.
wrap은 Context 객체의 메소드로 자바의 스레드 관련 메소드가 실행될 때 현재 Context를 추가해주는 메소드입니다.
default Runnable wrap(Runnable runnable) {
return () -> {
try (Scope ignored = makeCurrent()) {
runnable.run();
}
};
}
또 HTTP, RPC 클라이언트 호출과 같은 경우에는 에이전트가 Context 정보를 헤더에 실어 보냅니다.
수신하는 쪽에서는 Context를 추출하여 Cotext를 사용합니다.
추가로 OTel에서 Spring Batch의 추적과 계측은 아직 experimental상태로 OTel의 프로퍼티 옵션을 통해 활성은 가능하지만
기본적으로 비활성화 되어있기에 다루지 않으려고 합니다.
멀티스레드가 발생 했을 때
OTel은 기본적으로 LocalThread에 Context를 저장하기에 새롭게 스레드가 생긴다면, 해당 스레드에서 생성되는 Span은 이전 Context를 가지고 있지 않습니다.
즉 하나의 Trace로 연결될 수 없기에, 새로운 스레드에 Context를 넘겨주는 작업이 필요합니다.
자바에서 Executor, ExecutorService같은 스레드 관련 클래스가 실행되면,
에이전트가 bytecode 레벨에서 해당 코드를 변경하여, 새로운 스레드가 실행 될때 현재 wrap메소드를 사용해 현재 Context를 넣어줍니다.
Executor.submit(() -> { ... })
Executor.submit(Context.current().wrap(() -> { ... }))
앞서 잠깐 언급하였듯 wrap메소드는 각 Executor관련 메소드들을 wrap하여 makeCurrent로 현재 Context를 넣어주는 역할을 합니다.
자바의 Runnable 말고도 여러 메소드에 대한 wrap이 존재하며, Context.taskWrapping()를 사용해 Executor 자체를 감싸서 사용 할 수도 있습니다.
default Runnable wrap(Runnable runnable) {
return () -> {
try (Scope ignored = makeCurrent()) {
runnable.run();
}
};
}
default <T> Callable<T> wrap(Callable<T> callable) {
return () -> {
try (Scope ignored = makeCurrent()) {
return callable.call();
}
};
}
default Executor wrap(Executor executor) {
return command -> executor.execute(wrap(command));
}
default ExecutorService wrap(ExecutorService executor) {
if (executor instanceof ContextExecutorService) {
return executor;
}
return new ContextExecutorService(this, executor);
}
default ScheduledExecutorService wrap(ScheduledExecutorService executor) {
if (executor instanceof ContextScheduledExecutorService) {
return executor;
}
return new ContextScheduledExecutorService(this, executor);
}
비동기 호출 시
CompletableFuture, @Async 등의 비동기 호출 또한 내부적으로 작업 스레드가 분리되어 새로운 스레드에서 실행되므로,
기본적으로 멀티 스레드에서의 Context전파와 동일한 방식이 사용됩니다.
HTTP, RPC, Kafka 호출이 발생 했을 때
멀티스레드가 아닌 http, rpc, 메시징 시스템 kafka 등을 활용한 호출이 발생했을때는
각 방법들에 맞게 Context를 옮겨주는 작업이 필요합니다.
이 때 OTel에서는 Context를 전파할 때 Instrumenter 객체를 공통적으로 사용합니다.
Instrumenter
에이전트에서 span을 생성하는데 필요한 객체입니다.
요청이 들어오면 에이전트는 Instrumenter를 사용해 span을 생성하고 종료시킵니다.
샘플링 여부 등을 파악해서 span의 시작여부를 결정하는 shouldStart() ,
span을 시작하는 start(), 종료하는 end() 메소드로 구성되어 있습니다.
public class Instrumenter<REQUEST, RESPONSE> {
...
public boolean shouldStart(Context parentContext, REQUEST request) {
SpanKind kind = spanKindExtractor.extract(request);
return enabled && !spanSuppressor.shouldSuppress(parentContext, kind);
}
public Context start(Context parentContext, REQUEST request) {
return doStart(parentContext, request, null);
}
public void end(Context context, REQUEST request, RESPONSE response, Throwable error) {
doEnd(context, request, response, error, null);
}
HTTP 호출이 발생 했을 때
http 모듈이 감지되면 에이전트에서는 헤더에 traceparent, tracestate라는 key에 Context를 담아서 전달합니다.
OTel에서는 Propagator객체를 사용하여 이 작업을 수행합니다.
Inject()를 호출하여 헤더에 Context를 담고 , Extract()를 통해 헤더에 담긴 Context들을 추출해서 사용합니다.
RestTemplate를 사용하는 경우 에이전트에서는 RestTemplateInterceptor를 bean-post-processor를 통해 모든 RestTemplate인스턴스에 추가하고, WebClient의 경우에는 WebClient.Builder Bean을 후처리하여 http요청을 감지합니다.
참고로 http호출이 발생하면 APM입장에서는 http호출이 시작되는 시점에 span을 생성하고, http응답을 받는 시점에 종료되는 'http호출'에 대한 하나의 span이 존재해야합니다. 해당 span을 통해 trace를 더 정확하게 관리 할 수 있습니다.
traceparent, tracestate
traceId와 Context를 담아 보내는 W3C의 규격 헤더입니다.
traceparent는 필수지만, tracestate는 부가정보입니다.
아래는 traceparent를 규정하는 W3C규격이며, trace-flags는 샘플링된 추적인지 여부를 확인합니다.
01은 샘플링되는 추적, 00은 샘플링되지 않는 추적을 의미합니다.
traceparent: {version}-{trace-id}-{span-id}-{trace-flags}
00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01
W3CTraceContextPropagator
propagator 객체는 http뿐만 아니라 rpc, 메시징 시스템에서 Context를 전달하기 위해 사용하는 객체입니다.
inject(), extract()메소드를 사용해 Context를 주입, 추출합니다.
Inject
W3CTraceContextPropagator 객체의 Inject() 메소드입니다.
Context에 담긴 SpanContext를 traceparent, tracestate 헤더에 추가합니다.
파라미터의 C carrier는 헤더를 담을 객체, HttpHeaders 또는 Map입니다.
@Override
public <C> void inject(Context context, @Nullable C carrier, TextMapSetter<C> setter) {
if (context == null || setter == null) {
return;
}
SpanContext spanContext = Span.fromContext(context).getSpanContext();
if (!spanContext.isValid()) {
return;
}
char[] chars = TemporaryBuffers.chars(TRACEPARENT_HEADER_SIZE);
chars[0] = VERSION.charAt(0);
chars[1] = VERSION.charAt(1);
chars[2] = TRACEPARENT_DELIMITER;
String traceId = spanContext.getTraceId();
traceId.getChars(0, traceId.length(), chars, TRACE_ID_OFFSET);
chars[SPAN_ID_OFFSET - 1] = TRACEPARENT_DELIMITER;
String spanId = spanContext.getSpanId();
spanId.getChars(0, spanId.length(), chars, SPAN_ID_OFFSET);
chars[TRACE_OPTION_OFFSET - 1] = TRACEPARENT_DELIMITER;
String traceFlagsHex = spanContext.getTraceFlags().asHex();
chars[TRACE_OPTION_OFFSET] = traceFlagsHex.charAt(0);
chars[TRACE_OPTION_OFFSET + 1] = traceFlagsHex.charAt(1);
setter.set(carrier, TRACE_PARENT, new String(chars, 0, TRACEPARENT_HEADER_SIZE));
TraceState traceState = spanContext.getTraceState();
if (traceState.isEmpty()) {
// No need to add an empty "tracestate" header.
return;
}
setter.set(carrier, TRACE_STATE, encodeTraceState(traceState));
}
Extract
요청이 도착하면 에이전트는 http헤더에서 traceparent, tracestate를 꺼내서 Context를 복원합니다.
@Override
public <C> Context extract(Context context, @Nullable C carrier, TextMapGetter<C> getter) {
if (context == null) {
return Context.root();
}
if (getter == null) {
return context;
}
SpanContext spanContext = extractImpl(carrier, getter);
if (!spanContext.isValid()) {
return context;
}
return context.with(Span.wrap(spanContext));
}
RPC 호출이 발생 했을 때
rpc에서는 메타데이터에 Context를 넣어서 전달합니다.
http와 동일하게 propagator클래스의 Extract(), Inject() 메소드를 사용하며, traceparent, tracestate는 RPC Metadata 키로서 사용됩니다.
에이전트에서 TracingClientInterceptor, TracingServerInterceptor에서 rpc을 감지하여 context를 주입합니다.
sendMessage, OnMessage
rpc 요청을 보내고 수신할 때,
Interceptor에서는 어떤 메시지가 몇 개 오갔는지, 또 언제 어떤 순서로 처리됐는지를 분석하기 위해
sendMessage(), on onMessage() 메소드를 사용해 자동으로 증가하는 메시지ID와 메시지 타입 SEND, RECEIVED를 저장합니다.
메시지 ID와 메시지 타입은 sendMessage()와 onMessage() 안에서 Span 이벤트로 기록됩니다.
@Override
public void sendMessage(REQUEST message) {
try (Scope ignored = context.makeCurrent()) {
super.sendMessage(message);
} catch (Throwable e) {
instrumenter.end(context, request, Status.UNKNOWN, e);
throw e;
}
long messageId = SENT_MESSAGE_ID_UPDATER.incrementAndGet(this);
if (emitMessageEvents) {
Attributes attributes = Attributes.of(MESSAGE_TYPE, SENT, MESSAGE_ID, messageId);
Span.fromContext(context).addEvent("message", attributes);
}
}
@Override
public void onMessage(REQUEST message) {
long messageId = RECEIVED_MESSAGE_ID_UPDATER.incrementAndGet(TracingServerCall.this);
if (emitMessageEvents) {
Attributes attributes = Attributes.of(MESSAGE_TYPE, RECEIVED, MESSAGE_ID, messageId);
Span.fromContext(context).addEvent("message", attributes);
}
delegate().onMessage(message);
}
TracingClientInterceptor-interceptCall
gRPC의 ClientInterceptor를 상속받아 interceptCall를 오버라이드하여, rpc가 나갈 때 Context를 추기해주는 역할을 합니다.
final class TracingClientInterceptor implements ClientInterceptor {
...
@Override
public <REQUEST, RESPONSE> ClientCall<REQUEST, RESPONSE> interceptCall(
MethodDescriptor<REQUEST, RESPONSE> method, CallOptions callOptions, Channel next) {
GrpcRequest request = new GrpcRequest(method, null, null, next.authority());
Context parentContext = Context.current();
if (!instrumenter.shouldStart(parentContext, request)) {
return next.newCall(method, callOptions);
}
Context context = instrumenter.start(parentContext, request);
ClientCall<REQUEST, RESPONSE> result;
try (Scope ignored = context.makeCurrent()) {
// call other interceptors
result = next.newCall(method, callOptions);
} catch (Throwable t) {
instrumenter.end(context, request, Status.UNKNOWN, t);
throw t;
}
return new TracingClientCall<>(result, parentContext, context, request);
}
final class TracingClientCall<REQUEST, RESPONSE>
extends ForwardingClientCall.SimpleForwardingClientCall<REQUEST, RESPONSE> {
private final Context parentContext;
private final Context context;
private final GrpcRequest request;
// Used by SENT_MESSAGE_ID_UPDATER
@SuppressWarnings("UnusedVariable")
volatile long sentMessageId;
// Used by RECEIVED_MESSAGE_ID_UPDATER
@SuppressWarnings("UnusedVariable")
volatile long receivedMessageId;
TracingClientCall(
ClientCall<REQUEST, RESPONSE> delegate,
Context parentContext,
Context context,
GrpcRequest request) {
super(delegate);
this.parentContext = parentContext;
this.context = context;
this.request = request;
}
@Override
public void start(Listener<RESPONSE> responseListener, Metadata headers) {
propagators.getTextMapPropagator().inject(context, headers, MetadataSetter.INSTANCE);
// store metadata so that it can be used by custom AttributesExtractors
request.setMetadata(headers);
try (Scope ignored = context.makeCurrent()) {
super.start(
new TracingClientCallListener(responseListener, parentContext, context, request),
headers);
} catch (Throwable e) {
instrumenter.end(context, request, Status.UNKNOWN, e);
throw e;
}
}
...
}
TracingClientCallListener
rpc의 interceptCall 메소드를 호출하기 위해서는 인자로 Listener를 전달해야하는데 이 때 OTel에서는 커스텀한 리스너 TracingClientCallListener를 전달합니다.
TracingClientCallListener는 클라이언트가 rpc 호출할 때 start() 내부에서 생성되며 gRPC의 ClientCall.Listener를 상속받아서 클라이언트가 서버 응답을 받을 때마다, Span에 이벤트를 찍고, Span을 종료하는 실질적인 역할을 합니다.
final class TracingClientInterceptor implements ClientInterceptor {
...
final class TracingClientCallListener
extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RESPONSE> {
private final Context parentContext;
private final Context context;
private final GrpcRequest request;
TracingClientCallListener(
Listener<RESPONSE> delegate,
Context parentContext,
Context context,
GrpcRequest request) {
super(delegate);
this.parentContext = parentContext;
this.context = context;
this.request = request;
}
@Override
public void onMessage(RESPONSE message) {
long messageId = RECEIVED_MESSAGE_ID_UPDATER.incrementAndGet(TracingClientCall.this);
if (emitMessageEvents) {
Attributes attributes = Attributes.of(MESSAGE_TYPE, RECEIVED, MESSAGE_ID, messageId);
Span.fromContext(context).addEvent("message", attributes);
}
try (Scope ignored = context.makeCurrent()) {
delegate().onMessage(message);
}
}
@Override
public void onClose(Status status, Metadata trailers) {
request.setPeerSocketAddress(getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
if (captureExperimentalSpanAttributes) {
Span span = Span.fromContext(context);
span.setAttribute(
GRPC_RECEIVED_MESSAGE_COUNT, RECEIVED_MESSAGE_ID_UPDATER.get(TracingClientCall.this));
span.setAttribute(
GRPC_SENT_MESSAGE_COUNT, SENT_MESSAGE_ID_UPDATER.get(TracingClientCall.this));
}
instrumenter.end(context, request, status, status.getCause());
try (Scope ignored = parentContext.makeCurrent()) {
delegate().onClose(status, trailers);
}
}
@Override
public void onReady() {
try (Scope ignored = context.makeCurrent()) {
delegate().onReady();
}
}
}
}
}
TracingServerInterceptor-interceptCall
gRPC의 ServerInterceptor를 상속받아 interceptCall를 오버라이드하여, rpc가 들어오면 컨텍스트를 추출하고 Context에 저장합니다.
final class TracingServerInterceptor implements ServerInterceptor {
...
@Override
public <REQUEST, RESPONSE> ServerCall.Listener<REQUEST> interceptCall(
ServerCall<REQUEST, RESPONSE> call,
Metadata headers,
ServerCallHandler<REQUEST, RESPONSE> next) {
String authority = call.getAuthority();
if (authority == null) {
// Armeria grpc server call does not implement getAuthority(). In
// ArmeriaServerCallInstrumentation we store the value for the authority header in a virtual
// field.
authority = GrpcAuthorityStorage.getAuthority(call);
}
GrpcRequest request =
new GrpcRequest(
call.getMethodDescriptor(),
headers,
call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR),
authority);
Context parentContext = Context.current();
if (!instrumenter.shouldStart(parentContext, request)) {
return next.startCall(call, headers);
}
Context context = instrumenter.start(parentContext, request);
try (Scope ignored = context.makeCurrent()) {
return new TracingServerCall<>(call, context, request).start(headers, next);
} catch (Throwable e) {
instrumenter.end(context, request, Status.UNKNOWN, e);
throw e;
}
}
final class TracingServerCall<REQUEST, RESPONSE>
extends ForwardingServerCall.SimpleForwardingServerCall<REQUEST, RESPONSE> {
private final Context context;
private final GrpcRequest request;
private Status status;
// Used by SENT_MESSAGE_ID_UPDATER
@SuppressWarnings("UnusedVariable")
volatile long sentMessageId;
// Used by RECEIVED_MESSAGE_ID_UPDATER
@SuppressWarnings("UnusedVariable")
volatile long receivedMessageId;
TracingServerCall(
ServerCall<REQUEST, RESPONSE> delegate, Context context, GrpcRequest request) {
super(delegate);
this.context = context;
this.request = request;
}
}
TracingServerInterceptor
서버가 rpc 요청을 수신할 때 interceptCall() 내부에서 생성되며, 요청을 받는 동안 메시지의 수신,완료,취소 처리를 합니다.
final class TracingServerInterceptor implements ServerInterceptor {
...
final class TracingServerCallListener
extends ForwardingServerCallListener.SimpleForwardingServerCallListener<REQUEST> {
private final Context context;
private final GrpcRequest request;
TracingServerCallListener(Listener<REQUEST> delegate, Context context, GrpcRequest request) {
super(delegate);
this.context = context;
this.request = request;
}
private void end(Context context, GrpcRequest request, Status response, Throwable error) {
if (captureExperimentalSpanAttributes) {
Span span = Span.fromContext(context);
span.setAttribute(
GRPC_RECEIVED_MESSAGE_COUNT, RECEIVED_MESSAGE_ID_UPDATER.get(TracingServerCall.this));
span.setAttribute(
GRPC_SENT_MESSAGE_COUNT, SENT_MESSAGE_ID_UPDATER.get(TracingServerCall.this));
if (Status.CANCELLED.equals(status)) {
span.setAttribute(GRPC_CANCELED, true);
}
}
instrumenter.end(context, request, response, error);
}
@Override
public void onHalfClose() {
try {
delegate().onHalfClose();
} catch (Throwable e) {
instrumenter.end(context, request, Status.UNKNOWN, e);
throw e;
}
}
@Override
public void onCancel() {
try {
delegate().onCancel();
} catch (Throwable e) {
end(context, request, Status.UNKNOWN, e);
throw e;
}
end(context, request, Status.CANCELLED, null);
}
@Override
public void onComplete() {
try {
delegate().onComplete();
} catch (Throwable e) {
end(context, request, Status.UNKNOWN, e);
throw e;
}
if (status == null) {
status = Status.UNKNOWN;
}
end(context, request, status, status.getCause());
}
@Override
public void onReady() {
try {
delegate().onReady();
} catch (Throwable e) {
end(context, request, Status.UNKNOWN, e);
throw e;
}
}
}
}
}
메시징 시스템에서는?
컨슈머와 프로듀서를 사용해서 데이터를 전달할때는 kafka의 send(), poll() 메소드를 wrap해서 propagator의 inject(), extract()를 사용하여 Context를 넣어줍니다.
Context 값의 key는 다른 방식들과 동일하게 traceparent입니다.
메시징 시스템에서 각 요청,응답 span은 process, received로 표현됩니다.
Producer-wrap
java의 동적 Proxy로 kafka의 Producer를 감싸서 kafka의 send메소드를 인터셉트해서 값들을 buildAndInjectSpan에 넣어줍니다.
kafka의 콜백이 있으면 콜백도 넣어줍니다.
public final class KafkaTelemetry {
...
/** Returns a decorated {@link Producer} that emits spans for each sent message. */
@SuppressWarnings("unchecked")
public <K, V> Producer<K, V> wrap(Producer<K, V> producer) {
return (Producer<K, V>)
Proxy.newProxyInstance(
KafkaTelemetry.class.getClassLoader(),
new Class<?>[] {Producer.class},
(proxy, method, args) -> {
// Future<RecordMetadata> send(ProducerRecord<K, V> record)
// Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
if ("send".equals(method.getName())
&& method.getParameterCount() >= 1
&& method.getParameterTypes()[0] == ProducerRecord.class) {
ProducerRecord<K, V> record = (ProducerRecord<K, V>) args[0];
Callback callback =
method.getParameterCount() >= 2
&& method.getParameterTypes()[1] == Callback.class
? (Callback) args[1]
: null;
return buildAndInjectSpan(record, producer, callback, producer::send);
}
try {
return method.invoke(producer, args);
} catch (InvocationTargetException exception) {
throw exception.getCause();
}
});
}
...
}
buildAndInjectSpan
buildAndInjectSpan에서는 Context를 꺼내고, OTel의 Kafka DTO, KafkaProducerRequest객체를 이용해 요청과 producer를 wrap 후, shouldStart 메소드로 해당 요청이 샘플링 되는 요청인지 확인합니다.
샘플링이 필요없으면 span생성 없이 카프카로 요청을 전달하고 필요하다면 span을 inject하여 요청을 전달합니다.
<K, V> Future<RecordMetadata> buildAndInjectSpan(
ProducerRecord<K, V> record,
Producer<K, V> producer,
Callback callback,
BiFunction<ProducerRecord<K, V>, Callback, Future<RecordMetadata>> sendFn) {
Context parentContext = Context.current();
KafkaProducerRequest request = KafkaProducerRequest.create(record, producer);
if (!producerInstrumenter.shouldStart(parentContext, request)) {
return sendFn.apply(record, callback);
}
Context context = producerInstrumenter.start(parentContext, request);
propagator().inject(context, record.headers(), SETTER);
try (Scope ignored = context.makeCurrent()) {
return sendFn.apply(record, new ProducerCallback(callback, parentContext, context, request));
}
}
Consumer-wrap
Producer와 동일하게 동적 Proxy를 사용해 kafka의 Consumer 를 wrap합니다.
proxy는 람다함수를 가로채서 poll() 메소드가 발생하면 먼저 시간 체크 작업을 시작합니다.
poll() 결과가 ConsumerRecords이면, ConsumerRecords는 여러 kafka메시지를 가지고 있는 객체입니다.
buildAndFinishSpan() 메소드를 실행시켜 poll()한 kafka메시지로 Context를 만들고 반환합니다.
이후 receiveContext, KafkaConsumer를 인자로 KafkaConsumerContext를 만들어 addTracing() 메소드를 호출해 반복문을 돌며 각 메시지마다 span을 만듭니다.
public final class KafkaTelemetry {
...
/** Returns a decorated {@link Consumer} that consumes spans for each received message. */
@SuppressWarnings("unchecked")
public <K, V> Consumer<K, V> wrap(Consumer<K, V> consumer) {
return (Consumer<K, V>)
Proxy.newProxyInstance(
KafkaTelemetry.class.getClassLoader(),
new Class<?>[] {Consumer.class},
(proxy, method, args) -> {
Object result;
Timer timer = "poll".equals(method.getName()) ? Timer.start() : null;
try {
result = method.invoke(consumer, args);
} catch (InvocationTargetException exception) {
throw exception.getCause();
}
// ConsumerRecords<K, V> poll(long timeout)
// ConsumerRecords<K, V> poll(Duration duration)
if ("poll".equals(method.getName()) && result instanceof ConsumerRecords) {
ConsumerRecords<K, V> consumerRecords = (ConsumerRecords<K, V>) result;
Context receiveContext = buildAndFinishSpan(consumerRecords, consumer, timer);
if (receiveContext == null) {
receiveContext = Context.current();
}
KafkaConsumerContext consumerContext =
KafkaConsumerContextUtil.create(receiveContext, consumer);
result = addTracing(consumerRecords, consumerContext);
}
return result;
});
}
}
addTracing
카프카 메시지들에 span을 붙이는 메소드입니다.
카프카 파티션의 메시지들을 consumerProcessInstrumenter를 사용하여 span의 시작과 종료를 감지하도록합니다.
<K, V> ConsumerRecords<K, V> addTracing(
ConsumerRecords<K, V> consumerRecords, KafkaConsumerContext consumerContext) {
if (consumerRecords.isEmpty()) {
return consumerRecords;
}
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
for (TopicPartition partition : consumerRecords.partitions()) {
List<ConsumerRecord<K, V>> list = consumerRecords.records(partition);
if (list != null && !list.isEmpty()) {
list = TracingList.wrap(list, consumerProcessInstrumenter, () -> true, consumerContext);
}
records.put(partition, list);
}
return new ConsumerRecords<>(records);
}
Reactive 환경
reactive 체인에서는 스레드가 언제든지 변경 될 수 있기에, OTel의 LocalThread 기반의 Context 전파를 사용할 수 없습니다.
reactor 관련 메소드가 실행되면 wrap 후 hook을 만들어 체인간의 Context전파를 해주어야합니다.
참고로 webflux환경, 즉 netty의 멀티플렉싱 환경에서 추적은 bossGroup의 스레드가 아닌 Worker스레드에서부터 시작합니다.
ContextPropagationOperator
reactor 연산자들에 hook을 추가해주는 클래스입니다.
reactor의 연산자들은 내부적으로 모두 Subscriber를 만들고 다음 연산자에게 넘깁니다.
즉 모든 체인은 내부적으로 CoreSubscriber 클래스가 사용됩니다.
ContextPropagaionOperator 클래스에서 registerOnEachOperator의 tracingLift() 함수로 연산자마다 TracingSubscriber가 들어가도록 합니다.
public final class ContextPropagationOperator {
...
public void registerOnEachOperator() {
synchronized (lock) {
if (enabled) {
return;
}
Hooks.onEachOperator(
TracingSubscriber.class.getName(), tracingLift(asyncOperationEndStrategy));
AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy);
registerScheduleHook(RunnableWrapper.class.getName(), RunnableWrapper::new);
enabled = true;
}
}
...
}
tracingLift 함수에서 reactor가 연산자마다 Subscriber를 생성할 때 그걸 감싸주는 Lifter를 등록합니다.
Lifter를 통해 원래 subscriber를 TracingSubscriber로 감싸게 됩니다.
private static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift(...) {
return Operators.lift(
ContextPropagationOperator::shouldInstrument,
new Lifter<>(...));
}
public CoreSubscriber<? super T> apply(Scannable publisher, CoreSubscriber<? super T> sub) {
return new TracingSubscriber<>(sub, sub.currentContext());
}
마무리
분산추적에서 각 상황에 대해서 Context를 어떻게 전파하는지 OTel프로젝트를 보며 정리해 보았는데,
오픈소스를 처음으로 분석하다 보니 이런식으로 분석하는게 맞는건지 또 좋은건지 판단이 어렵네요.
어느정도 흐름은 이해했는데, 아무래도 직접 코드를 작성해봐야 어떤 디테일들이 있는지 알 수 있을 것 같습니다.
'APM만들기' 카테고리의 다른 글
| 피쳐 추출 시스템에 대해 알아보자 (0) | 2025.08.29 |
|---|---|
| 피벗 추적에 대해 알아보자 (0) | 2025.08.03 |
| 분산추적의 샘플링에 대해서 알아보자 (0) | 2025.07.18 |
| OpenTelemetry 프로젝트를 보며 공부하는 분산추적의 핵심요소들 (0) | 2025.06.20 |