Advanced Client Features

Take your Hyperia Client beyond basic call_tool() and unlock rich interaction patterns—live logs, progress bars, on‑device LLM sampling, dynamic roots, auto‑reconnect, and streaming chunk callbacks.

All advanced hooks are opt‑in. Provide the relevant handler when you construct or open a client; otherwise Hyperia defaults to quiet DEBUG logs.


1 Log Streaming

Servers can emit ctx.debug()/info()/warning()/error() messages back to the originator. Attach an async log_handler to intercept.

from hyperia import Client
from hyperia.client.logging import LogMessage

async def log_handler(msg: LogMessage):
    ts = msg.timestamp.isoformat(timespec="seconds")
    print(f"[{ts}] {msg.level.upper():<7} {msg.logger or 'srv'}{msg.data}")

client = Client("server.py", log_handler=log_handler)

LogMessage fields

Field
Type
Example

timestamp

datetime

2025‑06‑10T14:32:01Z

level

"debug" …

"info"

logger

str

"sqlalchemy.engine"

data

str

"SELECT * FROM users"

Default handler simply logger.debug()’s each entry.


2 Progress Tracking (v2.3.5+)

Long‑running tools can call ctx.report_progress(p, total, msg). Clients receive ProgressEvents. Provide either:

  • Global progress_handler when instantiating the client.

  • Per‑call handler via call_tool(..., progress_handler=…).

from hyperia.client.progress import ProgressHandler
class Bar(ProgressHandler):
    async def on_progress(self, p, total, msg):
        pct = (p/total*100) if total else 0
        print(f"\r{pct:3.0f}% {msg or ''}", end="")

client = Client("server.py", progress_handler=Bar())

Edge cases

  • total may be None → treat as unknown size.

  • Handler is invoked from transport thread—keep it non‑blocking.


3 Client‑Side LLM Sampling

Servers can delegate generation tasks to the caller’s local/remote model (useful for retrieval‑augmented workflows).

Handler Signature

async def sampling_handler(
    messages: list[SamplingMessage],
    params: SamplingParams,          # temp, max_tokens, systemPrompt …
    context: RequestContext,         # request_id, tool_name…
) -> str | SamplingResponse:
    ...

Example with OpenAI Assistants API

import openai, os
from hyperia.client.sampling import SamplingMessage, SamplingParams

openai.api_key = os.getenv("OPENAI_API_KEY")

async def oa_samp(msgs: list[SamplingMessage], params: SamplingParams, _):
    resp = await openai.chat.completions.create(
        model="gpt-4o",
        messages=[m.to_openai() for m in msgs],
        temperature=params.temperature or 0.7,
        max_tokens=params.maxTokens or 512,
    )
    return resp.choices[0].message.content

client = Client("https://srv.example.com/mcp", sampling_handler=oa_samp)

If you return a SamplingResponse object you can attach tool‑calls or images (per MCP spec v1.7).


4 Dynamic Roots (Capability Hints)

Root URIs tell the server where it’s allowed to operate (file‑system scope, tenant boundary, etc.).

client = Client("server.py", roots=["/workdir/projectA", "s3://bucket/root"])

Real‑time Updates

Supply an async callback that returns the current root list—Hyperia will re‑poll when the server sends roots/request.

async def roots_provider():
    # e.g. pull from UI state
    return [current_workspace_root()]

client = Client("server.py", roots=roots_provider)

5 Auto‑Reconnect & Back‑off

Network transports can retry transient failures automatically.

client = Client(
    "https://api.acme.com/mcp",
    retries=3,              # total attempts
    backoff_factor=0.4,     # grows exponentially 0.4, 0.8, 1.6 s
)

Streamable HTTP includes idempotency keys on tools/call so safe to retry.


6 Chunk Streaming Handlers (v2.4.0)

For very large outputs, servers may stream TextContent chunks. Register a chunk_handler to receive partial data in real‑time (avoids buffering entire string).

async def on_chunk(content):
    print(content.text, end="", flush=True)

await client.call_tool("generate_report", {}, chunk_handler=on_chunk)

When a chunk_handler is present, the return value of call_tool contains only the final chunk(s) (may be empty). Use chunk events for incremental UI.


7 Cancellation Tokens

Pass cancel_event (an asyncio.Event) to any call—flip it to abort client‑side; Hyperia sends tools/call/cancel upstream.

import asyncio
cancel = asyncio.Event()
asyncio.get_event_loop().call_later(5, cancel.set)   # auto‑cancel after 5s

await client.call_tool("heavy", {}, cancel_event=cancel)

8 Custom Content Codecs

Need protobuf instead of JSON for binary blobs? Implement Codec and register globally:

from hyperia.codec import CodecRegistry, Codec

class ProtobufCodec(Codec):
    type = "application/x-protobuf"
    def encode(obj): ...
    def decode(data): ...
CodecRegistry.register(ProtobufCodec())

Servers & clients negotiate via mime_type fields.


9 Putting It All Together

from hyperia import Client

client = Client(
    "backend.py",
    log_handler=my_logger,
    progress_handler=Bar(),
    sampling_handler=oa_samp,
    roots=["/workspace"],
    timeout=10,
)

async with client:
    await client.call_tool("analyze", {"file": "report.pdf"}, chunk_handler=print)

Now you have a fully‑featured client: live logs, progress bar, on‑device completions, abort switch, and streaming output.

Last updated