Skip to content

Chat

PyFlue follows the Flue chat pattern: chat platforms connect through ordinary application code, then verified events are dispatched to a continuing agent instance. The agent replies through an explicit tool that your application owns.

chat webhook -> verify request -> dispatch(agent, id=thread, session=thread)
             -> continuing agent session -> reply tool -> platform API

This keeps platform authentication, thread mapping, and outbound permissions out of the model-facing transport. The chat provider does not choose a PyFlue route or agent id directly; your application maps the provider event to an instance and session after verification.

Receive Events

Use your web framework to verify the inbound event and normalize only the fields the agent should see.

from fastapi import FastAPI, Request, Response
from pyflue import create_agent, dispatch

assistant = create_agent(
    lambda ctx: {
        "model": "anthropic/claude-haiku-4-5",
        "instructions": "Assist in this chat thread.",
    }
)

app = FastAPI()


@app.post("/webhooks/chat", status_code=202)
async def chat_webhook(request: Request):
    body = await request.json()
    if not verify_provider_signature(request):
        return Response(status_code=401)

    thread_id = str(body["thread"]["id"])
    receipt = await dispatch(
        assistant,
        id=thread_id,
        session=thread_id,
        input={
            "type": "chat.message",
            "text": body["message"]["text"],
            "sender": body["message"]["sender"],
        },
    )
    return {"status": "accepted", "dispatch_id": receipt.dispatch_id}

dispatch() accepts work for asynchronous processing. It does not create a workflow run; it is an agent operation in the selected instance and session. On the current Python path, dispatch admission is process-memory based. For production chat, enqueue the verified event in durable application storage before a worker calls dispatch(...).

Reply Explicitly

Outbound chat is a normal tool. Close over the verified thread id in the agent initializer so the model chooses reply text, not the destination.

from typing import Any
from pyflue import create_agent, define_tool


def build_chat_agent(ctx: Any):
    thread_id = ctx.id

    async def reply_to_chat_thread(args: dict[str, str]) -> str:
        await chat_client.post_message(thread_id, args["text"])
        return "Reply sent."

    return {
        "model": "anthropic/claude-haiku-4-5",
        "instructions": "Reply in the current chat thread when useful.",
        "tools": [
            define_tool(
                "reply_to_chat_thread",
                reply_to_chat_thread,
                description="Post a response into the current chat thread.",
                parameters={
                    "type": "object",
                    "properties": {"text": {"type": "string"}},
                    "required": ["text"],
                },
            )
        ],
    }


assistant = create_agent(build_chat_agent)

State Boundaries

A chat thread is usually the best default for both the agent instance id and the session name. That gives each thread a continuing conversation while keeping platform routing outside the agent.

Persist chat-side state, such as webhook deduplication and subscription data, in the platform integration. Persist agent conversation state with a PyFlue SessionStore when the conversation must survive process restarts. See Production for the durable delivery checklist.

See examples/chat/ for a runnable GitHub-style webhook example.