Client

PyFlueClient connects an application to a deployed PyFlue server. It mirrors the reference SDK client: namespaces for agents, workflows, runs, and admin, plus WebSocket helpers for live interaction.

Create a client

import asyncio

from pyflue import PyFlueClient

async def main() -> None:
    client = PyFlueClient("http://127.0.0.1:2024")
    # or the factory form, matching the reference:
    # from pyflue import create_flue_client
    # client = create_flue_client(base_url="http://127.0.0.1:2024")

    print(await client.health())
    await client.close()

# await needs a running event loop; the snippets below assume one too.
asyncio.run(main())

The client owns its HTTP connection unless you pass your own httpx.AsyncClient. Use it as an async context manager to close it automatically.
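The ownership rule can be illustrated with a small stand-in. This is a sketch of the general pattern, not PyFlueClient's source: `FakeTransport` and `SketchClient` are hypothetical names, and it assumes that a connection passed in by the caller is left open for the caller to close.

```python
import asyncio
from typing import Optional, Tuple


class FakeTransport:
    """Hypothetical stand-in for an HTTP connection such as httpx.AsyncClient."""

    def __init__(self) -> None:
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


class SketchClient:
    """Illustrative only; not PyFlueClient's implementation."""

    def __init__(self, base_url: str, transport: Optional[FakeTransport] = None) -> None:
        self.base_url = base_url
        # Remember whether this client created the transport, so it only
        # closes what it owns.
        self._owns_transport = transport is None
        self.transport = FakeTransport() if transport is None else transport

    async def close(self) -> None:
        if self._owns_transport:
            await self.transport.aclose()

    async def __aenter__(self) -> "SketchClient":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Leaving the `async with` block closes the client automatically.
        await self.close()


async def demo() -> Tuple[bool, bool]:
    # Client-owned transport: closed when the block exits.
    async with SketchClient("http://127.0.0.1:2024") as owned:
        pass

    # Caller-supplied transport: assumed to stay open for the caller.
    shared = FakeTransport()
    async with SketchClient("http://127.0.0.1:2024", transport=shared):
        pass

    return owned.transport.closed, shared.closed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With the real client, the equivalent form would presumably be `async with PyFlueClient(...) as client:`, which removes the need for an explicit `close()` call.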

Agents

Send a prompt to a persistent agent instance, stream activity, or open a WebSocket for several prompts over one connection.

# List discovered agents
agents = await client.agents()

# One prompt
result = await client.agents.invoke("support_assistant", "ticket-8472", payload={"message": "Status?"})
print(result["result"]["text"])

# Multiple prompts over one connection
async with client.agents.connect("support_assistant", "ticket-8472") as conn:
    first = await conn.prompt("Summarize the case.", session="billing")
    second = await conn.prompt("Now draft a reply.", session="billing")

Direct calls to a persistent agent return {"result": ...} and do not include a run id. Runs belong to workflows.

Workflows

Start a workflow and choose how to observe it: accept a run id, wait for the result, stream events, or open a WebSocket for one invocation.

# Accept a run id immediately
receipt = await client.workflows.invoke("summarize", {"text": "..."})
run_id = receipt["run_id"]

# Wait for the completed result
done = await client.workflows.invoke("summarize", {"text": "..."}, wait=True)
print(done["result"]["summary"])

# Stream run events
async for event in client.workflows.stream("summarize", {"text": "..."}):
    print(event.type)

# One invocation over WebSocket
async with client.workflows.connect("summarize") as conn:
    messages = await conn.run({"text": "..."})

Runs

Inspect a workflow run after it starts. Runs apply to workflows only.

record = await client.runs.get(run_id)

# Persisted events, optionally after an index
events = await client.runs.events(run_id, after=0)

# Replay then tail until completion
async for event in client.runs.stream(run_id):
    print(event.type)

Admin

The read-only admin namespace lists agents, instances, and runs for an operations view. Mount it behind your own authentication in production.

agents = await client.admin.agents.list()
instances = await client.admin.instances.list("support_assistant")
runs = await client.admin.runs.list()
record = await client.admin.runs.get(run_id)

WebSocket URLs

The client derives WebSocket URLs from the base URL: http becomes ws and https becomes wss. Pass an https:// base URL in production so connections use wss://.
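The scheme mapping can be sketched with the standard library. `derive_ws_url` is a hypothetical helper written for illustration; the client's actual derivation may differ in details such as how paths are handled.

```python
from urllib.parse import urlsplit

# http -> ws, https -> wss, as described above.
_WS_SCHEMES = {"http": "ws", "https": "wss"}


def derive_ws_url(base_url: str) -> str:
    """Swap the HTTP scheme for its WebSocket counterpart, keeping the rest."""
    parts = urlsplit(base_url)
    if parts.scheme not in _WS_SCHEMES:
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    return parts._replace(scheme=_WS_SCHEMES[parts.scheme]).geturl()


if __name__ == "__main__":
    print(derive_ws_url("http://127.0.0.1:2024"))
    print(derive_ws_url("https://flue.example.com/api"))
```

Because only the scheme changes, an `https://` base URL yields an encrypted `wss://` connection without further configuration.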

Naming

Both snake_case and camelCase entry points are available (create_flue_client and createFlueClient) so code ported from the reference keeps working.