Search
🌍

Langfuse @observe를 조건부로 우회하며 벤더 중립적인 계측 경계 설계하기

컨트롤러는 번역 유즈케이스를 여러 방식으로 호출합니다. 일반 번역 경로에서는 최종 번역 문자열만 반환하면 되지만, 스트리밍 번역 경로에서는 번역 결과를 chunk 단위로 계속 흘려보내야 합니다. 그런데 두 경로 모두 내부적으로는 같은 번역 유즈케이스를 사용하고, 그 안에서는 번역 요청 문장에서 단어 후보를 리트리벌하고, RAG로 비슷한 문장을 찾고, LLM으로 리트리벌된 단어 중 적절한 후보를 솎아내고, 마지막으로 LLM에 실제 번역을 요청하는 여러 단계의 메서드가 호출됩니다. 이 메서드들 중 상당수가 계측 대상이기 때문에, 그 유즈케이스 메서드들 중 상당수는 Langfuse의 @observe로 데코레이팅되어 있거나 with 구문을 이용하여 내부에서 Langfuse를 호출하고 있었습니다.
class TranslationUseCase: async def set_user(self, headers: dict[str, str]) -> None: ... @observe async def retrieve_term_candidates(self, sentence: str) -> list[str]: ... @observe async def retrieve_similar_sentences(self, sentence: str) -> list[str]: ... @observe async def select_relevant_terms_with_llm( self, sentence: str, term_candidates: list[str], similar_sentences: list[str], ) -> list[str]: ... @observe async def translation( self, sentence: str, selected_terms: list[str], similar_sentences: list[str], ) -> str: ... # orchestration code @asynccontextmanager async def translation_stream( self, sentence: str, selected_terms: list[str], similar_sentences: list[str], ): ... with langfuse_client.start_as_current_observation(...) as span: ... # orchestration code span.update
Python
복사
from fastapi import APIRouter, Request from fastapi.responses import StreamingResponse router = APIRouter() @router.post("/translate") async def translate(request: Request, payload: TranslateRequest): usecase = TranslationUseCase( user_service=user_service, retriever=retriever, llm_client=llm_client, ) await usecase.set_user(dict(request.headers)) translated_text = await usecase.translate( sentence=payload.sentence, ) return {"translated_text": translated_text} @router.post("/translate/stream") async def translate_stream(request: Request, payload: TranslateRequest): usecase = TranslationUseCase( user_service=user_service, retriever=retriever, llm_client=llm_client, ) await usecase.set_user(dict(request.headers)) async def event_stream(): async with usecase.translate_stream( sentence=payload.sentence, ) as stream: yield "event: content_block_start\ndata: {}\n\n" async for translated_text in stream: yield ( "event: content_cumulative\n" f"data: {translated_text}\n\n" ) yield "event: content_block_stop\ndata: {}\n\n" return StreamingResponse( event_stream(), media_type="text/event-stream", )
Python
복사
문제는 사용자에 따라 이들 중 일부는 Langfuse에 기록되어야 했고, 일부는 Langfuse를 아예 타면 안 된다는 요구에서 시작됐습니다.
이 요구를 처음 마주했을 때는 capture_input=False, capture_output=False 같은 Langfuse의 옵션이나 decorator 사용법을 조금 더 잘 조합하면 해결될 것처럼 보였습니다. 하지만 실제로는 라이브러리 사용법의 문제가 아니라, “어디를 계측 경계로 볼 것인가”에 대한 설계 문제에 더 가까웠습니다.
가장 큰 문제는 @observe가 붙은 함수를 호출하는 순간, 이미 계측 wrapper를 타게 됩니다. 즉 어떤 사용자에게는 Langfuse를 완전히 우회해야 한다면, 그때는 단순히 input/output 캡처를 줄이는 정도로는 부족하고, 애초에 그 wrapper를 타지 않는 호출 경로가 필요합니다.
“이 요청에서 계측을 켤지 말지를, 누가 언제 결정할 것인가?”
여기서 중요한 것은 그 판단 시점이었습니다. 우리 코드에서는 사용자를 식별하기 전까지는 계측 여부를 알 수 없습니다. set_user(...)가 먼저 실행되어 현재 요청의 사용자를 알아낸 뒤에야, 이후에 호출되는 메서드들이 계측 대상인지 아닌지를 결정할 수 있습니다.
그래서 자연스럽게 정책은 유즈케이스 인스턴스가 가지도록 두었습니다. 각 요청마다 유즈케이스 인스턴스가 새로 만들어지고, set_user(...) 이후에는 그 인스턴스가 “이 요청에서 계측을 켤지 말지”를 알고 있게 만드는 식입니다.
그래서 중간 지점이 필요했습니다. 그 결과로 나온 구조는 ConditionalInstrumentationBase입니다. 이 베이스 클래스는 어떤 도구를 쓰는지 알지 않습니다. Langfuse도 모르고, OpenTelemetry도 모릅니다. 단지 현재 인스턴스가 계측을 켜야 하는지 꺼야 하는지를 알려주는 _should_enable_instrumentation() 계약만 강제합니다. 그리고 별도 모듈에 있는 instrument_if_enabled(...) decorator는 이 계약을 만족하는 인스턴스 메서드에만 사용할 수 있게 했습니다.
from abc import ABC, abstractmethod class ConditionalInstrumentationBase(ABC): @abstractmethod def _should_enable_instrumentation(self) -> bool: """현재 실행 문맥에서 이 인스턴스의 계측 활성화 여부를 반환합니다.""" raise NotImplementedError
Python
복사
이 계약만 있으면, 실제 사용처에서는 전역 설정과 사용자 ID를 조합해 유즈케이스 관심사의 정책을 구현할 수 있습니다.
class TranslationUseCase(ConditionalInstrumentationBase): def __init__(self, user_service, retriever, llm_client) -> None: self.user_service = user_service self.retriever = retriever self.llm_client = llm_client self._user_id: str | None = None async def set_user(self, headers: dict[str, str]) -> None: user = await self.user_service.get_user_from_headers(headers) self._user_id = user.id def _should_enable_instrumentation(self) -> bool: if self._user_id is None: raise RuntimeError("set_user() must be called first.") return ( settings.USE_LANGFUSE and self._user_id not in settings.INSTRUMENTATION_EXCLUDED_USER_IDS )
Python
복사
가장 중요한 줄은 이것입니다.
target_func = ( instrumented_func if self._should_enable_instrumentation() else func )
Python
복사
계측이 켜져 있으면 wrapper를 탄 함수를 호출하고, 꺼져 있으면 원본 함수를 그대로 호출합니다. 이 방식으로 가면 기존에 @observe를 직접 붙이던 자리를 조금 더 통제 가능한 decorator로 바꿀 수 있습니다.
여기서 한 가지 더 욕심을 내보면, 이 decorator 모듈은 Langfuse 자체를 직접 import하지 않게 만들 수 있습니다. 지금은 Langfuse의 observe를 쓰고 있지만, 계측 여부를 조건부로 제어하는 문제는 꼭 Langfuse에만 국한되지 않기 때문입니다. 그래서 모듈은 vendor-agnostic하게 두고, 실제 observe 함수는 외부에서 주입받도록 정리했습니다.
작게 보면 시그니처는 이런 모양입니다.
def instrument_if_enabled( *, observe, **observe_kwargs, ): ...
Python
복사
사용하는 쪽에서는 현재 벤더의 observe를 넘겨줍니다.
from langfuse import observe class TranslationUseCase(ConditionalInstrumentationBase): @instrument_if_enabled(observe=observe) async def retrieve_term_candidates(self, sentence: str) -> list[str]: ... @instrument_if_enabled(observe=observe) async def retrieve_similar_sentences(self, sentence: str) -> list[str]: ... @instrument_if_enabled(observe=observe) async def select_relevant_terms_with_llm(...): ... @instrument_if_enabled(observe=observe, as_type="generation") async def translation(...): ...
Python
복사
이 시점에서 decorator는 “계측을 할지 말지”만 책임지고, “어떤 벤더의 어떤 observe를 쓸지”는 사용처가 결정합니다. 우리가 처음 이 문제를 풀기 시작한 이유가 “Langfuse 전용 유틸을 잘 만드는 것”이 아니라 “요청별로 계측을 우회할 수 있는 경계”를 만드는 것이었다는 점을 떠올리면, 이 방향이 더 자연스럽습니다.
다만 이 규칙이 모든 메서드에 똑같이 적용되지는 않았습니다. 특히 스트리밍 메서드는 일반 async def 메서드와 성격이 다릅니다. translation_stream(...)은 단순히 값을 하나 반환하고 끝나는 함수가 아니라, 세션을 열고 닫는 흐름입니다. 이런 경우에는 decorator보다 context manager가 더 자연스럽습니다.
예를 들어 일반 번역은 이런 흐름입니다.
async def translate(self, sentence: str) -> str: term_candidates = await self.retrieve_term_candidates(sentence) similar_sentences = await self.retrieve_similar_sentences(sentence) selected_terms = await self.select_relevant_terms_with_llm( sentence=sentence, term_candidates=term_candidates, similar_sentences=similar_sentences, ) return await self.translation( sentence=sentence, selected_terms=selected_terms, similar_sentences=similar_sentences, )
Python
복사
반면 스트리밍 번역은 이런 구조입니다.
@asynccontextmanager async def translate_stream(self, sentence: str): term_candidates = await self.retrieve_term_candidates(sentence) similar_sentences = await self.retrieve_similar_sentences(sentence) selected_terms = await self.select_relevant_terms_with_llm( sentence=sentence, term_candidates=term_candidates, similar_sentences=similar_sentences, ) final_output = "" async def stream_generator(): nonlocal final_output translated_chunks = self.llm_client.stream_translate( sentence=sentence, selected_terms=selected_terms, similar_sentences=similar_sentences, ) async for chunk in translated_chunks: final_output += chunk yield chunk ...
Python
복사
이쯤 오면 context manager 쪽은 억지로 공통 모듈로 더 빼지 않는 편이 낫다는 판단이 섭니다. start_as_current_observation(...), observation.update(...) 같은 API는 벤더별 구현과 상당히 강하게 결합될 수밖에 없고, 이 영역이 어디까지 일반화될 수 있는지도 지금 당장 명확하지 않습니다. 무리해서 instrumentation_context(...), update_instrumentation(...) 같은 얇은 래퍼를 계속 만들기 시작하면, 추상화가 문제를 가리기보다 오히려 흐름을 더 읽기 어렵게 만들 가능성이 큽니다.
이런 판단은 지나친 인터페이스화보다는 직접적이고 읽기 쉬운 코드를 선호하는 파이썬의 감각과도 잘 맞습니다. 그래서 context manager 쪽은 각 사용처 파일에서 직접 구현하고, 일반 메서드에 대한 조건부 분기만 공통 decorator로 빼는 것이 가장 균형이 좋습니다.
스트리밍 메서드는 결국 이런 식으로 정리되었습니다.
from contextlib import asynccontextmanager, nullcontext from langfuse import get_client langfuse_client = get_client() class TranslationUseCase(ConditionalInstrumentationBase): @asynccontextmanager async def translation_stream( self, sentence: str, selected_terms: list[str], similar_sentences: list[str], ): final_output = "" async def stream_generator(): nonlocal final_output translated_chunks = self.llm_client.stream_translate( sentence=sentence, selected_terms=selected_terms, similar_sentences=similar_sentences, ) async for chunk in translated_chunks: final_output += chunk yield chunk observation_context = ( langfuse_client.start_as_current_observation( name="translation_stream", as_type="generation", ) if self._should_enable_instrumentation() else nullcontext(None) ) with observation_context as span: try: yield stream_generator() except Exception as exc: if span is not None: span.update(metadata={"stream_error": str(exc)}) raise finally: if span is not None: span.update(output=final_output)
Python
복사
이제 일반(sync, async) 메서드용 공통 모듈은 간단해집니다.
# conditional_instrumentation.py from __future__ import annotations import inspect from abc import ABC, abstractmethod from functools import wraps from typing import Any, Callable, ParamSpec, Protocol, TypeVar, cast P = ParamSpec("P") R = TypeVar("R") class ConditionalInstrumentationBase(ABC): """현재 실행 문맥에서 인프라 레벨 계측 활성화 여부를 결정하는 기본 계약입니다.""" @abstractmethod def _should_enable_instrumentation(self) -> bool: """현재 실행 문맥에서 이 인스턴스의 계측 활성화 여부를 반환합니다.""" raise NotImplementedError def _ensure_instrumentable_instance( instance: Any, func: Callable[..., Any], ) -> None: """데코레이팅된 대상이 올바른 인스턴스 메서드인지 검증합니다.""" if not isinstance(instance, ConditionalInstrumentationBase): raise TypeError( "@instrument_if_enabled(...)는 " "ConditionalInstrumentationBase를 상속한 클래스의 " "인스턴스 메서드에만 적용할 수 있습니다. " f"{func.__qualname__!r} 호출 시 첫 번째 인자 타입은 " f"{type(instance).__name__!r} 입니다." ) def instrument_if_enabled( *, observe, **observe_kwargs: Any, ) -> Callable[[Callable[P, R]], Callable[P, R]]: """함수 단위 계측을 조건부로 적용하는 데코레이터입니다.""" def decorator(func: Callable[P, R]) -> Callable[P, R]: if inspect.isgeneratorfunction(func): raise TypeError( "@instrument_if_enabled(...)는 generator 함수에 적용할 수 없습니다. " f"대상: {func.__qualname__!r}" ) if inspect.isasyncgenfunction(func): raise TypeError( "@instrument_if_enabled(...)는 async generator 함수에 적용할 수 없습니다. " f"대상: {func.__qualname__!r}. " "스트리밍/세션성 흐름은 각 사용처에서 context manager로 직접 처리하세요." ) instrumented_func = observe(**observe_kwargs)(func) if inspect.iscoroutinefunction(func): @wraps(func) async def async_wrapper( self: ConditionalInstrumentationBase, *args: P.args, **kwargs: P.kwargs, ) -> R: _ensure_instrumentable_instance(self, func) target_func = ( instrumented_func if self._should_enable_instrumentation() else func ) return await cast(Callable[..., Any], target_func)( self, *args, **kwargs ) return cast(Callable[P, R], async_wrapper) @wraps(func) def sync_wrapper( self: ConditionalInstrumentationBase, *args: P.args, **kwargs: P.kwargs, ) -> R: _ensure_instrumentable_instance(self, func) target_func = ( instrumented_func if self._should_enable_instrumentation() else func ) return cast(Callable[..., R], target_func)(self, *args, **kwargs) return cast(Callable[P, R], sync_wrapper) return decorator
Python
복사
여기서 이름을 observe_if_enabled가 아니라 instrument_if_enabled로 둔 것도 의도적인 선택입니다. 현재 구현에서는 Langfuse의 observe를 주입받아 사용하지만, observe라는 이름은 Langfuse의 특정 wrapper에 더 가깝습니다. 반면 instrument는 애플리케이션에 계측 코드를 심는다는 더 넓은 의미를 가집니다. 이 추상화가 특정 벤더에 갇히지 않기를 원했기 때문에 이름에서도 Langfuse를 제거했습니다.
이렇게 정리하면 규칙은 간단해집니다. 함수처럼 호출해서 끝나는 메서드는 decorator를 사용합니다. 세션을 열고 닫는 메서드는 context manager를 직접 사용합니다. 이 규칙은 Langfuse뿐 아니라 다른 계측 도구를 사용하더라도 유지할 수 있는 애플리케이션 레벨의 규칙입니다.
결국 이 설계의 핵심은 Langfuse를 얼마나 예쁘게 감싸느냐가 아니었습니다. 더 중요한 것은, 일반 함수 호출과 세션성 흐름을 같은 방식으로 취급하지 않고, 각각에 맞는 계측 경계를 분리한 뒤, 요청 단위의 정책을 유즈케이스 인스턴스가 책임지게 만드는 것이었습니다.
이렇게 해두면 지금은 Langfuse를 쓰더라도, 코드 전체가 특정 벤더의 이름과 개념에 지나치게 잠기지 않습니다. 일반 메서드용 decorator는 vendor-agnostic하게 남겨 둘 수 있고, 정말 강결합이 unavoidable한 context manager 쪽만 사용처에서 직접 드러내면 됩니다. 그 정도의 결합은 오히려 숨기지 않는 편이 코드를 더 솔직하게 만들어 줍니다.
아래는 최종적으로 분리한 conditional_instrumentation.py입니다.
# conditional_instrumentation.py """ 조건부 인프라 계측을 위한 공통 경계입니다. 애플리케이션에 코드를 심어 기록을 남기는 일을 instrumentation 이라고 합니다. 현재 우리가 LLM observability 도구로 채택하고 있는 Langfuse 공식 문서는 Python SDK에서 계측 방식을 크게 native integrations와 manual instrumentation으로 설명합니다. 이 중 manual instrumentation은 다시 observe wrapper, context manager, manual observations 방식으로 나뉘며, 이 방식들은 서로 함께 사용할 수 있고 중첩도 가능합니다. observe wrapper는 wrapped function의 입력, 출력, 실행 시간, 에러를 자동 캡처하는 데 적합하고, context manager는 특정 블록 동안 active observation을 만들고 그 안의 child observation을 자동으로 연결하는 데 적합합니다. 우리 쪽에서는 이를 다음과 같은 규칙으로 사용합니다. - 함수처럼 호출해서 끝나는 메서드면 decorator - 세션을 열고 닫는 메서드면 context manager 다만 이 패턴은 꼭 Langfuse에만 국한되는 문제는 아닙니다. OpenTelemetry, Datadog, Sentry tracing, custom telemetry 등 어떤 infrastructure-level 도구가 들어오더라도, 어떤 상황에서는 계측을 켜고 어떤 상황에서는 계측을 꺼야 할 수 있습니다. 따라서 현재 구현은 Langfuse를 사용하더라도, 추상화의 이름은 특정 벤더에 종속되지 않도록 Langfuse를 직접 드러내지 않는 방향으로 관리합니다. 이 모듈은 함수 호출 단위의 조건부 계측만 담당합니다. 즉, 일반 `def` / `async def` 인스턴스 메서드에는 `instrument_if_enabled(...)`를 사용합니다. 반면 `@asynccontextmanager`, streaming session, 긴 블록 생명주기를 가진 흐름은 각 사용처에서 context manager를 직접 구현합니다. context manager helper를 이 모듈로 빼지 않는 이유는, 그 순간 특정 벤더의 context object, update API, lifecycle API와 강하게 결합되기 때문입니다. 예를 들어 Langfuse에서는 `start_as_current_observation(...)`, `observation.update(...)` 같은 API를 직접 다루게 됩니다. 이를 공통화하려고 하면 결국 `update_instrumentation(...)` 같은 얇은 래퍼가 계속 생기거나, 이 모듈이 특정 벤더의 동작 방식을 암묵적으로 알아야 합니다. 그래서 이 모듈의 책임은 다음 두 가지로 제한합니다. 1. 계측 가능 여부를 판단하는 베이스 계약 제공 2. 일반 함수형 인스턴스 메서드에 외부에서 주입된 계측 데코레이터를 조건부로 적용 사용 예시: from langfuse import observe from app.common.conditional_instrumentation import ( ConditionalInstrumentationBase, instrument_if_enabled, ) class TranslationUseCase(ConditionalInstrumentationBase): def _should_enable_instrumentation(self) -> bool: return ( settings.USE_LANGFUSE and self._user_id not in settings.INSTRUMENTATION_EXCLUDED_USER_IDS ) @instrument_if_enabled(instrumenter=observe) def normalize_text(self, text: str) -> str: return text.strip().lower() @instrument_if_enabled( instrumenter=observe, as_type="generation", ) async def translate(self, text: str, target_language: str) -> str: return await self.translator.translate( text=text, target_language=target_language, ) `@asynccontextmanager` 기반 스트리밍 메서드는 이 모듈의 데코레이터를 사용하지 않고, 사용처에서 직접 context manager를 엽니다. 사용 예시: from contextlib import asynccontextmanager, nullcontext from langfuse import get_client langfuse = get_client() class TranslationUseCase(ConditionalInstrumentationBase): @asynccontextmanager async def stream_translate(self, text: str, target_language: str): final_output = "" async def stream_generator(): nonlocal final_output async for chunk in self.translator.stream_translate( text=text, target_language=target_language, ): final_output += chunk yield chunk observation_context = ( langfuse.start_as_current_observation( name="stream_translate", as_type="generation", ) if self._should_enable_instrumentation() else nullcontext(None) ) with observation_context as observation: try: yield stream_generator() except Exception as exc: if observation is not None: observation.update( metadata={"stream_error": str(exc)}, ) raise finally: if observation is not None: observation.update(output=final_output) """ from __future__ import annotations import inspect from abc import ABC, abstractmethod from functools import wraps from typing import Any, Callable, ParamSpec, TypeVar, cast P = ParamSpec("P") R = TypeVar("R") class ConditionalInstrumentationBase(ABC): """ 현재 실행 문맥에서 인프라 레벨 계측을 활성화할지 여부를 결정하는 기본 계약입니다. 이 클래스를 상속하는 객체는 `_should_enable_instrumentation()`을 구현해야 합니다. 이 메서드는 보통 요청 단위 상태를 기반으로 계측 여부를 판단합니다. 예를 들어 다음과 같은 조건을 이 메서드 안에서 함께 처리할 수 있습니다. - 전역 계측 활성화 여부 - 특정 user id 제외 여부 - 특정 tenant, environment, feature flag에 따른 제외 여부 이 클래스는 특정 계측 도구를 직접 알지 않습니다. 실제 계측 도구는 `instrument_if_enabled(...)`에 주입되는 `instrumenter`가 담당합니다. """ @abstractmethod def _should_enable_instrumentation(self) -> bool: """ 현재 실행 문맥에서 이 인스턴스의 계측을 활성화할지 여부를 반환합니다. 이 메서드는 데코레이팅된 메서드가 호출될 때마다 평가됩니다. 따라서 `set_user(...)` 같은 선행 메서드에서 요청 단위 상태를 먼저 확정해두는 것이 좋습니다. """ raise NotImplementedError def _ensure_instrumentable_instance( instance: Any, func: Callable[..., Any], ) -> None: """ 데코레이팅된 대상이 올바른 인스턴스 메서드인지 검증합니다. `@instrument_if_enabled(...)`는 `ConditionalInstrumentationBase`를 상속한 클래스의 인스턴스 메서드에서만 사용하는 것을 전제로 합니다. 일반 함수, staticmethod, classmethod, 또는 다른 타입의 객체에 잘못 적용되면 런타임에서 명확한 오류를 발생시킵니다. """ if not isinstance(instance, ConditionalInstrumentationBase): raise TypeError( "@instrument_if_enabled(...)는 " "ConditionalInstrumentationBase를 상속한 클래스의 " "인스턴스 메서드에만 적용할 수 있습니다. " f"{func.__qualname__!r} 호출 시 첫 번째 인자 타입은 " f"{type(instance).__name__!r} 입니다." ) def _ensure_plain_instance_method_target(func: Any) -> None: """ 데코레이터가 지원하지 않는 대상에 적용되는 것을 빠르게 차단합니다. """ func_name = getattr(func, "__qualname__", repr(func)) if isinstance(func, staticmethod): raise TypeError( "@instrument_if_enabled(...)는 staticmethod에 적용할 수 없습니다. " f"대상: {func_name!r}. " "인스턴스 상태를 기반으로 계측 여부를 판단해야 하므로 인스턴스 메서드에만 사용하세요." ) if isinstance(func, classmethod): raise TypeError( "@instrument_if_enabled(...)는 classmethod에 적용할 수 없습니다. " f"대상: {func_name!r}. " "인스턴스 상태를 기반으로 계측 여부를 판단해야 하므로 인스턴스 메서드에만 사용하세요." ) if not callable(func): raise TypeError( "@instrument_if_enabled(...)는 callable에만 적용할 수 있습니다. " f"대상: {func_name!r}" ) def instrument_if_enabled( *, instrumenter: Callable[..., Any], **instrumenter_kwargs: Any, ) -> Callable[[Callable[P, R]], Callable[P, R]]: """ 함수 호출 단위 계측을 조건부로 적용하는 데코레이터입니다. 이 데코레이터는 특정 벤더의 계측 함수를 직접 import하지 않습니다. 대신 외부에서 `instrumenter`를 주입받습니다. Langfuse를 사용하는 경우: from langfuse import observe @instrument_if_enabled(instrumenter=observe) def normalize_text(self, text: str) -> str: return text.strip().lower() @instrument_if_enabled( instrumenter=observe, as_type="generation", ) async def translate(self, text: str) -> str: return await self.translator.translate(text) 지원 대상: - `def`로 정의된 인스턴스 메서드 - `async def`로 정의된 인스턴스 메서드 지원하지 않는 대상: - generator 함수 (`def ...: yield ...`) - async generator 함수 (`async def ...: yield ...`) - `@contextmanager` / `@asynccontextmanager` 기반 세션 메서드 - 일반 함수 - staticmethod - classmethod 세션, 스트리밍, context manager처럼 블록의 생명주기가 중요한 흐름은 이 데코레이터가 아니라 사용처에서 직접 context manager를 구현해야 합니다. 이 데코레이터는 계측이 실제로 활성화되는 첫 호출 시점까지 `instrumenter(**instrumenter_kwargs)(func)` 호출을 지연합니다. 따라서 `_should_enable_instrumentation()`이 계속 False인 경우, 주입된 계측 데코레이터는 실제로 적용되지 않습니다. """ if not callable(instrumenter): raise TypeError( "`instrumenter`는 kwargs를 받아 데코레이터를 반환하는 callable이어야 합니다." ) def decorator(func: Callable[P, R]) -> Callable[P, R]: _ensure_plain_instance_method_target(func) original_func = inspect.unwrap(func) if inspect.isgeneratorfunction(func) or inspect.isgeneratorfunction(original_func): raise TypeError( "@instrument_if_enabled(...)는 generator 함수에 적용할 수 없습니다. " f"대상: {func.__qualname__!r}. " "세션/스트리밍성 흐름은 사용처에서 context manager로 직접 처리하세요." ) if inspect.isasyncgenfunction(func) or inspect.isasyncgenfunction(original_func): raise TypeError( "@instrument_if_enabled(...)는 async generator 함수에 적용할 수 없습니다. " f"대상: {func.__qualname__!r}. " "세션/스트리밍성 흐름은 사용처에서 context manager로 직접 처리하세요." ) instrumented_func: Callable[..., Any] | None = None def get_instrumented_func() -> Callable[..., Any]: nonlocal instrumented_func if instrumented_func is not None: return instrumented_func instrumentation_decorator = instrumenter(**instrumenter_kwargs) if not callable(instrumentation_decorator): raise TypeError( "`instrumenter(**kwargs)`는 데코레이터 callable을 반환해야 합니다. " f"대상: {func.__qualname__!r}" ) wrapped_func = instrumentation_decorator(func) if not callable(wrapped_func): raise TypeError( "`instrumenter(**kwargs)(func)`는 callable을 반환해야 합니다. " f"대상: {func.__qualname__!r}" ) instrumented_func = wrapped_func return instrumented_func if inspect.iscoroutinefunction(func): @wraps(func) async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> R: if not args: raise TypeError( "@instrument_if_enabled(...)는 인스턴스 메서드에만 적용할 수 있습니다. " f"{func.__qualname__!r} 호출 시 인스턴스 인자가 전달되지 않았습니다." ) instance = args[0] _ensure_instrumentable_instance(instance, func) target_func = ( get_instrumented_func() if instance._should_enable_instrumentation() else func ) return await cast(Callable[..., Any], target_func)(*args, **kwargs) return cast(Callable[P, R], async_wrapper) @wraps(func) def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> R: if not args: raise TypeError( "@instrument_if_enabled(...)는 인스턴스 메서드에만 적용할 수 있습니다. " f"{func.__qualname__!r} 호출 시 인스턴스 인자가 전달되지 않았습니다." ) instance = args[0] _ensure_instrumentable_instance(instance, func) target_func = ( get_instrumented_func() if instance._should_enable_instrumentation() else func ) return cast(Callable[..., R], target_func)(*args, **kwargs) return cast(Callable[P, R], sync_wrapper) return decorator
Python
복사
언젠가 이 메모에 쓰이면 좋을 것 같은 재료들입니다.
1.
인프라 레이어(Langfuse를 비롯한 계측 도구들)를 분리해야 한다는 생각은 CA, 포트-어답터 패턴과 맞닿아 있다. 하지만 이 글의 예제 코드에서 명시적으로 포트-어답터 패턴을 사용하지는 않는다.
from: 이 메모에 쓰인 생각을 만든 앞의 생각들입니다. 앞의 생각과 연관관계를 설명합니다.
1.
이 코드에서 컨트롤러는 유즈케이스를 직접 호출한다.
2.
Langfuse 혹은 다른 계측 프레임워크를 사용해야 할 경우 이 글을 고려할 수 있다.
supplementary: 이 메모에 작성된 생각을 뒷받침하는 생각의 새로운 메모입니다.
1.
opposite: 이 메모에 작성된 생각과 대조되는 생각의 새로운 메모입니다.
1.
to: 이 메모에 작성된 생각으로부터 발전된 생각의 새로운 메모입니다.
1.
ref: 생각에 참고한 자료입니다.
1.
프로젝트메모 템플릿 버전 2026.03.20