OfflineTracer

Tracer for offline / experiment-style runs.

OFFLINE_TRACES_PATH

Default:

'otel/v1/offline-traces'

Behaves like Tracer for span creation and @Tracer.observe, with two differences:

  • Spans are pushed to the project's offline OTLP endpoint and stored in the offline_otel_traces ClickHouse table. They do not appear on the live monitoring page.
  • Each completed root span produces a new Example that is appended to the caller-supplied dataset list. The example carries the offline_trace_id of the offline trace plus any static example_fields configured at init time.

Unlike Tracer, OfflineTracer requires all credentials upfront and raises ValueError if any are missing — there is no no-op fallback. Prefer Judgeval.offline_tracer(...) over calling OfflineTracer.create directly so credentials are reused from the active Judgeval client.
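
As a rough mental model of the second difference above, the sketch below shows how a caller-owned list could be filled as root spans complete. It is not the library's implementation: ExampleSketch, RootSpanCollector, and on_span_end are hypothetical stand-ins, and only the dataset, example_fields, and offline_trace_id names come from this page.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class ExampleSketch:
    """Hypothetical stand-in for the Example objects appended to the dataset."""

    offline_trace_id: str
    fields: Dict[str, Any] = field(default_factory=dict)


class RootSpanCollector:
    """Hypothetical model: append one example per completed root span."""

    def __init__(
        self,
        dataset: List[ExampleSketch],
        example_fields: Optional[Dict[str, Any]] = None,
    ) -> None:
        self.dataset = dataset  # caller-owned list, mutated in place
        self.example_fields = dict(example_fields or {})

    def on_span_end(self, trace_id: str, parent_span_id: Optional[str]) -> None:
        # Only root spans (no parent) produce an example.
        if parent_span_id is not None:
            return
        self.dataset.append(ExampleSketch(trace_id, dict(self.example_fields)))


dataset: List[ExampleSketch] = []
collector = RootSpanCollector(
    dataset, {"input": "What is 2 + 2?", "golden_output": "4"}
)
collector.on_span_end(trace_id="trace-a", parent_span_id="span-1")  # child span: ignored
collector.on_span_end(trace_id="trace-a", parent_span_id=None)  # root span: appended
```

Because the list is owned by the caller, the examples remain available after the run for whatever evaluation step follows.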

Attributes

SUPPORTS_LIVE_INSTRUMENTATION

:

bool

Default:

False

project_name

Default:

project_name

project_id

Default:

project_id

api_key

Default:

api_key

organization_id

Default:

organization_id

api_url

Default:

api_url

environment

Default:

environment

serializer

Default:

serializer

TRACER_NAME

Default:

JUDGEVAL_TRACER_INSTRUMENTING_MODULE_NAME


create()

Create and activate a new OfflineTracer.

Args mirror Tracer.init, plus:

  • dataset: Caller-owned list. Each completed root span appends a new Example carrying the offline_trace_id of the trace and the static example_fields.
  • example_fields: Static fields copied onto every emitted example (e.g. {"input": ..., "golden_output": ...}).

Raises ValueError if project_name, api_key, organization_id, or api_url cannot be resolved (explicit arg or env var), or if the project cannot be found on the backend.

def create(project_name=None, api_key=None, organization_id=None, api_url=None, environment=None, set_active=True, serializer=safe_serialize, resource_attributes=None, sampler=None, span_limits=None, span_processors=None, *, dataset, example_fields=None) -> 'OfflineTracer':

Parameters

project_name

:

Optional[str]

Default:

None

api_key

:

Optional[str]

Default:

None

organization_id

:

Optional[str]

Default:

None

api_url

:

Optional[str]

Default:

None

environment

:

Optional[str]

Default:

None

set_active

:

bool

Default:

True

serializer

:

Callable[[Any], str]

Default:

safe_serialize

resource_attributes

:

Optional[Dict[str, Any]]

Default:

None

sampler

:

Optional[Sampler]

Default:

None

span_limits

:

Optional[SpanLimits]

Default:

None

span_processors

:

Optional[Sequence[SpanProcessor]]

Default:

None

dataset

required

:

List[Example]

example_fields

:

Optional[Dict[str, Any]]

Default:

None

Returns

'OfflineTracer'
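
The fail-fast credential check described for create() can be pictured with the sketch below. It is an illustration under assumptions, not the SDK's code: resolve_required is a hypothetical helper, the environment variable names are the ones listed for Tracer.init on this page, and because no environment variable for project_name is named here, the sketch accepts that value only as an explicit argument. The backend project lookup is not modelled.

```python
import os
from typing import Dict, List, Mapping, Optional

# Environment variable names as listed for Tracer.init on this page.
ENV_FALLBACKS = {
    "api_key": "JUDGMENT_API_KEY",
    "organization_id": "JUDGMENT_ORG_ID",
    "api_url": "JUDGMENT_API_URL",
}


def resolve_required(
    explicit: Mapping[str, Optional[str]],
    env: Optional[Mapping[str, str]] = None,
) -> Dict[str, str]:
    """Hypothetical helper: explicit argument first, then env var, else ValueError."""
    env = os.environ if env is None else env
    resolved: Dict[str, str] = {}
    missing: List[str] = []
    for name in ("project_name", "api_key", "organization_id", "api_url"):
        value = explicit.get(name)
        if not value and name in ENV_FALLBACKS:
            value = env.get(ENV_FALLBACKS[name])
        if value:
            resolved[name] = value
        else:
            missing.append(name)
    if missing:
        # No no-op fallback: refuse to continue with partial credentials.
        raise ValueError("Missing required settings: " + ", ".join(missing))
    return resolved
```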


get_span_exporter()

Return the offline span exporter for this tracer.

Targets the project's offline OTLP endpoint. Credentials are guaranteed present (validated in create).

def get_span_exporter() -> JudgmentSpanExporter:

Returns

JudgmentSpanExporter


get_span_processor()

Return the offline span processor for this tracer.

def get_span_processor() -> OfflineJudgmentSpanProcessor:

Returns

OfflineJudgmentSpanProcessor


get_current_span()

Return the currently active span from the Judgment tracer provider.

def get_current_span() -> opentelemetry.trace.Span:

Returns

opentelemetry.trace.Span - The active Span object.


force_flush()

Send all pending spans to Judgment immediately.

Call this before your process exits (e.g. in a serverless function) to ensure no spans are lost. Does not shut down the tracer.

def lambda_handler(event, context):
    result = process(event)
    Tracer.force_flush()
    return result

def force_flush(timeout_millis=30000) -> bool:

Parameters

timeout_millis

:

int

Maximum wait time in milliseconds.

Default:

30000

Returns

bool - True if all spans were flushed within the timeout.


shutdown()

Flush pending spans and shut down the tracer.

Call this on application exit to ensure all data is exported before the process terminates.

def shutdown(timeout_millis=30000) -> None:

Parameters

timeout_millis

:

int

Maximum wait time in milliseconds.

Default:

30000

Returns

None


registerOTELInstrumentation()

Register a third-party OpenTelemetry instrumentor with Judgment.

Use this to route spans from libraries like opentelemetry-instrumentation-requests through the Judgment trace pipeline.

def registerOTELInstrumentation(instrumentor) -> None:

Parameters

instrumentor

required

Returns

None


start_span()

Start a new span that must be ended manually with span.end().

Prefer the span context manager for automatic lifecycle management.

def start_span(name, attributes=None) -> opentelemetry.trace.Span:

Parameters

name

required

:

str

Name for the new span.

attributes

:

Optional[Dict[str, Any]]

Optional dictionary of initial span attributes.

Default:

None

Returns

opentelemetry.trace.Span - The newly started Span.
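
Since a manually started span stays open until end() is called, a try/finally block is the usual way to make sure it is closed even when the traced work raises. The sketch below shows only that pattern: SpanStandIn and the local start_span function are hypothetical stand-ins so the example runs without the SDK, and they model nothing beyond the end() call.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


class SpanStandIn:
    """Hypothetical stand-in exposing only the end() method used here."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.ended = False

    def end(self) -> None:
        self.ended = True


started: List[SpanStandIn] = []


def start_span(name: str) -> SpanStandIn:
    # Stand-in for the real start_span; it only creates and records the object.
    span = SpanStandIn(name)
    started.append(span)
    return span


def run_with_manual_span(name: str, work: Callable[[], T]) -> T:
    span = start_span(name)
    try:
        return work()
    finally:
        span.end()  # runs on success and on exception alike
```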


start_as_current_span()

Start a span and set it as the current span in the context.

def start_as_current_span(name, attributes=None) -> Iterator[Span]:

Parameters

name

required

:

str

Name for the new span.

attributes

:

Optional[Dict[str, Any]]

Optional dictionary of initial span attributes.

Default:

None

Returns

Iterator[Span]


continue_trace()

Continue a distributed trace from an upstream service.

Extracts W3C trace context and Judgment baggage from carrier and makes it the active context for the duration of the block. Any span started inside — including @Tracer.observe functions — becomes a child of the upstream parent, stitching your service into the caller's trace.

Use this at the entry point of an inbound request (HTTP handler, message queue consumer, RPC dispatcher, etc.) to join the trace started by the upstream caller.

FastAPI:

@Tracer.observe(span_type="agent")
def handle(payload): ...

@app.post("/run")
async def run(request: Request):
    with Tracer.continue_trace(request.headers):
        return handle(await request.json())

Propagating in the opposite direction (outbound):

from judgeval.trace.propagation import inject

headers = {}
inject(headers)
httpx.post(downstream_url, headers=headers, json=payload)

def continue_trace(carrier) -> Iterator[Any]:

Parameters

carrier

required

:

Any

A mapping containing propagation keys. Typically request.headers from FastAPI, Flask, or Starlette, but any dict-shaped mapping with lowercase keys works (message queue attributes, Lambda event headers, RPC metadata, etc.). If the carrier contains no trace context, the block runs with a fresh context — no error.

Returns

Iterator[Any]
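
To make the "W3C trace context" in the carrier concrete, the sketch below parses a version-00 traceparent header using only the standard library. It illustrates the header format, not how continue_trace is implemented; parse_traceparent and ParentContext are hypothetical names, and Judgment baggage is not modelled.

```python
import re
from typing import Mapping, NamedTuple, Optional

# version "00" layout: version-traceid-parentid-flags, all lowercase hex
_TRACEPARENT = re.compile(r"([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})")


class ParentContext(NamedTuple):
    trace_id: str
    parent_span_id: str
    sampled: bool


def parse_traceparent(carrier: Mapping[str, str]) -> Optional[ParentContext]:
    """Return the upstream parent, or None if the carrier has no usable context."""
    header = carrier.get("traceparent")  # lowercase key, as the carrier note requires
    if header is None:
        return None
    match = _TRACEPARENT.fullmatch(header.strip())
    if match is None:
        return None
    version, trace_id, span_id, flags = match.groups()
    # Version "ff" and all-zero IDs are invalid per the W3C specification.
    if version == "ff" or trace_id == "0" * 32 or span_id == "0" * 16:
        return None
    return ParentContext(trace_id, span_id, bool(int(flags, 16) & 0x01))
```

A None result corresponds to the documented "fresh context, no error" case.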


start_linked_trace()

Start a linked trace rooted at a new span.

The new span is the root of a fresh trace. It links back to the current span via an OpenTelemetry Link and stores explicit cross-trace source/target IDs on the linked root and invocation spans.

def start_linked_trace(name, attributes=None, *, span_type='span') -> Iterator[Span]:

Parameters

name

required

:

str

Name for the linked trace root span.

attributes

:

Optional[Dict[str, Any]]

Optional dictionary of initial linked-root-span attributes.

Default:

None

span_type

:

Optional[str]

Span kind to apply to both the parent-side invocation span and the linked trace root span. Set to None to skip setting it.

Default:

'span'

Returns

Iterator[Span]


span()

Open a child span using a with block.

Use this for tracing a section of code that isn't a standalone function. Exceptions are automatically recorded on the span.

with Tracer.span("process-results"):
    results = parse(raw_data)
    Tracer.set_attribute("result_count", len(results))

def span(span_name) -> Iterator[Span]:

Parameters

span_name

required

:

str

Name for this span (visible in the dashboard).

Returns

Iterator[Span]


observe()

Decorator that automatically traces a function call.

Wraps any sync or async function in a span. When fork=True and an active parent span exists, eligible calls run in a fresh linked trace while a parent-side invocation span remains on the current trace. Generator and async-generator functions stay on the normal observation path. Inputs and outputs are captured automatically. Works with or without parentheses.

Basic usage:

@Tracer.observe(span_type="tool")
def search(query: str) -> list[str]:
    return vector_db.search(query)

Async functions work the same way:

@Tracer.observe(span_type="agent")
async def answer(question: str) -> str:
    context = search(question)
    return await llm.generate(question, context)

Fork a call into a linked trace:

@Tracer.observe(span_type="agent", fork=True)
def delegate(task: str) -> str:
    return run_subsystem(task)

Without parentheses (uses default settings):

@Tracer.observe
def my_function():
    ...

def observe(func=None, span_type='span', span_name=None, record_input=True, record_output=True, disable_generator_yield_span=False, fork=False) -> C | Callable[[C], C]:

Parameters

func

:

Optional[C]

The function to wrap (set implicitly when used as @Tracer.observe without parentheses).

Default:

None

span_type

:

Optional[str]

The kind of span. Use "tool", "agent", "llm", or "function" to categorize work in the dashboard. Defaults to "span".

Default:

'span'

span_name

:

Optional[str]

Override the span name (defaults to the function name).

Default:

None

record_input

:

bool

Capture and store function arguments. Set to False for functions with sensitive or very large inputs.

Default:

True

record_output

:

bool

Capture and store the return value.

Default:

True

disable_generator_yield_span

:

bool

Suppress per-yield child spans for generator functions.

Default:

False

fork

:

bool

If True, run the function in a new linked trace instead of the current trace when an active parent span is available. Otherwise, observation falls back to the normal behavior.

Default:

False

Returns

C | Callable[[C], C]
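
The func=None default and the C | Callable[[C], C] return type are what let one decorator work both with and without parentheses. The sketch below shows that general Python pattern for synchronous functions only. It is not the SDK's observe: the local observe function and the calls list are hypothetical, options are keyword-only here for simplicity, and no spans are created.

```python
import functools
from typing import Any, Callable, List, Optional, Tuple

calls: List[Tuple[str, str]] = []  # (span name, span type) recorded per call


def observe(
    func: Optional[Callable[..., Any]] = None,
    *,
    span_type: str = "span",
    span_name: Optional[str] = None,
) -> Any:
    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        name = span_name or fn.__name__  # default to the function name

        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            calls.append((name, span_type))
            return fn(*args, **kwargs)

        return wrapper

    if func is not None:
        # Used as @observe: Python passed the function directly.
        return decorator(func)
    # Used as @observe(...): return the decorator to be applied next.
    return decorator


@observe
def plain() -> int:
    return 1


@observe(span_type="tool", span_name="lookup")
def configured(x: int) -> int:
    return x * 2
```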


wrap()

Wrap an LLM client for automatic tracing of all API calls.

Supported providers: OpenAI, Anthropic, Together AI, and Google GenAI. Once wrapped, every API call made through the client is recorded as a span with model name, token counts, and cost.

from openai import OpenAI
from anthropic import Anthropic

openai = Tracer.wrap(OpenAI())
anthropic = Tracer.wrap(Anthropic())

def wrap(client) -> TClient:

Parameters

client

required

:

TClient

An LLM provider client instance (e.g. OpenAI(), Anthropic()).

Returns

TClient - The same client instance, now instrumented with tracing.


set_span_kind()

Set the judgment.span_kind attribute on the current span.

def set_span_kind(kind) -> None:

Parameters

kind

required

:

str

Returns

None


set_llm_span()

def set_llm_span() -> None:

Returns

None


set_tool_span()

def set_tool_span() -> None:

Returns

None


set_general_span()

def set_general_span() -> None:

Returns

None


set_attribute()

Attach a custom key-value pair to the current span.

Use this to record application-specific metadata that you want to see in the Judgment dashboard. Non-primitive values (dicts, lists, objects) are serialized to strings automatically.

Tracer.set_attribute("user_tier", "premium")
Tracer.set_attribute("search_results_count", len(results))

def set_attribute(key, value) -> None:

Parameters

key

required

:

str

Attribute name (e.g. "user_tier", "search_results_count").

value

required

:

Any

The value to record.

Returns

None
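
The note about non-primitive values can be illustrated as follows. This is a sketch of the general idea only: to_attribute_value is a hypothetical helper, and json.dumps stands in for whatever serializer the tracer is configured with (safe_serialize by default, whose exact output format is not described on this page).

```python
import json
from typing import Any, Union

AttributeValue = Union[str, bool, int, float]


def to_attribute_value(value: Any) -> AttributeValue:
    """Pass primitives through; turn everything else into a string."""
    if isinstance(value, (str, bool, int, float)):
        return value
    # default=str is a fallback for objects json cannot encode natively.
    return json.dumps(value, default=str, sort_keys=True)
```

Under this model, a dict such as {"b": 1, "a": [1, 2]} is stored as a single string rather than as nested structure, so it is worth flattening values you intend to filter on into separate primitive attributes.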


set_attributes()

Set multiple custom attributes on the current span at once.

def set_attributes(attributes) -> None:

Parameters

attributes

required

:

Dict[str, Any]

Dictionary of key-value pairs to set.

Returns

None


set_input()

Manually set the input for the current span.

Use when @Tracer.observe(record_input=False) is set but you want to record a sanitized or transformed version of the input.

def set_input(input_data) -> None:

Parameters

input_data

required

:

Any

The input value to record.

Returns

None


set_output()

Manually set the output for the current span.

Use when @Tracer.observe(record_output=False) is set but you want to record a sanitized or transformed version of the output.

def set_output(output_data) -> None:

Parameters

output_data

required

:

Any

The output value to record.

Returns

None


recordLLMMetadata()

Record model, token usage, and cost on the current span.

If you're using Tracer.wrap() this is called automatically. Use this method when you need to record metadata for a custom LLM integration.

@Tracer.observe(span_type="llm")
def call_custom_model(prompt: str) -> str:
    response = my_model.generate(prompt)
    Tracer.recordLLMMetadata({
        "model": "my-model-v2",
        "output_tokens": response.usage.output,
        "total_cost_usd": response.usage.cost,
    })
    return response.text

def recordLLMMetadata(metadata) -> None:

Parameters

metadata

required

:

LLMMetadata

A dict with keys like model, provider, non_cached_input_tokens, output_tokens, and total_cost_usd. All fields are optional.

Returns

None
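
One way to picture a dict whose fields are all optional is a TypedDict declared with total=False. The sketch below uses the key names listed above, but the class name LLMMetadataSketch and the value types are assumptions; the SDK's actual LLMMetadata definition may differ.

```python
from typing import TypedDict


class LLMMetadataSketch(TypedDict, total=False):
    """Hypothetical shape; total=False makes every key optional."""

    model: str
    provider: str
    non_cached_input_tokens: int  # assumed to be an integer count
    output_tokens: int  # assumed to be an integer count
    total_cost_usd: float  # assumed to be a float amount in USD


# Any subset of keys is a valid value of this type.
partial: LLMMetadataSketch = {"model": "my-model-v2", "output_tokens": 128}
```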


set_customer_id()

Associate the current trace with a customer.

Once set, this ID propagates to all child spans and enables per-customer analytics in the Judgment dashboard. Call this early in your request handler.

@Tracer.observe(span_type="agent")
def handle_request(user_id: str, question: str):
    Tracer.set_customer_id(user_id)
    return answer(question)

def set_customer_id(customer_id) -> None:

Parameters

customer_id

required

:

str

Your internal customer identifier.

Returns

None


set_customer_user_id()

Set the customer user ID on the current span and propagate to children.

def set_customer_user_id(customer_user_id) -> None:

Parameters

customer_user_id

required

:

str

The customer user ID to associate with this trace.

Returns

None


set_session_id()

Associate the current trace with a conversation session.

Groups multiple requests into a session in the Judgment dashboard. Propagates to all child spans. Call this early in your request handler.

@Tracer.observe(span_type="agent")
def handle_message(session_id: str, message: str):
    Tracer.set_session_id(session_id)
    return chatbot.respond(message)

def set_session_id(session_id) -> None:

Parameters

session_id

required

:

str

Your session or conversation identifier.

Returns

None


tag()

Add tags to the current trace for filtering in the dashboard.

Tags are sent asynchronously and appear in the Judgment monitoring view. Useful for marking traces by feature, experiment, or user segment.

Tracer.tag("rag-pipeline")
Tracer.tag(["experiment-v2", "premium-user"])

def tag(tags) -> None:

Parameters

tags

required

:

str | list[str]

A single tag string or a list of tags.

Returns

None
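
Accepting str | list[str] typically means normalizing to a list before sending. The sketch below shows that step with a hypothetical normalize_tags helper; it is not taken from the SDK. The isinstance check matters because a Python string is itself iterable, so calling list() on it directly would split the tag into characters.

```python
from typing import List, Union


def normalize_tags(tags: Union[str, List[str]]) -> List[str]:
    """Return a list of tags whether one tag or several were passed."""
    if isinstance(tags, str):
        return [tags]  # wrap; list("rag") would give ["r", "a", "g"]
    return list(tags)
```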


async_evaluate()

Run a hosted evaluation on this span when it completes.

The evaluation is queued and processed server-side by the Judgment platform after the span ends. Use this to score live traffic without blocking your application.

@Tracer.observe(span_type="agent")
def answer(question: str) -> str:
    response = llm.generate(question)
    Tracer.async_evaluate(
        "faithfulness",
        {"input": question, "actual_output": response},
    )
    return response

def async_evaluate(judge, example=None) -> None:

Parameters

judge

required

:

str

Name of the hosted judge/scorer (e.g. "faithfulness", "answer_relevancy").

example

:

Optional[Dict[str, Any]]

Optional dict with evaluation data. Keys like input, actual_output, expected_output, and retrieval_context are commonly used.

Default:

None

Returns

None


init()

Create and activate a new Tracer.

This is the recommended way to initialize tracing. Credentials are read from environment variables (JUDGMENT_API_KEY, JUDGMENT_ORG_ID, JUDGMENT_API_URL) when not passed explicitly. If credentials are missing, the tracer still works but spans won't be exported.

tracer = Tracer.init(
    project_name="search-assistant",
    environment="production",
)

def init(project_name=None, api_key=None, organization_id=None, api_url=None, environment=None, set_active=True, serializer=safe_serialize, resource_attributes=None, sampler=None, span_limits=None, span_processors=None) -> Tracer:

Parameters

project_name

:

Optional[str]

Your Judgment project name. Required for span export.

Default:

None

api_key

:

Optional[str]

Judgment API key. Defaults to JUDGMENT_API_KEY env var.

Default:

None

organization_id

:

Optional[str]

Organization ID. Defaults to JUDGMENT_ORG_ID env var.

Default:

None

api_url

:

Optional[str]

API endpoint URL. Defaults to JUDGMENT_API_URL env var.

Default:

None

environment

:

Optional[str]

Label for this deployment (e.g. "staging", "production"). Shows up in the Judgment dashboard.

Default:

None

set_active

:

bool

If True (default), sets this as the global tracer so @Tracer.observe() and other static methods use it.

Default:

True

serializer

:

Callable[[Any], str]

Custom serializer for span inputs/outputs.

Default:

safe_serialize

resource_attributes

:

Optional[Dict[str, Any]]

Extra OpenTelemetry resource attributes.

Default:

None

sampler

:

Optional[Sampler]

Custom OpenTelemetry sampler.

Default:

None

span_limits

:

Optional[SpanLimits]

OpenTelemetry span limits.

Default:

None

span_processors

:

Optional[Sequence[SpanProcessor]]

Additional span processors appended after the default Judgment processor.

Default:

None

Returns

Tracer - A configured and active Tracer instance.
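
Because serializer is typed as Callable[[Any], str], any function that takes a value and returns a string fits the parameter. The sketch below is one possible custom serializer that masks sensitive keys before encoding; redacting_serializer, _redact, and the SENSITIVE_KEYS set are hypothetical and not part of the SDK.

```python
import json
from typing import Any

SENSITIVE_KEYS = {"password", "api_key", "token"}  # hypothetical list; adjust to your data


def _redact(value: Any) -> Any:
    """Recursively replace values stored under sensitive dict keys."""
    if isinstance(value, dict):
        return {
            key: "[REDACTED]" if str(key).lower() in SENSITIVE_KEYS else _redact(item)
            for key, item in value.items()
        }
    if isinstance(value, (list, tuple)):
        return [_redact(item) for item in value]
    return value


def redacting_serializer(value: Any) -> str:
    # default=str keeps the function total for objects json cannot encode.
    return json.dumps(_redact(value), default=str)
```

Given the signature above, it would be supplied as serializer=redacting_serializer when calling Tracer.init or OfflineTracer.create.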


set_active()

Set this tracer as the globally active tracer.

def set_active() -> bool:

Returns

bool - True if the tracer was successfully activated.