Handler Classes

class conflux.handlers.AssignRole(role)

Bases: Handler

Assign a role to the last message sent to this Handler by the current HandlerChain.

Parameters:

role (str)

async process(msg, chain)

Process a message and return a new transformed message.

Parameters:
  • msg (Message) – Message to be processed / transformed.

  • chain (HandlerChain) – Current HandlerChain object that is executing this handler.

Raises:

NotImplementedError – If the process method is not implemented.

Returns:

Transformed message.

Return type:

str | Message
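
The role-assignment behaviour described above can be illustrated with a small standalone sketch. It does not import conflux: the Message dataclass and the AssignRoleSketch class are hypothetical stand-ins, and the real Message class may carry different fields.

```python
import asyncio
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Message:
    """Stand-in for conflux's Message; the real class may differ."""
    role: str
    content: str


class AssignRoleSketch:
    """Hypothetical handler that mimics the behaviour documented for AssignRole."""

    def __init__(self, role: str) -> None:
        self.role = role

    async def process(self, msg: Message, chain: object) -> Message:
        # Return a copy of the incoming message with only the role replaced.
        return replace(msg, role=self.role)


incoming = Message(role="user", content="What is the capital of France?")
relabelled = asyncio.run(AssignRoleSketch("question").process(incoming, chain=None))
print(relabelled)
```

The sketch returns a new message rather than mutating the incoming one; whether the library does the same is not stated in this reference.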

class conflux.handlers.BatchInputOpenAiLLM(role=None, model='gpt-4o-mini', **openai_kwgs)

Bases: Handler

Parameters:
  • role (str)

  • model (str)

async process(msg, chain)

Generate a response using the message passed to this handler. If the OpenAI API key is not set in the environment, it can be passed as a variable in the HandlerChain.variables dictionary.

Parameters:
  • msg (Message) – User message sent to the OpenAI model.

  • chain (HandlerChain) – HandlerChain that this handler is a part of.

Returns:

Response from the OpenAI model.

Return type:

Message
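
The API key lookup described above (environment first, then HandlerChain.variables) might look roughly like the sketch below. It is only an illustration: OPENAI_API_KEY is the environment variable read by the OpenAI Python client, but the "api_key" entry in the variables dictionary is an assumed name, and resolve_api_key is a hypothetical helper, not part of conflux.

```python
import os
from typing import Mapping, Optional

ENV_VAR = "OPENAI_API_KEY"   # environment variable used by the OpenAI client
VARIABLE_KEY = "api_key"     # assumed key name inside HandlerChain.variables


def resolve_api_key(
    variables: Mapping[str, str],
    environ: Mapping[str, str] = os.environ,
) -> Optional[str]:
    """Prefer the environment; fall back to the chain's variables."""
    key = environ.get(ENV_VAR)
    if key:
        return key
    return variables.get(VARIABLE_KEY)


# Placeholder values, not real credentials.
from_env = resolve_api_key({"api_key": "key-from-chain"}, {"OPENAI_API_KEY": "key-from-env"})
from_chain = resolve_api_key({"api_key": "key-from-chain"}, {})
missing = resolve_api_key({}, {})
```

Passing the environment as an argument keeps the lookup easy to test without touching the real process environment.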

class conflux.handlers.GeminiLLM(role=None, model='gemini-2.5-flash-preview-05-20', structure=None, **openai_kwgs)

Bases: OpenAiLLM

Parameters:
  • role (str)

  • model (str)

  • structure (type[BaseModel] | None)

class conflux.handlers.ManyToolCalls(*, tool_calls)

Bases: BaseModel

Parameters:

tool_calls (list[SingleToolCall])

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

tool_calls: list[SingleToolCall]

class conflux.handlers.McpToolCall(config, *, llm, tool_call_strategy='single', **llm_kwgs)

Bases: Handler

Parameters:
  • config (ClientTransport | FastMCP | MCPConfig | AnyUrl | Path | dict[str, Any] | str)

  • llm (type[Handler])

  • tool_call_strategy (Literal['many', 'single'])

async many_tool_calls(msg)

Parameters:

msg (Message)

Return type:

ManyToolCalls

async process(msg, chain)

Process a message and return a new transformed message.

Parameters:
  • msg (Message) – Message to be processed / transformed.

  • chain (HandlerChain) – Current HandlerChain object that is executing this handler.

Raises:

NotImplementedError – If the process method is not implemented.

Returns:

Transformed message.

Return type:

str | Message

role: str = 'mcp_tool_call'

async single_tool_call(msg)

Parameters:

msg (Message)

Return type:

SingleToolCall

class conflux.handlers.OpenAiLLM(role=None, model='gpt-4o-mini', structure=None, **openai_kwgs)

Bases: Handler

Handler for generating a response using OpenAI’s Language Model.

Parameters:
  • role (str)

  • model (str)

  • structure (type[BaseModel] | None)

async process(msg, chain)

Generate a response using the message passed to this handler. If the OpenAI API key is not set in the environment, it can be passed as a variable in the HandlerChain.variables dictionary.

Parameters:
  • msg (Message) – User message sent to the OpenAI model.

  • chain (HandlerChain) – HandlerChain that this handler is a part of.

Returns:

Response from the OpenAI model.

Return type:

Message

async request(completion_config, content)

Parameters:
  • completion_config (dict)

  • content (str | Iterable[ChatCompletionContentPartTextParam | ChatCompletionContentPartImageParam | ChatCompletionContentPartInputAudioParam | File])

Return type:

ChatCompletion

class conflux.handlers.OpenRouterLLM(role=None, model='anthropic/claude-sonnet-4', structure=None, extra_headers=None, extra_body=None, **openai_kwgs)

Bases: OpenAiLLM

Parameters:
  • role (str)

  • model (str)

  • structure (type[BaseModel] | None)

  • extra_headers (dict | None)

  • extra_body (dict | None)

class conflux.handlers.RetryHandlerChain(sub_chain, max_attempts=3)

Bases: Handler

Retry a HandlerChain until it produces output or max_attempts is reached.

Parameters:
  • sub_chain (HandlerChain) – HandlerChain to be retried.

  • max_attempts (int, optional) – Maximum number of times to retry. Defaults to 3.

  • role (str, optional) – Role of the handler. Defaults to “RetryHandlerChain”.

async process(msg, chain)

Process a message and return a new transformed message.

Parameters:
  • msg (Message) – Message to be processed / transformed.

  • chain (HandlerChain) – Current HandlerChain object that is executing this handler.

Raises:

NotImplementedError – If the process method is not implemented.

Returns:

Transformed message.

Return type:

str | Message
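
The retry behaviour can be pictured as a bounded loop around the sub-chain. The sketch below is standalone and makes two assumptions that this reference does not confirm: "no output" is modelled as None, and the loop returns None once max_attempts is exhausted. The names retry_until_output and flaky_sub_chain are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def retry_until_output(
    run_once: Callable[[], Awaitable[Optional[str]]],
    max_attempts: int = 3,
) -> Optional[str]:
    """Call run_once until it returns something other than None."""
    for _ in range(max_attempts):
        result = await run_once()
        if result is not None:
            return result
    return None  # every attempt came back empty


calls = []  # records one entry per attempt


async def flaky_sub_chain() -> Optional[str]:
    # Stand-in for a sub-chain that only succeeds on its third run.
    calls.append(1)
    return "done" if len(calls) == 3 else None


outcome = asyncio.run(retry_until_output(flaky_sub_chain, max_attempts=3))
print(outcome, len(calls))
```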

class conflux.handlers.SimilarityRetriever(index_db, k=5, reranker=None, k_reranked=3, join_policy='\n\n')

Bases: Handler

Initialize a SimilarityRetriever object.

Parameters:
  • index_db (VectorDB) – The index database.

  • k (int, optional) – The number of records to retrieve. Defaults to 5.

  • reranker (Callable[[list[Record]], list[Record]] | None, optional) – The reranker function. Defaults to None.

  • k_reranked (int, optional) – The number of records to keep after reranking. Defaults to 3.

  • join_policy (str | Callable[[list[Record]], str | Message], optional) –

    The policy for joining the retrieved records. It can be a string or a callable function. Defaults to “\n\n”.

    • If it is a string, the records will be joined using the string as a separator.

    • If it is a callable function, the function should accept a list of records and return a str or a Message.
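
The two forms of join_policy can be sketched as follows. This is a standalone illustration rather than the library's code: Record here is a stand-in dataclass with an assumed text field, and join_records is a hypothetical helper that mirrors the documented rules, including the ValueError for an invalid policy.

```python
from dataclasses import dataclass
from typing import Callable, List, Union


@dataclass
class Record:
    """Stand-in for conflux's Record; only a text field is assumed."""
    text: str


JoinPolicy = Union[str, Callable[[List[Record]], str]]


def join_records(records: List[Record], join_policy: JoinPolicy = "\n\n") -> str:
    if isinstance(join_policy, str):
        # String policy: use it as the separator between records.
        return join_policy.join(r.text for r in records)
    if callable(join_policy):
        # Callable policy: the function decides how records are combined.
        return join_policy(records)
    raise ValueError(f"Invalid join policy: {join_policy!r}")


def numbered_lines(records: List[Record]) -> str:
    return "\n".join(f"{i}. {r.text}" for i, r in enumerate(records, 1))


records = [Record("alpha"), Record("beta")]
joined_default = join_records(records)
joined_numbered = join_records(records, numbered_lines)
```

A callable policy is the more flexible option when the retrieved text needs labels, numbering, or truncation before it is passed to the next handler.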

async process(msg, chain)

Process the given message and retrieve similar records.

Parameters:
  • msg (Message) – The message to process.

  • chain (HandlerChain) – The handler chain.

Returns:

The joined records as a string or a message.

Return type:

str | Message

Raises:

ValueError – If the join policy is invalid.

role: str = 'retriever'

class conflux.handlers.SingleToolCall(*, tool_name, arguments)

Bases: BaseModel

Parameters:
  • tool_name (str)

  • arguments (dict[str, Any])

arguments: dict[str, Any]

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

tool_name: str
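
The shapes of SingleToolCall and ManyToolCalls can be illustrated without any dependencies. The real classes are Pydantic models; the sketch below substitutes plain dataclasses with the same field names, and the tool names, the JSON payload, and the parse_many helper are made up for the example.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class SingleToolCallSketch:
    """Dataclass stand-in mirroring the fields of SingleToolCall."""
    tool_name: str
    arguments: Dict[str, Any]


@dataclass
class ManyToolCallsSketch:
    """Dataclass stand-in mirroring the fields of ManyToolCalls."""
    tool_calls: List[SingleToolCallSketch]


def parse_many(payload: str) -> ManyToolCallsSketch:
    # Build one SingleToolCallSketch per entry under "tool_calls".
    data = json.loads(payload)
    return ManyToolCallsSketch(
        tool_calls=[SingleToolCallSketch(**call) for call in data["tool_calls"]]
    )


payload = json.dumps({
    "tool_calls": [
        {"tool_name": "search", "arguments": {"query": "weather"}},
        {"tool_name": "add", "arguments": {"a": 1, "b": 2}},
    ]
})
parsed = parse_many(payload)
print([call.tool_name for call in parsed.tool_calls])
```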