| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | 4 | 5 | 6 | |
| 7 | 8 | 9 | 10 | 11 | 12 | 13 |
| 14 | 15 | 16 | 17 | 18 | 19 | 20 |
| 21 | 22 | 23 | 24 | 25 | 26 | 27 |
| 28 | 29 | 30 | 31 |
- ec2
- s3
- nandtotetris
- OTEL
- 컴퓨터 아키텍쳐
- 시간 윈도우
- 메모리 세그먼트
- 스트리밍 데이터 아키텍쳐
- 구문 분석
- SpanId
- 밑바닥부터 만드는 컴퓨팅 시스템
- 분산추적
- 실시간 스트리밍 데이터
- 리눅스
- 피벗 추적
- 스택머신
- 마운트
- 스트리밍 아키텍쳐
- 핵심 데이터 모델링
- InnoDB
- 도커
- APM 만들기
- Terraform
- MySQL
- 텀블링 윈도우
- apm
- jack 문법
- 추적 데이터 마이닝 파이프라인
- vm번역기
- vm머신
- Today
- Total
이것이 점프 투 공작소
OpenTelemetry 프로젝트를 보며 공부하는 분산추적의 핵심요소들 본문
APM을 직접 만들어 저의 사이드 프로젝트들에 쓰고싶다는 작은 소망이 있었습니다.
지금부터 APM을 만들기 위한 공부들과 만드는 과정들을 같이 포스팅하려고 합니다.
화이팅
첫번째로 CNCF 표준이자 분산 추적 프레임워크인 OpenTelemetry, OTel의 구조와 핵심 구성요소
Trace, Span, Baggage, Metric, Log 에 대해 알아보겠습니다.
OTel이란?
이전에 분산추적을 구현하는 방식은 매우 다양했고 대부분 직접 만들어 사용했기에 각 서비스들에 종속되어 있는 방식이었습니다.
시간이 지날수록 표준의 필요성이 느껴지게 되었고 두가지 오픈 소스가 만들어졌습니다.
하나는 구글의 내부 라이브러리 집합인 센서스(Census)에서 시작된 오픈센서스(OpenCensus)
하나는 CNCF 프로젝트인 OpenTracing입니다.
지금은 이 두 프로젝트가 합쳐저 OpenTelementry가 되었습니다.
OpenTelementry는 Jaeger, Zipkin과 같은 어플리케이션이 아니고 예시와 같은 추적서버에 데이터를 전송하는 역할을 합니다.
SDK와 API는 openTelementry-java프로젝트에 있고 Agent관련 파일은 openTelementry-java-instrumentation 프로젝트에 있습니다.
저는 먼저 자바 APM을 만들고자 해당 프로젝트를 좀 살펴보려합니다.
다른 생태계는 JavaAPM 먼저 만들고 한번 도전해 볼까 합니다,,
https://github.com/open-telemetry/opentelemetry-java
GitHub - open-telemetry/opentelemetry-java: OpenTelemetry Java SDK
OpenTelemetry Java SDK. Contribute to open-telemetry/opentelemetry-java development by creating an account on GitHub.
github.com
https://github.com/open-telemetry/opentelemetry-java-instrumentation
GitHub - open-telemetry/opentelemetry-java-instrumentation: OpenTelemetry auto-instrumentation and instrumentation libraries for
OpenTelemetry auto-instrumentation and instrumentation libraries for Java - open-telemetry/opentelemetry-java-instrumentation
github.com
트레이서 (Tracer)
분산 추적 시스템에서 요청은 여러 서비스와 컴포넌트 등 다양한 홉(hop)을 거치게 되는데 Trace는 그 전체에 대한 경로를 의미하며 각 Trace들은Trace_Id로 구분됩니다.
Trace들은 여러 hop들을 통과하면서 스팬(Span)을 생성하는 역할을 합니다.
Tracer는 TracerProvider로 부터 생성되며, Provider는 TracerProvider는 어플리케이션의 수명주기와 동일합니다.
Expoter를 통해 Tracer를 Consumer, 다른 서버에 보낼수도 있다고 합니다.
근데 코드를 찾아보니 TracerExpoter는 따로 없고 SpanExtoter에서 해당 기능을 수행합니다.
이제 Tracer쪽 소스코드를 간략히 살펴보려고합니다.
Tracer
Tracer 인터페이스는 Span을 생성하는 SpanBuilder를 생성하는 메소드 하나만 있습니다.
@ThreadSafe
public interface Tracer {
/**
* Returns a {@link SpanBuilder} to create and start a new {@link Span}.
*
* <p>See {@link SpanBuilder} for usage examples.
*
* @param spanName The name of the returned Span.
* @return a {@code Span.Builder} to create and start a new {@code Span}.
*/
SpanBuilder spanBuilder(String spanName);
}
Tracer_ID
Tracer_ID가 어떻게 생성되는지 살펴보겠습니다.
개인적으로 이런 오픈소스를 통해서 공부하는게 처음이라, 어떤식으로 ID를 관리하는지 궁금했었는데 이제야 확인해보게 되네요,,ㅎ
trace_id의 크기는 16byte입니다.
id를 사용할때는 이를 절반, 8byte로 나누어 high, low로 구분합니다.
이렇게 구분하여 사용하는 이유는 byte[16]과 같이 사용하면 jvm에서 heap객체로 사용되어서 G.C의 영향을 받게 됩니다.
반면 원시타입 long을 사용하면 G.C의 영향을 받지 않기에 이런식으로 구분하여 사용한다고 합니다.
또한 값이 모두 0이면 유효하지 않은 Trace로 취급합니다.
string으로 출력할때는 16진수 문자열로 변환하여 32자리의 string을 사용합니다.
Valid는 언제하는데,,?? (알아보자)
public final class TraceId {
private static final int BYTES_LENGTH = 16;
private static final int HEX_LENGTH = 2 * BYTES_LENGTH;
private static final String INVALID = "00000000000000000000000000000000";
private TraceId() {}
...
public static String fromLongs(long traceIdLongHighPart, long traceIdLongLowPart) {
if (traceIdLongHighPart == 0 && traceIdLongLowPart == 0) {
return getInvalid();
}
char[] chars = TemporaryBuffers.chars(HEX_LENGTH);
OtelEncodingUtils.longToBase16String(traceIdLongHighPart, chars, 0);
OtelEncodingUtils.longToBase16String(traceIdLongLowPart, chars, 16);
return new String(chars, 0, HEX_LENGTH);
}
...
}
Trace_ID의 생성은 RandomIdGenerator 객체를 이용해 high, low id값을 만들어 조합후 리턴해줍니다.
enum RandomIdGenerator implements IdGenerator {
INSTANCE;
private static final long INVALID_ID = 0;
private static final Supplier<Random> randomSupplier = RandomSupplier.platformDefault();
@Override
public String generateSpanId() {
long id;
Random random = randomSupplier.get();
do {
id = random.nextLong();
} while (id == INVALID_ID);
return SpanId.fromLong(id);
}
@Override
public String generateTraceId() {
Random random = randomSupplier.get();
long idHi = random.nextLong();
long idLo;
do {
idLo = random.nextLong();
} while (idLo == INVALID_ID);
return TraceId.fromLongs(idHi, idLo);
}
@Override
public String toString() {
return "RandomIdGenerator{}";
}
}
Tracer가 가지고 있는 정보들
Tracer는 span을 만드는 간단한 업무만 하지만, Tracer가 자체적으로 가지고 있는 정보도 중요합니다.
Tracer에 어떤 값들이 있어야하는지 SdkTracerBuilder와 Tracer의 구현체 SdkTracer 코드를 통해 알아보겠습니다.
TracerBuilder
SdkTracerBuilder 부터 먼저 알아보겠습니다.
Tracer를 만드는 빌더 클래스입니다.
TracerBuilder에는 가 어떤 기능에 사용될지 지정하는 instrumentationScopeName, 주로 라이브러리나 모듈을 지정합니다.
해당 Version, URL 정보를 넣기위한 schemaUrl이 존재합니다.
그리고 이렇게 만들어둔 Tracer 인스턴스를 재사용하기 위한 registry가 있습니다.
build() 메소드는 레지스트리에 해당 트레이서가 없으면 생성하는 역할까지 합니다.
Builder를 통해 생성된 각 Tracer는 고유한 이름을 사용해 해당 Tracer가 어떤 라이브러리나 모듈에 속하는지 식별합니다.
예를들어 "io.opentelemetry.contrib.mongodb"라는 이름(instrumentationScopeName)의 Tracer를 통해 생성된 Span들은 MongoDB 관련 작업을 의미하도록 추적에 명시 할 수 있습니다.
Tracer는 여러 개 만들어질 수 있지만, 일반적으로 구성요소별로 하나씩만 생성하여 재사용합니다.
class SdkTracerBuilder implements TracerBuilder {
private final ComponentRegistry<SdkTracer> registry;
private final String instrumentationScopeName;
@Nullable private String instrumentationScopeVersion;
@Nullable private String schemaUrl;
SdkTracerBuilder(ComponentRegistry<SdkTracer> registry, String instrumentationScopeName) {
this.registry = registry;
this.instrumentationScopeName = instrumentationScopeName;
}
@Override
public TracerBuilder setSchemaUrl(String schemaUrl) {
this.schemaUrl = schemaUrl;
return this;
}
@Override
public TracerBuilder setInstrumentationVersion(String instrumentationScopeVersion) {
this.instrumentationScopeVersion = instrumentationScopeVersion;
return this;
}
@Override
public Tracer build() {
return registry.get(
instrumentationScopeName, instrumentationScopeVersion, schemaUrl, Attributes.empty());
}
}
SdkTracer
SDK에서 사용하는 Tracer의 구현체입니다.
생성자 부분을 보면 3가지 객체가 들어갑니다.
각 객체들을 살펴보겠습니다.
class SdkTracer implements Tracer {
static final String FALLBACK_SPAN_NAME = "<unspecified span name>";
private static final Tracer NOOP_TRACER = TracerProvider.noop().get("noop");
private static final boolean INCUBATOR_AVAILABLE;
static {
boolean incubatorAvailable = false;
try {
Class.forName("io.opentelemetry.api.incubator.trace.ExtendedDefaultTracerProvider");
incubatorAvailable = true;
} catch (ClassNotFoundException e) {
// Not available
}
INCUBATOR_AVAILABLE = incubatorAvailable;
}
private final TracerSharedState sharedState;
private final InstrumentationScopeInfo instrumentationScopeInfo;
protected boolean tracerEnabled;
SdkTracer(
TracerSharedState sharedState,
InstrumentationScopeInfo instrumentationScopeInfo,
TracerConfig tracerConfig) {
this.sharedState = sharedState;
this.instrumentationScopeInfo = instrumentationScopeInfo;
this.tracerEnabled = tracerConfig.isEnabled();
}
1. TracerSharedState
Tracer가 공통적으로 가지고 있는 설정값들입니다.
final class TracerSharedState {
...
TracerSharedState(
Clock clock,
IdGenerator idGenerator,
Resource resource,
Supplier<SpanLimits> spanLimitsSupplier,
Sampler sampler,
List<SpanProcessor> spanProcessors,
ExceptionAttributeResolver exceptionAttributeResolver) {
...
}
- clock : Span의 시작/종료 시간을 측정
- idGenerator : TraceId, SpanId를 만드는 객체
- idGeneratorSafeToSkipIdValidation : ID 생성 검증 생략 여부
- resource : 해당 Trace가 속한 서비스/환경 정보
- activeSpanProcessor : Span종료 시 Export 등 종료 처리 객체
- exceptionAttributeResolver : 예외처리
- shutdownResult : 종료 상태 저장
2. InstrumentationScopeInfo
앞서 살펴보았던 Tracer의 기본 정보(ScopeName, Version, SchemaUrl)를 담는 객체입니다.
public final class InstrumentationScopeInfoBuilder {
private final String name;
@Nullable private String version;
@Nullable private String schemaUrl;
@Nullable private Attributes attributes;
...
}
3. TracerConfig
Tracer가 생성 될 때 Tracer의 생성을 막는 메소드입니다.
불필요한 계측을 할 것 같다거나, 라이브러리의 성능의 이슈가 있을 수 있을경우 명시적으로 해당 scope의 tracer를 막기 위한 설정값으로 사용합니다.
public abstract class TracerConfig {
private static final TracerConfig DEFAULT_CONFIG =
new AutoValue_TracerConfig(/* enabled= */ true);
private static final TracerConfig DISABLED_CONFIG =
new AutoValue_TracerConfig(/* enabled= */ false);
/** Returns a disabled {@link TracerConfig}. */
public static TracerConfig disabled() {
return DISABLED_CONFIG;
}
/** Returns an enabled {@link TracerConfig}. */
public static TracerConfig enabled() {
return DEFAULT_CONFIG;
}
...
}
이제 생성자 아래 부분 코드도 살펴보겠습니다.
개념적으로 계속 Tracer는 Span을 생성한다고 했지만 코드상으로는 Span을 만드는 SpanBuilder를 리턴합니다.
class SdkTracer implements Tracer {
...
/**
* Note that {@link ExtendedSdkTracer#spanBuilder(String)} calls this and depends on it returning
* {@link ExtendedSdkTracer} in all cases when the incubator is present.
*/
@Override
public SpanBuilder spanBuilder(String spanName) {
if (!tracerEnabled) {
return NOOP_TRACER.spanBuilder(spanName);
}
if (spanName == null || spanName.trim().isEmpty()) {
spanName = FALLBACK_SPAN_NAME;
}
if (sharedState.hasBeenShutdown()) {
return NOOP_TRACER.spanBuilder(spanName);
}
return INCUBATOR_AVAILABLE
? IncubatingUtil.createExtendedSpanBuilder(
spanName, instrumentationScopeInfo, sharedState, sharedState.getSpanLimits())
: new SdkSpanBuilder(
spanName, instrumentationScopeInfo, sharedState, sharedState.getSpanLimits());
}
...
}
TraceProvider
Tracer들의 팩토리입니다.
보통 어플리케이션 시작 시 TraceProvider를 초기화하여 전역으로 사용합니다.
TracerProvider는 Stateful한 객체로서 샘플링, SpanProcessor, Exporter 등 추적 구성(config) 정보를 보유하여 해당 Provider에서 만들어진 모든 Tracer와 Span에 일관된 설정을 적용합니다.
메소드에 get밖에 없어서 어디서 Tracer를 만드는거지? 했지만 타고타고 살펴보면 구현체의 get()메소드가 실행되면 Tracer들의 저장소인 ComponentRegistry에서 원하는 Tracer가 없으면 생성하는 역할까지 하고있습니다.
package io.opentelemetry.api.trace;
import javax.annotation.concurrent.ThreadSafe;
@ThreadSafe
public interface TracerProvider {
/**
* Returns a no-op {@link TracerProvider} which only creates no-op {@link Span}s which do not
* record nor are emitted.
*/
static TracerProvider noop() {
return DefaultTracerProvider.getInstance();
}
/**
* Gets or creates a named tracer instance.
*
* @param instrumentationScopeName A name uniquely identifying the instrumentation scope, such as
* the instrumentation library, package, or fully qualified class name. Must not be null.
* @return a tracer instance.
*/
Tracer get(String instrumentationScopeName);
/**
* Gets or creates a named and versioned tracer instance.
*
* @param instrumentationScopeName A name uniquely identifying the instrumentation scope, such as
* the instrumentation library, package, or fully qualified class name. Must not be null.
* @param instrumentationScopeVersion The version of the instrumentation scope (e.g., "1.0.0").
* @return a tracer instance.
*/
Tracer get(String instrumentationScopeName, String instrumentationScopeVersion);
/**
* Creates a TracerBuilder for a named {@link Tracer} instance.
*
* @param instrumentationScopeName A name uniquely identifying the instrumentation scope, such as
* the instrumentation library, package, or fully qualified class name. Must not be null.
* @return a TracerBuilder instance.
* @since 1.4.0
*/
default TracerBuilder tracerBuilder(String instrumentationScopeName) {
return DefaultTracerBuilder.getInstance();
}
}
SdkTraceProvider
SDK에서 사용하는 TraceProvider의 구현체입니다.
Tracer의 설정값을 지정하는 생성자, 익스포터로 전송하는 flush메소드, 가장 핵심인 Tracer를 만드는 get()메소드도 구현되어있습니다.
또한 이미 만들어둔 Tracer 객체를 TracerBuilder를 통해 재사용하기 위한 ComponentRegistry 객체와,
상황에 따라 기존 설정값이 아닌 다른 설정값을 새로이 지정할 수 있도록 하는 setTracerConfigurator메소드도 보입니다.
참고로 ComponentRegistry는 Map을 통해 Tracer들을 관리합니다.
public final class SdkTracerProvider implements TracerProvider, Closeable {
private static final Logger logger = Logger.getLogger(SdkTracerProvider.class.getName());
static final String DEFAULT_TRACER_NAME = "";
private final TracerSharedState sharedState;
private final ComponentRegistry<SdkTracer> tracerSdkComponentRegistry;
// deliberately not volatile because of performance concerns
// - which means its eventually consistent
private ScopeConfigurator<TracerConfig> tracerConfigurator;
/**
* Returns a new {@link SdkTracerProviderBuilder} for {@link SdkTracerProvider}.
*
* @return a new {@link SdkTracerProviderBuilder} for {@link SdkTracerProvider}.
*/
public static SdkTracerProviderBuilder builder() {
return new SdkTracerProviderBuilder();
}
@SuppressWarnings("NonApiType")
SdkTracerProvider(
Clock clock,
IdGenerator idsGenerator,
Resource resource,
Supplier<SpanLimits> spanLimitsSupplier,
Sampler sampler,
List<SpanProcessor> spanProcessors,
ScopeConfigurator<TracerConfig> tracerConfigurator,
ExceptionAttributeResolver exceptionAttributeResolver) {
this.sharedState =
new TracerSharedState(
clock,
idsGenerator,
resource,
spanLimitsSupplier,
sampler,
spanProcessors,
exceptionAttributeResolver);
this.tracerSdkComponentRegistry =
new ComponentRegistry<>(
instrumentationScopeInfo ->
SdkTracer.create(
sharedState,
instrumentationScopeInfo,
getTracerConfig(instrumentationScopeInfo)));
this.tracerConfigurator = tracerConfigurator;
}
private TracerConfig getTracerConfig(InstrumentationScopeInfo instrumentationScopeInfo) {
TracerConfig tracerConfig = tracerConfigurator.apply(instrumentationScopeInfo);
return tracerConfig == null ? TracerConfig.defaultConfig() : tracerConfig;
}
@Override
public Tracer get(String instrumentationScopeName) {
return tracerBuilder(instrumentationScopeName).build();
}
@Override
public Tracer get(String instrumentationScopeName, String instrumentationScopeVersion) {
return tracerBuilder(instrumentationScopeName)
.setInstrumentationVersion(instrumentationScopeVersion)
.build();
}
@Override
public TracerBuilder tracerBuilder(@Nullable String instrumentationScopeName) {
// Per the spec, both null and empty are "invalid" and a default value should be used.
if (instrumentationScopeName == null || instrumentationScopeName.isEmpty()) {
logger.fine("Tracer requested without instrumentation scope name.");
instrumentationScopeName = DEFAULT_TRACER_NAME;
}
return new SdkTracerBuilder(tracerSdkComponentRegistry, instrumentationScopeName);
}
/** Returns the {@link SpanLimits} that are currently applied to created spans. */
public SpanLimits getSpanLimits() {
return sharedState.getSpanLimits();
}
/** Returns the configured {@link Sampler}. */
public Sampler getSampler() {
return sharedState.getSampler();
}
/**
* Updates the tracer configurator, which computes {@link TracerConfig} for each {@link
* InstrumentationScopeInfo}.
*
* <p>This method is experimental so not public. You may reflectively call it using {@link
* SdkTracerProviderUtil#setTracerConfigurator(SdkTracerProvider, ScopeConfigurator)}.
*
* @see TracerConfig#configuratorBuilder()
*/
void setTracerConfigurator(ScopeConfigurator<TracerConfig> tracerConfigurator) {
this.tracerConfigurator = tracerConfigurator;
this.tracerSdkComponentRegistry
.getComponents()
.forEach(
sdkTracer ->
sdkTracer.updateTracerConfig(
getTracerConfig(sdkTracer.getInstrumentationScopeInfo())));
}
/**
* Attempts to stop all the activity for {@link Tracer}s created by this provider. Calls {@link
* SpanProcessor#shutdown()} for all registered {@link SpanProcessor}s.
*
* <p>The returned {@link CompletableResultCode} will be completed when all the Spans are
* processed.
*
* <p>After this is called, newly created {@code Span}s will be no-ops.
*
* <p>After this is called, further attempts at re-using this instance will result in undefined
* behavior. It should be considered a terminal operation for the SDK.
*
* @return a {@link CompletableResultCode} which is completed when all the span processors have
* been shut down.
*/
public CompletableResultCode shutdown() {
if (sharedState.hasBeenShutdown()) {
logger.log(Level.INFO, "Calling shutdown() multiple times.");
return CompletableResultCode.ofSuccess();
}
return sharedState.shutdown();
}
/**
* Requests the active span processor to process all span events that have not yet been processed
* and returns a {@link CompletableResultCode} which is completed when the flush is finished.
*
* @see SpanProcessor#forceFlush()
*/
public CompletableResultCode forceFlush() {
return sharedState.getActiveSpanProcessor().forceFlush();
}
/**
* Attempts to stop all the activity for {@link Tracer}s created by this provider. Calls {@link
* SpanProcessor#shutdown()} for all registered {@link SpanProcessor}s.
*
* <p>This operation may block until all the Spans are processed. Must be called before turning
* off the main application to ensure all data are processed and exported.
*
* <p>After this is called, newly created {@code Span}s will be no-ops.
*
* <p>After this is called, further attempts at re-using this instance will result in undefined
* behavior. It should be considered a terminal operation for the SDK.
*/
@Override
public void close() {
shutdown().join(10, TimeUnit.SECONDS);
}
@Override
public String toString() {
// ...
}
}
스팬 (Span)
스팬은 어플리케이션이 수행하는 작업 단위를 의미합니다.
span들은 부모-자식 구조로 모여져 하나의 trace를 구성합니다.
OTel에서는 이벤트기반 설계같은 아키텍쳐도 커버하기 위해 부모-자식 관계를 Link관계라고도 표현합니다.
span은 hop마다 시작시간과 종료시간, 예외와 이벤트를 기록하는 역할이기에 추적에서 굉장히 중요한 요소입니다.
아래는 Elastic APM의 Tracing 관련 그림과 제가 작성한 코드인데 이를 보면 span이 어떤 느낌인지 쉽게 알 수 있습니다.

Span span = tracer.spanBuilder("span name")
.setSpanKind(SpanKind.INTERNAL) // Span 종류 설정 (Internal 기본값)
.setAttribute("com.acme.string-key", "value") // 속성 설정 (여러 타입 지원)
// ... 추가 속성들 설정 ...
.addLink(linkContext, linkAttributes) // 링크 추가 (옵션)
.startSpan(); // Span 시작
if (span.isRecording()) { // 샘플링되어 기록중인 경우에만
span.updateName("new span name"); // Span 이름 변경 (필요시)
span.setAttribute("com.acme.new-attr", "v"); // 동적으로 속성 추가
span.addLink(otherCtx); // 실행 중 링크 추가
span.addEvent("my-event"); // 이벤트 추가
span.addEvent("my-event", someAttributes); // 속성 포함 이벤트 추가
span.recordException(new RuntimeException("error")); // 예외 기록 이벤트
span.setStatus(StatusCode.OK, "done"); // 상태 설정 (OK 또는 ERROR)
}
span.end(); // Span 종료
아래는 실제 코드는 아니고, 이해를 돕기 위해 제가 적어본 Span이 생성되고 종료되는 과정에 대한 코드입니다.
우리는 Tracer의 spanBuilder메소드로 빌더를 만들고 빌더에 각 옵션값들을 넣은뒤 span을 실행시킵니다.
기록이 되어야하는 span이라면 스팬의 값들을 채우게됩니다.
OTel에서 Span의 이름은 낮은 카더널리티를 추구합니다.
만약 많은 정보가 들어가야한다면 Attribute속성 사용을 권장합니다.
그리고 span이 종료되지 않으면 메모리누수와 Trace가 종료되지 않는 문제가 생기기에 반드시 적절하게 종료되어야합니다.
Span-Context
trace에서 span을 식별하는 불변 객체입니다.
어떤 서비스가 다른 서비스에게 요청 할 때 Trace_ID와 Span_ID를 실어 보내면 수신 서비스에서는 이를 SpanContext로 받아들여 새로운 Span의 부모로 취급하게 됩니다.
Trace_ID, Span_ID, 샘플링 여부를 결정하는 TraceFlags, Trace의 상태값들을 저장하는 TraceState가 주된 구성요소이며,
TraceState는 key/value 리스트로 불변객체 특성상 새로운 key가 추가되면 새로운 TraceState를 반환하게 됩니다.
W3C 표준 형식에 맞게 작성해야합니다.
@Immutable
@AutoValue
public abstract class ImmutableSpanContext implements SpanContext {
public static final SpanContext INVALID =
createInternal(
TraceId.getInvalid(),
SpanId.getInvalid(),
TraceFlags.getDefault(),
TraceState.getDefault(),
/* remote= */ false,
/* valid= */ false);
private static AutoValue_ImmutableSpanContext createInternal(
String traceId,
String spanId,
TraceFlags traceFlags,
TraceState traceState,
boolean remote,
boolean valid) {
return new AutoValue_ImmutableSpanContext(
traceId, spanId, traceFlags, traceState, remote, valid);
}
public static SpanContext create(
String traceIdHex,
String spanIdHex,
TraceFlags traceFlags,
TraceState traceState,
boolean remote,
boolean skipIdValidation) {
if (skipIdValidation || (SpanId.isValid(spanIdHex) && TraceId.isValid(traceIdHex))) {
return createInternal(
traceIdHex, spanIdHex, traceFlags, traceState, remote, /* valid= */ true);
}
return createInternal(
TraceId.getInvalid(),
SpanId.getInvalid(),
traceFlags,
traceState,
remote,
/* valid= */ false);
}
@Override
public abstract boolean isValid();
}
SpanBuilder
span은 tracer에서 호출하는 SpanBuilder에서 만들어집니다.
빌더가 생성되면 위에서 살펴본 Tracer관련 설정 파라미터 4개를 받습니다.
아래서 부터는 Span에서 새롭게 등장하는 값들에 대해 알아보겠습니다.
class SdkSpanBuilder implements SpanBuilder {
private final String spanName;
private final InstrumentationScopeInfo instrumentationScopeInfo;
private final TracerSharedState tracerSharedState;
private final SpanLimits spanLimits;
@Nullable private Context parent;
private SpanKind spanKind = SpanKind.INTERNAL;
@Nullable private AttributesMap attributes;
@Nullable private List<LinkData> links;
private int totalNumberOfLinksAdded = 0;
private long startEpochNanos = 0;
SdkSpanBuilder(
String spanName,
InstrumentationScopeInfo instrumentationScopeInfo,
TracerSharedState tracerSharedState,
SpanLimits spanLimits) {
this.spanName = spanName;
this.instrumentationScopeInfo = instrumentationScopeInfo;
this.tracerSharedState = tracerSharedState;
this.spanLimits = spanLimits;
}
...
}
Attributes
Span의 전반적인 정보를 key/value로 보관하는 클래스입니다.
예를들어 HTTP라면 method = GET, status_code = 200 DB쿼리 span이라면 system = mysql, statement = SELECT 1 FROM DUAL 등 우리가 span하면 떠오르는 값들을 저장합니다.
이후 여기에 넣어둔 Attribute 값들을 클라이언트나 샘플러에서 쿼리, 필터링하여 이용하게 됩니다.
class SdkSpanBuilder implements SpanBuilder {
...
private AttributesMap attributes() {
AttributesMap attributes = this.attributes;
if (attributes == null) {
this.attributes =
AttributesMap.create(
spanLimits.getMaxNumberOfAttributes(), spanLimits.getMaxAttributeValueLength());
attributes = this.attributes;
}
return attributes;
}
...
}
}
Parent , Span-Link와 LinkData
Parent는 Span간의 부모-자식 관계, Span-Link는 일종의 원인-결과관계를 의미합니다.
일반적으로 Parent는 Span이 생성될때 부모의 spanContext를 가져와 세팅하고
Span-Link는 Fan-Out패턴 처럼 부모는 따로 있어도 지금 Span과 관련된 다른 Span들을 참조하고 싶을 때 사용합니다.
OTel에서 Parent는 spanContext 객체를 사용해 표현하고 Link는 LinkData라는 객체를 사용해서 Span-Link를 구현합니다.
LinkData는 SpanContext, Attribute 객체를 가지고 있으며, Span에서 LinkData의 배열을 가짐으로서 1대1이 아닌 1대N관계도 표현할 수 있습니다.
@AutoValue
@Immutable
abstract class ImmutableLinkData implements LinkData {
private static final Attributes DEFAULT_ATTRIBUTE_COLLECTION = Attributes.empty();
private static final int DEFAULT_ATTRIBUTE_COUNT = 0;
static LinkData create(SpanContext spanContext) {
return new AutoValue_ImmutableLinkData(
spanContext, DEFAULT_ATTRIBUTE_COLLECTION, DEFAULT_ATTRIBUTE_COUNT);
}
static LinkData create(SpanContext spanContext, Attributes attributes) {
return new AutoValue_ImmutableLinkData(spanContext, attributes, attributes.size());
}
static LinkData create(SpanContext spanContext, Attributes attributes, int totalAttributeCount) {
return new AutoValue_ImmutableLinkData(spanContext, attributes, totalAttributeCount);
}
ImmutableLinkData() {}
}
Link 세팅
spanBuilder에서 필요에 따라 SpanContext와 Attribute를 추가 할 수 있습니다.
class SdkSpanBuilder implements SpanBuilder {
...
@Override
public SpanBuilder addLink(SpanContext spanContext) {
if (spanContext == null || !spanContext.isValid()) {
return this;
}
addLink(LinkData.create(spanContext));
return this;
}
@Override
public SpanBuilder addLink(SpanContext spanContext, Attributes attributes) {
if (spanContext == null || !spanContext.isValid()) {
return this;
}
if (attributes == null) {
attributes = Attributes.empty();
}
int totalAttributeCount = attributes.size();
addLink(
LinkData.create(
spanContext,
AttributeUtil.applyAttributesLimit(
attributes,
spanLimits.getMaxNumberOfAttributesPerLink(),
spanLimits.getMaxAttributeValueLength()),
totalAttributeCount));
return this;
}
private void addLink(LinkData link) {
totalNumberOfLinksAdded++;
if (links == null) {
links = new ArrayList<>(spanLimits.getMaxNumberOfLinks());
}
// don't bother doing anything with any links beyond the max.
if (links.size() == spanLimits.getMaxNumberOfLinks()) {
return;
}
links.add(link);
}
...
}
Patent 세팅
tracer에서 span을 만드는 OTel의 예시 코드를 보게되면 Context 객체를 Link에 넣어줍니다.
Span-Link는 부모의 SpanContext를 세팅하지만, Link와 달리 Parent가 필요로 하는 Context는 SpanContext가 아닌 다른 Context, Context.java의 객체입니다.
Context객체는 ContextKey<V> 와 같은 형태로 구현되어있어 다양한 값을 저장 할 수 있는 불변객체이자 저장소입니다.
Span span = tracer.spanBuilder("sb")
.setParent(Context.current()) // 현재 컨텍스트를 부모로 설정
.startSpan();
Context.current() 메소드를 살펴보면 ContextStorage라는 저장소에서 Context객체를 꺼내오는걸 확인 할 수 있습니다.
public interface Context {
...
/** Return the context associated with the current {@link Scope}. */
static Context current() {
Context current = ContextStorage.get().current();
return current != null ? current : root();
}
...
}
ContextStorage
enum 싱글톤 방식을 사용하여 Context를 자바의 ThreadLocal에 보관하는 Stoarge입니다.
ThreadLocal에 보관하여 Context가 스레드별 전역참조가 가능하도록 합니다.
Context.current()메소드를 사용하면 THREAD_LOCAL_STORAGE에 보관되어 있는 Context객체를 리턴합니다.
Context객체는 주로 span객체 자체를 저장하는데 사용됩니다.
사실 Context 객체, Context.java는 프로젝트에서 interface 파일인데 구현체가 프로젝트에 없는 것 같아서,,
구글링 + 공식문서 + GPT로 찾아봤더니 Context는 immutable이고, 대부분 내부 구현체를 사용한다.
이 정도로만 설명이 되어있어서 흠,, Context구현체에 대해서는 조금 더 찾아봐야 할 것 같습니다 ㅠㅠ
enum ThreadLocalContextStorage implements ContextStorage {
INSTANCE;
private static final Logger logger = Logger.getLogger(ThreadLocalContextStorage.class.getName());
private static final ThreadLocal<Context> THREAD_LOCAL_STORAGE = new ThreadLocal<>();
@Override
public Scope attach(Context toAttach) {
if (toAttach == null) {
// Null context not allowed so ignore it.
return NoopScope.INSTANCE;
}
Context beforeAttach = current();
if (toAttach == beforeAttach) {
return NoopScope.INSTANCE;
}
THREAD_LOCAL_STORAGE.set(toAttach);
return new ScopeImpl(beforeAttach, toAttach);
}
private class ScopeImpl implements Scope {
@Nullable private final Context beforeAttach;
private final Context toAttach;
private boolean closed;
private ScopeImpl(@Nullable Context beforeAttach, Context toAttach) {
this.beforeAttach = beforeAttach;
this.toAttach = toAttach;
}
@Override
public void close() {
if (!closed && current() == toAttach) {
closed = true;
THREAD_LOCAL_STORAGE.set(beforeAttach);
} else {
logger.log(
Level.FINE,
" Trying to close scope which does not represent current context. Ignoring the call.");
}
}
}
@Override
@Nullable
public Context current() {
return THREAD_LOCAL_STORAGE.get();
}
enum NoopScope implements Scope {
INSTANCE;
@Override
public void close() {}
}
}
Context와 Scope
Scope는 이전 Context를 기억하고 있는 객체입니다.
현재 Span을 Context에 값을 저장하기 위해서는,
Span이 상속받는 인터페이스 중 하나인 ImplicitContextKeyed의 makeCurrent()메소드를 통해 현재 span을 Context에 저장합니다.
여기서 this는 Span입니다.
public interface ImplicitContextKeyed {
default Scope makeCurrent() {
return Context.current().with(this).makeCurrent();
}
}
makeCurrent메소드가 종료되면 Scope가 반환되는데 Scoep가 close되기 전 까지는 지금 Span이 현재 Thread에서 Current상태가 됩니다. close되면 다시 이전 상태의 Context가 Current가 됩니다.
전체적인 흐름으로 보면 이렇습니다.
// 현재 Context를 부모로 설정 하여 span생성
Span span = tracer.spanBuilder("sb")
.setParent(Context.current())
.startSpan();
// 해당 span을 Context에 등록
try (Scope scope = span.makeCurrent()) {
...
} finally {
span.end(); // 이전 Context로 돌아감
}
하지만 일반적으로 Context들은 ThreadLocalStoarge에 저장되기에 만약 trace가 다른 스레드로 넘어가거나, http요청으로 인해 다른 서비스로 가야할때는 Context를 wrapp해서 전달하는 등 다른 방식이 필요합니다.
Span Status (스팬 상태)
public enum StatusCode {
UNSET,
OK,
ERROR
}
기본값은 UNSET이고 오류 없음을 의미합니다.
즉 별도로 status가 지정되지 않은 스팬은 오류없음, 즉 성공 스팬으로 취급합니다.
OTel에서도 오류가 있을때만 ERROR로 사용하도록 권고합니다.
span에서 status의 우선순위는 OK, ERROR, UNSET 이며 이미 OK상태가 된 span은 ERROR로 변경해도 무시됩니다.
Span Kind
SpanKind는 5가지 종류 Client, Server, Internal, Producer, Consumer가 있으며 Span이 어디서 실행되는지 구분하는 역할을 합니다.
public enum SpanKind {
INTERNAL,
SERVER,
CLIENT,
PRODUCER,
CONSUMER
}
기본 SpanKind는 Internal입니다.
메시징 아키텍쳐에서 사용하기 위한 producer, consumer 타입도 존재합니다.
OTel에서 HTTP모듈, 데이터베이스를 향한 호출은 CLIENT로 처리한다고 합니다.
또 RPC의 양 끝점은 CLIENT, SERVER로 처리한다고 합니다.
대신 SpanKind를 세팅 할 때 한가지 주의 사항이 있는데 하나의 스팬은 하나의 역할만 해야한다는 것 입니다.
즉 아래와 같은 상황이 되면 서버 스팬이 일종의 클라이언트역할까지 하게됩니다.
이런 경우에는 클라이언트 스팬을 따로 만들어 httpCall을 해야합니다.
Span span = tracer.spanBuilder("processRequest")
.setSpanKind(SpanKind.SERVER) // 서버 스팬
.startSpan();
// 이 안에서 외부로 HTTP 요청을 날림
makeHttpCall(); // CLIENT 역할도 동시에 수행하는 코드
span.end();
Span Events
span의 타임라인에서 발생한 시점의 이벤트를 의미합니다.
이벤트는 이름, 타임스탬프, Attribute를 가집니다.
span의 로그정도라고 생각하면 될 것 같습니다.
Span span = tracer.spanBuilder("processOrder")
.startSpan();
try (Scope scope = span.makeCurrent()) {
// 이런식으로 로그처럼 사용합니다.
span.addEvent("order-fetched-from-db");
processBusinessLogic();
span.addEvent("order-validated");
callExternalPaymentService();
span.addEvent("payment-completed");
} finally {
span.end();
}
Span Attribute(속성)와 Span Event(이벤트) 의 차이
Span에는 이미 Event와 비슷한 역할을 할 수 있는 Attribute가 있습니다.
두개의 쓰임새는 상황에 따라 조금 달라집니다.
간단하게 이야기하면 특정 타이밍이 중요한 정보라면 Event, 시간과 무관한 정보라면 Attribute에 넣는게 권장됩니다.
어떤 일이 발생한 순간 자체가 중요한 경우 이벤트를 사용하고, 어떤 상태나 결과를 나타내는 정보로서 시점이 중요하지 않다면 속성이 적절합니다.
OTel에서 Attribute와 Event를 구분하는 몇가지 기준이 있습니다.
- 시점이 의미 있는가? : Y 이벤트 , N : 속성
- 값이 변하는가? : Y : 이벤트, N : 속성
- 발생 기록: 이벤트, 단순 설명 : 속성
- 로그성 : 이벤트, 대표값: 속성
- 순간: 이벤트, 기간 : 속성
startSpan() (SdkSpan에서 startSpan()코드도 살펴보기)
이제 span의 속성들을 알아봤으니, span을 시작하는 startSpan 메소드를 살펴보려합니다.
class SdkSpanBuilder implements SpanBuilder {
...
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Span startSpan() {
Context parentContext = parent == null ? Context.current() : parent;
Span parentSpan = Span.fromContext(parentContext);
SpanContext parentSpanContext = parentSpan.getSpanContext();
String traceId;
IdGenerator idGenerator = tracerSharedState.getIdGenerator();
String spanId = idGenerator.generateSpanId();
if (!parentSpanContext.isValid()) {
// New root span.
traceId = idGenerator.generateTraceId();
} else {
// New child span.
traceId = parentSpanContext.getTraceId();
}
List<LinkData> currentLinks = links;
List<LinkData> immutableLinks =
currentLinks == null ? Collections.emptyList() : Collections.unmodifiableList(currentLinks);
// Avoid any possibility to modify the links list by adding links to the Builder after the
// startSpan is called. If that happens all the links will be added in a new list.
links = null;
Attributes immutableAttributes = attributes == null ? Attributes.empty() : attributes;
SamplingResult samplingResult =
tracerSharedState
.getSampler()
.shouldSample(
parentContext, traceId, spanName, spanKind, immutableAttributes, immutableLinks);
SamplingDecision samplingDecision = samplingResult.getDecision();
TraceState samplingResultTraceState =
samplingResult.getUpdatedTraceState(parentSpanContext.getTraceState());
SpanContext spanContext =
ImmutableSpanContext.create(
traceId,
spanId,
isSampled(samplingDecision) ? TraceFlags.getSampled() : TraceFlags.getDefault(),
samplingResultTraceState,
/* remote= */ false,
tracerSharedState.isIdGeneratorSafeToSkipIdValidation());
if (!isRecording(samplingDecision)) {
return Span.wrap(spanContext);
}
Attributes samplingAttributes = samplingResult.getAttributes();
if (!samplingAttributes.isEmpty()) {
samplingAttributes.forEach((key, value) -> attributes().put((AttributeKey) key, value));
}
// Avoid any possibility to modify the attributes by adding attributes to the Builder after the
// startSpan is called. If that happens all the attributes will be added in a new map.
AttributesMap recordedAttributes = attributes;
attributes = null;
return SdkSpan.startSpan(
spanContext,
spanName,
instrumentationScopeInfo,
spanKind,
parentSpan,
parentContext,
spanLimits,
tracerSharedState.getActiveSpanProcessor(),
tracerSharedState.getExceptionAttributesResolver(),
tracerSharedState.getClock(),
tracerSharedState.getResource(),
recordedAttributes,
currentLinks,
totalNumberOfLinksAdded,
startEpochNanos);
}
}
...
}
코드가 매우 길지만, 전체적으로 하는일은 아래와 같습니다.
1.부모 Context 확인
부모의 context를 가져와서 세팅합니다. 없다면 Current.Context를 호출해서 Context를 사용합니다.
2. SpanId / TraceId 생성
부모가 있다면 TraceId를 그대로 사용하고 없다면 IdGenerator를 사용해 각 ID를 만듭니다.
3. 샘플링(Sampling)
샘플링 방식을 지정합니다. 샘플링에 대해서는 별도 포스팅으로 다루려고 합니다.
4. SpanContext 생성
SpanContext를 만듭니다.
5.기록 여부 판단 → SdkSpan 생성 or NoopSpan 리턴
recording 여부를 판단합니다. 이 부분도 샘플링에서 같이 다루려고 합니다.
배기지 (Baggage)
배기지는 컨텍스트에 정의되는 key-value 정보입니다.
Context 인터페이스의 with 메소드로 Baggage를 Context에 저장 할 수 있습니다.
Baggage에 자체 메소드로도 Context에 추가 할 수 있습니다.
사용자ID나 테넌트ID같은 값을 Baggage에 넣어두면, 해당 추적의 Trace를 따라가는 모든 스팬에서도 Baggage를 참조 할 수 있습니다.
<V> Context with(ContextKey<V> k1, V v1);
Baggage는 http모듈을 통해 외부 서비스로 전달될때는 W3CBaggagePropagator를 이용해 헤더에 값이 들어가서 전파됩니다.
그렇기에 무분별하게 많은 데이터나 민감한 데이터를 넣는 것은 권장되지 않습니다.
baggage: userId=alice,projectId=telemetry
메트릭 (Metric)
Metric은 기본적으로 Trace와 관계 없이 동작합니다.
메트릭은 주기적으로 수집되는데 특정 조건((timeout 등))에 맞는 메트릭이 수집되면 해당 샘플에 메트릭 값을 추가하여 클라이언트에서 관측 가능하도록 합니다.
APM에서는 Metric은 Exporter를 통해 자체적인 클라이언트에 보낼 수도 있고, 아니면 프로메테우스같은 메트릭 수집기에 전달 할 수도 있습니다.
cpu와 메모리는 비동기 측정 방식을 사용하고 Count와 같은 메트릭은 동기적 측정 방식을 사용합니다.
Meter
사용할 메트릭 계측기들을 만드는 인터페이스입니다.
메트릭 계측기들은 동기, 비동기를 선택해 사용할 수 있습니다.
- LongCounter : 카운터, 누적치 계산
- LongUpDownCounter : 업다운 카운터, 증가 감소 둘다 있음
- DoubleHistogram : 히스토그램, 분포를 보고 싶은 값 기록용
- DoubleGauge : 게이지, 현재 값 확인
package io.opentelemetry.api.metrics;
import javax.annotation.concurrent.ThreadSafe;
@ThreadSafe
public interface Meter {
LongCounterBuilder counterBuilder(String name);
LongUpDownCounterBuilder upDownCounterBuilder(String name);
DoubleHistogramBuilder histogramBuilder(String name);
DoubleGaugeBuilder gaugeBuilder(String name);
default BatchCallback batchCallback(
Runnable callback,
ObservableMeasurement observableMeasurement,
ObservableMeasurement... additionalMeasurements) {
return DefaultMeter.getInstance()
.batchCallback(callback, observableMeasurement, additionalMeasurements);
}
}
LongCounter
계측기가 여러개 있지만, 그 중 LongCounter의 구현체 하나만 알아보려고 합니다.
class SdkLongCounter extends AbstractInstrument implements LongCounter {
private final ThrottlingLogger throttlingLogger = new ThrottlingLogger(logger);
final SdkMeter sdkMeter;
final WriteableMetricStorage storage;
SdkLongCounter(
InstrumentDescriptor descriptor, SdkMeter sdkMeter, WriteableMetricStorage storage) {
super(descriptor);
this.sdkMeter = sdkMeter;
this.storage = storage;
}
...
}
WriteableMetricStorage, sdkMeter, InstrumentDescriptor
WriteableMetricStorage는 들어온 메트릭을 Attributes와 Context를 기준으로 모아두는 storage입니다.
동기 비동기 각 상황에 맞게 다른 구현체가 존재하고 있습니다
Context를 파라미터로 받기에 이 값을 통해 추적에 메트릭을 추가 할 수 있습니다.
SdkMeter는 Meter의 구현체로 각 메트릭 수집기들이 등록되며 storage들과 비동기에 사용될 콜백함수도 등록됩니다.
InstrumentDescriptor 메타데이터를 담는 객체입니다.
public interface WriteableMetricStorage {
/** Records a measurement. */
void recordLong(long value, Attributes attributes, Context context);
/** Records a measurement. */
void recordDouble(double value, Attributes attributes, Context context);
/**
* Returns {@code true} if the storage is actively recording measurements, and {@code false}
* otherwise (i.e. noop / empty metric storage is installed).
*/
boolean isEnabled();
}
add()
Context와 Attributes를 key로 사용해서 값을 올리는 간단한 메소드입니다.
...
@Override
public void add(long increment, Attributes attributes, Context context) {
if (increment < 0) {
throttlingLogger.log(
Level.WARNING,
"Counters can only increase. Instrument "
+ getDescriptor().getName()
+ " has recorded a negative value.");
return;
}
storage.recordLong(increment, attributes, context);
}
...
SdkLongCounterBuilder
클래스 내부에 static class로 빌더가 존재합니다.
여기서 buildWithCallback 함수를 빌더에 추가하면 비동기 측정 방식으로 사용 할 수도 있습니다.
동기로 측정하면 런타임에 add() 가 호출되고 비동기로 사용하면 수집 주기마다 콜백을 호출하는 방식으로 측정합니다.
...
static class SdkLongCounterBuilder implements LongCounterBuilder {
@Override
public LongCounterBuilder setDescription(String description) {
builder.setDescription(description);
return this;
}
@Override
public LongCounterBuilder setUnit(String unit) {
builder.setUnit(unit);
return this;
}
@Override
public SdkLongCounter build() {
return builder.buildSynchronousInstrument(SdkLongCounter::new);
}
@Override
public DoubleCounterBuilder ofDoubles() {
return builder.swapBuilder(SdkDoubleCounter.SdkDoubleCounterBuilder::new);
}
@Override
public ObservableLongCounter buildWithCallback(Consumer<ObservableLongMeasurement> callback) {
return builder.buildLongAsynchronousInstrument(InstrumentType.OBSERVABLE_COUNTER, callback);
}
@Override
public ObservableLongMeasurement buildObserver() {
return builder.buildObservableMeasurement(InstrumentType.OBSERVABLE_COUNTER);
}
@Override
public String toString() {
return builder.toStringHelper(getClass().getSimpleName());
}
}
PeriodicMetricReader
지정한 주기마다 Metric을 읽어서 Exporter로 전달합니다.
생성자에서 Expoter와 Interval, Scheduler를 받습니다.
doRun() 메소드를 사용해 메트릭 수집과 추출을 진행합니다.
public final class PeriodicMetricReader implements MetricReader {
private static final Logger logger = Logger.getLogger(PeriodicMetricReader.class.getName());
private final MetricExporter exporter;
private final long intervalNanos;
private final ScheduledExecutorService scheduler;
private final Scheduled scheduled;
private final Object lock = new Object();
private volatile CollectionRegistration collectionRegistration = CollectionRegistration.noop();
@Nullable private volatile ScheduledFuture<?> scheduledFuture;
public static PeriodicMetricReader create(MetricExporter exporter) {
return builder(exporter).build();
}
public static PeriodicMetricReaderBuilder builder(MetricExporter exporter) {
return new PeriodicMetricReaderBuilder(exporter);
}
PeriodicMetricReader(
MetricExporter exporter, long intervalNanos, ScheduledExecutorService scheduler) {
this.exporter = exporter;
this.intervalNanos = intervalNanos;
this.scheduler = scheduler;
this.scheduled = new Scheduled();
}
...
}
Flush, Shutdown, Register
Register는 Provider가 storage에 저장되어 있던 메트릭을 가져와서 Exporter에 넘기는 작업, 즉 Reader의 작업을 실행합니다.
forceFlush 메소드는 지금 시점에 강제로 수집과 추출을 진행합니다.
내부적으로 스케줄러의 doRun을 실행시키고, Exporter.flush()를 실행시켜서 즉시 추출을 진행합니다.
shutdown()은 주기 스케줄을 취소하여 수집을 중지합니다.
스케줄러를 종료 대기 상태로 바꾸고 이후 doRun을 실행시킵니다.
마지막으로 exporter.shutdown()을 호출합니다.
public final class PeriodicMetricReader implements MetricReader {
...
@Override
public CompletableResultCode forceFlush() {
CompletableResultCode result = new CompletableResultCode();
CompletableResultCode doRunResult = scheduled.doRun();
doRunResult.whenComplete(
() -> {
CompletableResultCode flushResult = exporter.flush();
flushResult.whenComplete(
() -> {
if (doRunResult.isSuccess() && flushResult.isSuccess()) {
result.succeed();
} else {
result.fail();
}
});
});
return result;
}
@Override
public CompletableResultCode shutdown() {
CompletableResultCode result = new CompletableResultCode();
ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
}
scheduler.shutdown();
try {
scheduler.awaitTermination(5, TimeUnit.SECONDS);
CompletableResultCode flushResult = scheduled.doRun();
flushResult.join(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// force a shutdown if the export hasn't finished.
scheduler.shutdownNow();
// reset the interrupted status
Thread.currentThread().interrupt();
} finally {
CompletableResultCode shutdownResult = scheduled.shutdown();
shutdownResult.whenComplete(
() -> {
if (!shutdownResult.isSuccess()) {
result.fail();
} else {
result.succeed();
}
});
}
return result;
}
@Override
public void register(CollectionRegistration collectionRegistration) {
this.collectionRegistration = collectionRegistration;
start();
}
void start() {
synchronized (lock) {
if (scheduledFuture != null) {
return;
}
scheduledFuture =
scheduler.scheduleAtFixedRate(
scheduled, intervalNanos, intervalNanos, TimeUnit.NANOSECONDS);
}
}
...
}
스케줄러
스케줄러는 틱 마다 run()을 호출되고 내부에서 doRun()을 호출합니다.
ScheduledExecutorService 라는 타이머가 있는 스레드 풀을 사용해서
scheduleAtFixedRate만큼의 간격마다 doRun을 실행합니다.
스레드 풀의 스레드 수는 보통 하나를 사용한다고 합니다.
doRun 메소드를 살펴보면
exportAvailable값을 읽어 false면, 즉 아직 export되지 않은 사이클이 있다면, 이번 사이클을 drop합니다.
Reader는 Exporter와 독립적인 관계이고 만약 Exporter가 느려지는 등 이슈가 발생은 Reader에서 알 수 없습니다.
그렇기에 추가적인 부하를 줄 수 있는 상황이라고 판단되면 drop을 진행합니다.
추출이 가능하다면, Storage에서 모든 메트릭을 가져와 Exporter에 전송합니다.
그 결과는 비동기 콜백으로 전달받습니다.
이후 exportAvailable값을 true로 변경합니다.
private final class Scheduled implements Runnable {
private final AtomicBoolean exportAvailable = new AtomicBoolean(true);
private Scheduled() {}
@Override
public void run() {
// Ignore the CompletableResultCode from doRun() in order to keep run() asynchronous
doRun();
}
// Runs a collect + export cycle.
CompletableResultCode doRun() {
CompletableResultCode flushResult = new CompletableResultCode();
if (exportAvailable.compareAndSet(true, false)) {
try {
Collection<MetricData> metricData = collectionRegistration.collectAllMetrics();
if (metricData.isEmpty()) {
logger.log(Level.FINE, "No metric data to export - skipping export.");
flushResult.succeed();
exportAvailable.set(true);
} else {
CompletableResultCode result = exporter.export(metricData);
result.whenComplete(
() -> {
if (!result.isSuccess()) {
logger.log(Level.FINE, "Exporter failed");
}
flushResult.succeed();
exportAvailable.set(true);
});
}
} catch (Throwable t) {
exportAvailable.set(true);
logger.log(Level.WARNING, "Exporter threw an Exception", t);
flushResult.fail();
}
} else {
logger.log(Level.FINE, "Exporter busy. Dropping metrics.");
flushResult.fail();
}
return flushResult;
}
CompletableResultCode shutdown() {
return exporter.shutdown();
}
}
OtlpHTTPMetricExporter
Reader가 넘긴 데이터를 OTLP포맷으로 변환해 Collector에 전송합니다.
Collector에서 다시 Prometheous, Datadog으로 전송 가능합니다.
MetricReusableDataMarshaler를 사용해 데이터를 OTLP 바이트로 마샬링 후 GrpcExporter를 사용해Http, gRPC등 적절한 채널로 실어 보냅니다.
AggregationTemporalitySelector를 통해 CUMULATIVE, DELTA 방식을 선택합니다.
CUMULATIVE 방식은 총합을 유지하는 방식이고 DELTA 방식은 직전 값과 이전값의 차이만 보내는 방식입니다.
DefaultAggregationSelector로 계측기의 타입별 기본 집계 방식을 선택합니다.
마샬링 시 메모리를 REUSEABLE, 재사용해서 G.C의 부하를 줄이지만 데이터가 덮어 쓰거나
IMMUTABLE, 메모리를 바꾸지 않아 안전하지만 G.C의 부하를 조금 줄지 결정합니다.
@ThreadSafe
public final class OtlpHttpMetricExporter implements MetricExporter {
private final HttpExporterBuilder<Marshaler> builder;
private final HttpExporter<Marshaler> delegate;
// Visible for testing
final AggregationTemporalitySelector aggregationTemporalitySelector;
// Visible for testing
final DefaultAggregationSelector defaultAggregationSelector;
private final MetricReusableDataMarshaler marshaler;
OtlpHttpMetricExporter(
HttpExporterBuilder<Marshaler> builder,
HttpExporter<Marshaler> delegate,
AggregationTemporalitySelector aggregationTemporalitySelector,
DefaultAggregationSelector defaultAggregationSelector,
MemoryMode memoryMode) {
this.builder = builder;
this.delegate = delegate;
this.aggregationTemporalitySelector = aggregationTemporalitySelector;
this.defaultAggregationSelector = defaultAggregationSelector;
this.marshaler = new MetricReusableDataMarshaler(memoryMode, delegate::export);
}
/**
* Returns a new {@link OtlpHttpMetricExporter} using the default values.
*
* <p>To load configuration values from environment variables and system properties, use <a
* href="https://github.com/open-telemetry/opentelemetry-java/tree/main/sdk-extensions/autoconfigure">opentelemetry-sdk-extension-autoconfigure</a>.
*
* @return a new {@link OtlpHttpMetricExporter} instance.
*/
public static OtlpHttpMetricExporter getDefault() {
return builder().build();
}
/**
* Returns a new builder instance for this exporter.
*
* @return a new builder instance for this exporter.
*/
public static OtlpHttpMetricExporterBuilder builder() {
return new OtlpHttpMetricExporterBuilder();
}
/**
* Returns a builder with configuration values equal to those for this exporter.
*
* <p>IMPORTANT: Be sure to {@link #shutdown()} this instance if it will no longer be used.
*
* @since 1.29.0
*/
public OtlpHttpMetricExporterBuilder toBuilder() {
return new OtlpHttpMetricExporterBuilder(
builder.copy(),
aggregationTemporalitySelector,
defaultAggregationSelector,
marshaler.getMemoryMode());
}
@Override
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
return aggregationTemporalitySelector.getAggregationTemporality(instrumentType);
}
@Override
public Aggregation getDefaultAggregation(InstrumentType instrumentType) {
return defaultAggregationSelector.getDefaultAggregation(instrumentType);
}
@Override
public MemoryMode getMemoryMode() {
return marshaler.getMemoryMode();
}
...
}
Exporter의 주요 메소드들
export(Collection<MetricData> metrics)
한 사이클에 있는 배치를 Marshaler가 직렬화하고 즉시 전송합니다.
리턴값은 CompletableResultCode, 비동기로 전달 받습니다.
@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
return marshaler.export(metrics);
}
flush()
gRPC같은 경우는 내부적으로 버퍼를 둬서 값을 보내는데 위와 같은 경우, 버퍼에 남아있는 데이터를 모두 전송합니다.
HTTP는 버퍼를 사용하지 않기에 즉시 리턴값을 받습니다.
리턴값은 동기로 받습니다.
@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}
shutdown()
메모리 누수 등을 방지하기 위해 사용하던 전송 채널, 스레드, 버퍼 등을 닫습니다.
@Override
public CompletableResultCode shutdown() {
return delegate.shutdown();
}
로그 (Log)
OTel의 레코드는 일반적으로 생각하는 서비스의 printf 와 같은 문자열 로그가 아니라
구조화된 이벤트 값입니다.
LogRecordBuilder
로그의 값을 담고, emit() 메소드를 통해 로그를 Collector로 전송합니다.
로그에는 로그 발생 시각, 관측 시각, 심각도, 본문 메시지, TraceId, SpanId, Flag, 환경 정보, 이벤트 명과 같은 값들이 들어갑니다.
/** SDK implementation of {@link LogRecordBuilder}. */
class SdkLogRecordBuilder implements LogRecordBuilder {
protected final LoggerSharedState loggerSharedState;
protected final LogLimits logLimits;
protected final InstrumentationScopeInfo instrumentationScopeInfo;
protected long timestampEpochNanos;
protected long observedTimestampEpochNanos;
@Nullable protected Context context;
protected Severity severity = Severity.UNDEFINED_SEVERITY_NUMBER;
@Nullable protected String severityText;
@Nullable protected Value<?> body;
@Nullable protected String eventName;
@Nullable private AttributesMap attributes;
SdkLogRecordBuilder(
LoggerSharedState loggerSharedState, InstrumentationScopeInfo instrumentationScopeInfo) {
this.loggerSharedState = loggerSharedState;
this.logLimits = loggerSharedState.getLogLimits();
this.instrumentationScopeInfo = instrumentationScopeInfo;
}
@Override
public SdkLogRecordBuilder setEventName(String eventName) {
this.eventName = eventName;
return this;
}
@Override
public SdkLogRecordBuilder setTimestamp(long timestamp, TimeUnit unit) {
this.timestampEpochNanos = unit.toNanos(timestamp);
return this;
}
@Override
public SdkLogRecordBuilder setTimestamp(Instant instant) {
this.timestampEpochNanos =
TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano();
return this;
}
@Override
public LogRecordBuilder setObservedTimestamp(long timestamp, TimeUnit unit) {
this.observedTimestampEpochNanos = unit.toNanos(timestamp);
return this;
}
@Override
public LogRecordBuilder setObservedTimestamp(Instant instant) {
this.observedTimestampEpochNanos =
TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano();
return this;
}
@Override
public SdkLogRecordBuilder setContext(Context context) {
this.context = context;
return this;
}
@Override
public SdkLogRecordBuilder setSeverity(Severity severity) {
this.severity = severity;
return this;
}
@Override
public SdkLogRecordBuilder setSeverityText(String severityText) {
this.severityText = severityText;
return this;
}
@Override
public SdkLogRecordBuilder setBody(String body) {
return setBody(Value.of(body));
}
@Override
public SdkLogRecordBuilder setBody(Value<?> value) {
this.body = value;
return this;
}
@Override
public <T> SdkLogRecordBuilder setAttribute(AttributeKey<T> key, @Nullable T value) {
if (key == null || key.getKey().isEmpty() || value == null) {
return this;
}
if (this.attributes == null) {
this.attributes =
AttributesMap.create(
logLimits.getMaxNumberOfAttributes(), logLimits.getMaxAttributeValueLength());
}
this.attributes.put(key, value);
return this;
}
...
}
로그 json 예시
{
"time_unix_nano": "1723782150000000000",
"observed_time_unix_nano": "1723782151000000000",
"severity_number": 17,
"severity_text": "ERROR",
"body": {
"stringValue": "Payment failed"
},
"attributes": [
{
"key": "order.id",
"value": { "stringValue": "912345" }
},
{
"key": "user.id",
"value": { "intValue": "1001" }
}
],
"trace_id": "4fd0b9b76e4c9f6cb1f4431a6a68e5b2",
"span_id": "6e0c63257de34c92",
"flags": 1,
"resource": {
"attributes": [
{
"key": "service.name",
"value": { "stringValue": "payment-service" }
},
{
"key": "service.version",
"value": { "stringValue": "1.2.3" }
}
]
},
"instrumentation_scope": {
"name": "io.opentelemetry.example.PaymentLogger",
"version": "1.0.0",
"schemaUrl": "https://opentelemetry.io/schemas/1.18.0"
},
"event_name": "order.payment.failed"
}
emit()
만들어진 로그를 Collector로 전송합니다.
지정하지 않은 속성들은 emit()가 실행될 때 자동으로 채우고 LogRecord객체로 만듭니다.
이후 Processor로 넘깁니다.
public void emit() {
if (loggerSharedState.hasBeenShutdown()) { return; }
Context ctx = (this.context == null) ? Context.current() : this.context;
long observed = (this.observedTimestampEpochNanos == 0)
? loggerSharedState.getClock().now()
: this.observedTimestampEpochNanos;
loggerSharedState.getLogRecordProcessor().onEmit(
ctx,
SdkReadWriteLogRecord.create(
loggerSharedState.getLogLimits(),
loggerSharedState.getResource(),
instrumentationScopeInfo,
timestampEpochNanos,
observed,
Span.fromContext(ctx).getSpanContext(), // traceId/spanId 여기서 추출
severity,
severityText,
body,
attributes,
eventName));
}
LogRecordProcessor
LogRecord를 emit()을 통해 전달받습니다.
이후 LogRecord들을 Exporter를 통해 Collector나 외부로 전송합니다.
Processor가 SimpleLogRecordProcessor라면 즉시 Exporter로 보내고
BatchLogRecordProcessor라면 큐에 넣고 배치 단위로 Exporter로 전송합니다.
Composite
SimpleLogRecordProcessor, BatchLogRecordProcessor, 사용자 정의 Processor 등
여러개의 LogRecordProcessor를 하나로 묶어주는 역할을 합니다.
아래는 composite을 사용해서 SdkLoggerProvider를 만드는 예시코드입니다.
SdkLoggerProvider loggerProvider =
SdkLoggerProvider.builder()
.addLogRecordProcessor(
LogRecordProcessor.composite(
SimpleLogRecordProcessor.create(exporterA),
BatchLogRecordProcessor.builder(exporterB).build()
)
)
.build();
@ThreadSafe
public interface LogRecordProcessor extends Closeable {
static LogRecordProcessor composite(LogRecordProcessor... processors) {
return composite(Arrays.asList(processors));
}
static LogRecordProcessor composite(Iterable<LogRecordProcessor> processors) {
List<LogRecordProcessor> processorList = new ArrayList<>();
for (LogRecordProcessor processor : processors) {
processorList.add(processor);
}
if (processorList.isEmpty()) {
return NoopLogRecordProcessor.getInstance();
}
if (processorList.size() == 1) {
return processorList.get(0);
}
return MultiLogRecordProcessor.create(processorList);
}
...
}
onEmit
로그를 Exporter로 전송합니다.
구현체 ((SimpleLogRecordProcessor, BatchLogRecordProcessor))에 따라
바로 Exporter로 전송하거나 큐에 적재하고 나중에 Exporter로 전송합니다.
...
void onEmit(Context context, ReadWriteLogRecord logRecord);
...
}
SimpleLogRecordProcessor
logRecord.toLogRecordData()을 통해 로그 리스트로 변환합니다.
logRecordExporter.export(logs) 추출합니다.
반환된 CompletableResultCode를 pendingExports 콜백을 전달받습니다.
결과적으로 로그 한 줄당 Export가 한번씩 발생합니다.
@Override
public void onEmit(Context context, ReadWriteLogRecord logRecord) {
try {
List<LogRecordData> logs = Collections.singletonList(logRecord.toLogRecordData());
CompletableResultCode result;
synchronized (exporterLock) {
result = logRecordExporter.export(logs);
}
pendingExports.add(result);
result.whenComplete(
() -> {
pendingExports.remove(result);
if (!result.isSuccess()) {
logger.log(Level.FINE, "Exporter failed");
}
});
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Exporter threw an Exception", e);
}
}
BatchLogRecordProcessor
SimpleLogRecordProcessor는 들어온 로그를 바로 전송하지만,
BatchLogRecordProcessor는 큐에 넣어두고 일정 크기만큼 모였거나, 특정 주기가 지나면 배치를 통해 전송합니다.
BatchLogRecordProcessor는 고정 길이의 큐를 가지고 있는데,
큐가 가득 차 있는 상태에서 새로운 로그가 emit()되면 drop하게 됩니다.
drop된 로그의 내용은 기록하지는 않지만 drop된 count는 체크합니다.
워커 스레드를 하나 만드는데, 해당 스레드는 백그라운드에서 계속 큐를 바라봅니다.
추가적인 스레드를 사용하기에 emit() 호출은 큐에 넣는 작업만 하게되고 export하는 호출은 만들어진 워커 스레드에서 백그라운드에서 처리합니다.
이렇게 메인스레드와 워커스레드를 분리하기에, 실제 추적 시스템이 존재하는 메인스레드의 부하를 줄일 수 있습니다.
배치도 maxExportBatchSize 라는 크기와 scheduleDelayNanos라는 주기를 기준으로 워커 스레드가 배치의 값들을 진짜 exporter로 전송합니다.
run()
먼저 updateNextExportTime()을 사용해서 scheduleDelayNanos 값을 기준으로 다음 export시간을 계산합니다.
이후 내부 큐에서 로그를 배치 리스트에 담습니다.
이때 maxExportBatchSize 차거나, 지정된 시간, scheduleDelayNanos이 경과하면 내보냅니다.
배치에 모인 로그가 Exporter로 전달되면, 성공 여부에 따라 processedLogsCounter를 기록합니다.
큐가 비어 있으면 다음 export 시간까지 대기하고, 일시적으로 대기((poll))상태로 들어갑니다.
이 때 워커 스레드는 block상태가 되는게 emit()메소드가 실행되며 signal값이 true가 되면 run()이 다시 실행됩니다.
...
@Override
public void run() {
updateNextExportTime();
while (continueWork) {
if (flushRequested.get() != null) {
flush();
}
while (!queue.isEmpty() && batch.size() < maxExportBatchSize) {
batch.add(queue.poll().toLogRecordData());
}
if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) {
exportCurrentBatch();
updateNextExportTime();
}
if (queue.isEmpty()) {
try {
long pollWaitTime = nextExportTime - System.nanoTime();
if (pollWaitTime > 0) {
logsNeeded.set(maxExportBatchSize - batch.size());
signal.poll(pollWaitTime, TimeUnit.NANOSECONDS);
logsNeeded.set(Integer.MAX_VALUE);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
}
...
자동 연동(로깅 프레임워크 브리지)
OTel은 OTel Java 에이전트를 사용해, 서비스에서 사용하는 로깅 프레임워크에 자동으로 TraceId, SpanId를 주입해줍니다.
자동연동을 사용하는 방법은 간단한데 -javaagent:opentelemetry-javaagent.jar 와 같은 명령어를 사용합니다.
아니면 xml파일에 추가하는 방법도 존재합니다.
아래는 logback에 OTel의 TraceId, SpanId를 추가하는 예시입니다.
<!-- logback.xml -->
<configuration>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder> <!-- %X{trace_id}, %X{span_id} : MDC 값 --> <pattern>%d %-5level [%thread] trace=%X{trace_id} span=%X{span_id} %logger - %msg%n</pattern> </encoder> </appender>
<root level="INFO">
<appender-ref ref="CONSOLE"/> </root>
</configuration>
OTel은 MDC를 사용해서 TraceId와 SpanId를 저장합니다.
MDC는 SLF4J 계열에서 사용하는 스레드 로컬 기반 key-value 저장소입니다.
예를 들어 지금 스레드에서 MDC.put("userId", "1") 을 저장하면
해당 스레드에서 찍는 모든 로그 패턴에서 {userId}와 같은 방식으로 value를 사용할 수 있게되는데
이를 사용해 OTel 에이전트에서 현재 TraceContext((TraceId, SpanId)) MDC에 자동으로 주입하여 로깅 프레임워크와 연동됩니다.
참조 : https://opentelemetry.io/docs/concepts/signals/traces/
'APM만들기' 카테고리의 다른 글
| 피쳐 추출 시스템에 대해 알아보자 (0) | 2025.08.29 |
|---|---|
| 피벗 추적에 대해 알아보자 (0) | 2025.08.03 |
| 분산추적의 샘플링에 대해서 알아보자 (0) | 2025.07.18 |
| 분산추적에서 다른 프로세스, 외부 서비스간 전파는 어떻게 이루어질까 (0) | 2025.06.29 |