Layer 1 - LLM Abstraction

This layer abstracts the function of an LLM so that you can think about prompting, output, and tool use without tying yourself to a specific LLM, its calling conventions, or the quirks of its library.

At this layer we have:

  • LLMBroker: This is the main entry point to the layer. It leverages an LLM-specific gateway and is the primary interface for interacting with the LLM on the other side. The LLMBroker handles text generation, structured output, and tool use.

  • ChatSession: This is a simple class that wraps the LLMBroker and provides a conversational interface to the LLM with context size management. It is a good starting point for building a chatbot.

  • OllamaGateway, OpenAIGateway: These are out-of-the-box adapters that will interact with models available through Ollama and OpenAI.

    Note:
      • OpenAIGateway supports environment-variable defaults. If api_key is not provided, it uses OPENAI_API_KEY. If base_url is not provided, it uses OPENAI_API_ENDPOINT.
      • Explicit constructor arguments always take precedence over environment variables.

  • LLMGateway: This is the abstract class that all LLM adapters must inherit from. It provides a common interface and isolation point for interacting with LLMs.

  • MessageBuilder: This is a utility class for constructing messages with text, images, and file contents using a fluent interface.
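The precedence rule in the OpenAIGateway note above (explicit constructor argument first, environment variable second) can be sketched in a few lines. This is an illustration of the rule only, not the gateway's actual code: `resolve_setting` is a hypothetical helper, and the values assigned to the environment variables are made up for the demonstration.

```python
import os
from typing import Optional


def resolve_setting(explicit: Optional[str], env_name: str) -> Optional[str]:
    """Hypothetical helper: prefer the explicit value, else read the environment."""
    if explicit is not None:
        return explicit
    return os.environ.get(env_name)


# Simulate an environment in which both variables are set (made-up values).
os.environ["OPENAI_API_KEY"] = "key-from-env"
os.environ["OPENAI_API_ENDPOINT"] = "https://example.invalid/v1"

# An explicit argument wins over the environment variable.
api_key = resolve_setting("key-from-argument", "OPENAI_API_KEY")

# With no explicit argument, the environment variable is used.
base_url = resolve_setting(None, "OPENAI_API_ENDPOINT")

print(api_key)
print(base_url)
```

Comparing against `None` rather than relying on truthiness is a design choice in this sketch; whether the real gateway treats an empty string as "not provided" is not stated in this document.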

Architecture Overview

The following diagram illustrates how the key classes in Layer 1 relate to each other:

classDiagram
    class LLMBroker {
        +model: str
        +adapter: LLMGateway
        +tokenizer: TokenizerGateway
        +tracer: TracerSystem
        +generate(messages, tools, temperature) str
        +generate_object(messages, object_model) BaseModel
    }

    class ChatSession {
        +broker: LLMBroker
        +messages: List[LLMMessage]
        +chat(message) str
        +clear_history()
    }

    class LLMGateway {
        <<abstract>>
        +complete(model, messages, tools) LLMGatewayResponse
        +calculate_embeddings(text, model) List[float]
    }

    class OllamaGateway {
        +complete(model, messages, tools) LLMGatewayResponse
        +calculate_embeddings(text, model) List[float]
    }

    class OpenAIGateway {
        +complete(model, messages, tools) LLMGatewayResponse
        +calculate_embeddings(text, model) List[float]
    }

    class TokenizerGateway {
        +encode(text) List
        +decode(tokens) str
    }

    class TracerSystem {
        <<interface>>
        +record_llm_call()
        +record_llm_response()
        +record_tool_call()
    }

    class LLMGatewayResponse {
        +content: str
        +tool_calls: List[LLMToolCall]
        +object: BaseModel
    }

    class LLMMessage {
        +role: MessageRole
        +content: str
        +tool_calls: List[LLMToolCall]
    }

    class MessageBuilder {
        +add_text(text) MessageBuilder
        +add_image(path) MessageBuilder
        +add_file(path) MessageBuilder
        +build() LLMMessage
    }

    ChatSession --> LLMBroker : wraps
    LLMBroker --> LLMGateway : uses via adapter
    LLMBroker --> TokenizerGateway : uses
    LLMBroker --> TracerSystem : uses (optional)
    OllamaGateway --|> LLMGateway : extends
    OpenAIGateway --|> LLMGateway : extends
    LLMGateway --> LLMGatewayResponse : returns
    LLMBroker --> LLMMessage : sends/receives
    MessageBuilder --> LLMMessage : builds
    LLMGatewayResponse --> LLMMessage : contains
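
The central relationship in the diagram (a broker that delegates to an abstract gateway, with one concrete gateway per provider) is the adapter pattern. The sketch below shows that shape with stand-in classes so it runs without any LLM installed. All class names here (`SketchGateway`, `EchoGateway`, `SketchBroker`, `SketchResponse`) are hypothetical, and messages are simplified to plain strings; the real classes carry more parameters, as the source listing further down shows.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List


@dataclass
class SketchResponse:
    """Stand-in for LLMGatewayResponse: only the text content is modelled."""
    content: str


class SketchGateway(ABC):
    """Stand-in for LLMGateway: the isolation point each provider adapter implements."""

    @abstractmethod
    def complete(self, model: str, messages: List[str]) -> SketchResponse:
        ...


class EchoGateway(SketchGateway):
    """A fake provider that echoes the last message; no network access."""

    def complete(self, model: str, messages: List[str]) -> SketchResponse:
        return SketchResponse(content=f"[{model}] {messages[-1]}")


class SketchBroker:
    """Stand-in for LLMBroker: callers talk to the broker, never to a provider."""

    def __init__(self, model: str, gateway: SketchGateway):
        self.model = model
        self.adapter = gateway

    def generate(self, messages: List[str]) -> str:
        return self.adapter.complete(self.model, messages).content


broker = SketchBroker(model="demo-model", gateway=EchoGateway())
reply = broker.generate(["Hello"])
print(reply)  # [demo-model] Hello
```

Swapping providers then means passing a different gateway to the broker; the calling code does not change.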

Working with Embeddings

Mojentic provides embeddings functionality through the calculate_embeddings method in both the OllamaGateway and OpenAIGateway classes. Embeddings are vector representations of text that capture semantic meaning, making them useful for similarity comparisons, clustering, and other NLP tasks.

Usage Example

from mojentic.llm.gateways import OllamaGateway, OpenAIGateway

# Initialize the gateways
ollama_gateway = OllamaGateway()
openai_gateway = OpenAIGateway(api_key="your-api-key")

# Calculate embeddings using Ollama
text = "This is a sample text for embeddings."
ollama_embeddings = ollama_gateway.calculate_embeddings(
    text=text,
    model="mxbai-embed-large"  # Default model for Ollama
)

# Calculate embeddings using OpenAI
openai_embeddings = openai_gateway.calculate_embeddings(
    text=text,
    model="text-embedding-3-large"  # Default model for OpenAI
)

# Use the embeddings for similarity comparison, clustering, etc.
print(f"Ollama embeddings dimension: {len(ollama_embeddings)}")
print(f"OpenAI embeddings dimension: {len(openai_embeddings)}")

Important Notes

  • Available Models:
      • For Ollama: Models like "mxbai-embed-large" (default), "nomic-embed-text" are commonly used
      • For OpenAI: Models like "text-embedding-3-large" (default), "text-embedding-3-small", "text-embedding-ada-002" are available

  • Embedding Dimensions:
      • Different models produce embeddings with different dimensions
      • Ollama's "mxbai-embed-large" typically produces 1024-dimensional embeddings
      • OpenAI's "text-embedding-3-large" typically produces 3072-dimensional embeddings

  • Performance Considerations:
      • Embedding generation is generally faster and less resource-intensive than text generation
      • Local embedding models (via Ollama) may be more cost-effective for high-volume applications
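
For the similarity comparisons mentioned above, cosine similarity is a common choice. The following is a minimal sketch in plain Python: `cosine_similarity` is a helper written for this example (it is not part of Mojentic), and the short hand-made vectors stand in for real embeddings, which would come from `calculate_embeddings`.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors of equal dimension."""
    if len(a) != len(b):
        raise ValueError("Embeddings must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        raise ValueError("Cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)


# Tiny hand-made vectors standing in for real embeddings.
same_direction = cosine_similarity([1.0, 2.0, 3.0], [2.0, 4.0, 6.0])
orthogonal = cosine_similarity([1.0, 0.0], [0.0, 1.0])

print(f"parallel vectors:   {same_direction:.3f}")
print(f"orthogonal vectors: {orthogonal:.3f}")
```

The dimension check matters in practice: because different models produce vectors of different lengths, embeddings should only be compared when they come from the same model.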

Working with Images

Mojentic supports sending images to LLMs using the MessageBuilder class. This allows you to perform image analysis, OCR, and other vision-based tasks with a clean, fluent interface.

Usage Example

from mojentic.llm import LLMBroker
from mojentic.llm import MessageBuilder
from pathlib import Path

# Initialize the LLM broker
llm = LLMBroker(model="gemma3:27b")  # Use an image-capable model

# Build a message with an image
message = MessageBuilder("Describe what you see in this image.") \
    .add_image(Path.cwd() / "images" / "example.jpg") \
    .build()

# Generate a response
result = llm.generate(messages=[message])

print(result)

Important Notes

  • Image-Capable Models: You must use an image-capable model to process images. Not all models support image analysis.
      • For Ollama: Models like "gemma3", "llava", and "bakllava" support image analysis
      • For OpenAI: Models like "gpt-4-vision-preview" and "gpt-4o" support image analysis

  • Image Formats: Supported image formats include JPEG, PNG, GIF, and WebP.

  • Implementation Details:
      • The MessageBuilder handles the appropriate encoding of images for different LLM providers
      • For Ollama: Images are passed as file paths (handled internally by MessageBuilder)
      • For OpenAI: Images are base64-encoded and included in the message content (handled internally by MessageBuilder)

  • Performance Considerations: Image analysis may require more tokens and processing time than text-only requests.
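
To make the "base64-encoded and included in the message content" step concrete, here is a sketch of one common way to inline image bytes: a `data:` URL. You do not need to do this yourself, since the encoding is handled internally. `image_to_data_url` is a hypothetical helper, and whether Mojentic produces exactly this form is an assumption. The sample file contains only the 8-byte PNG signature, not a complete image.

```python
import base64
import mimetypes
import tempfile
from pathlib import Path


def image_to_data_url(path: Path) -> str:
    """Hypothetical helper: read an image file and return it as a base64 data URL."""
    mime_type, _ = mimetypes.guess_type(path.name)
    if mime_type is None or not mime_type.startswith("image/"):
        raise ValueError(f"Not a recognised image type: {path.name}")
    encoded = base64.b64encode(path.read_bytes()).decode("ascii")
    return f"data:{mime_type};base64,{encoded}"


PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"  # signature bytes only, not a full image

with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "example.png"
    sample.write_bytes(PNG_SIGNATURE)
    data_url = image_to_data_url(sample)

print(data_url)
```

Base64 encodes every 3 bytes as 4 characters, so an inlined image is roughly a third larger than the file on disk, which is one reason image requests cost more than text-only ones.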

Building Blocks

mojentic.llm.LLMBroker

This class is responsible for managing interaction with a Large Language Model. It abstracts the user from the specific mechanics of the LLM and provides a common interface for generating responses.

Source code in src/mojentic/llm/llm_broker.py
class LLMBroker():
    """
    This class is responsible for managing interaction with a Large Language Model. It abstracts
    the user from the specific mechanics of the LLM and provides a common interface for
    generating responses.
    """

    adapter: LLMGateway
    tokenizer: TokenizerGateway
    model: str
    tracer: Optional[TracerSystem]

    def __init__(self, model: str, gateway: Optional[LLMGateway] = None,
                 tokenizer: Optional[TokenizerGateway] = None,
                 tracer: Optional[TracerSystem] = None):
        """
        Create an instance of the LLMBroker.

        Parameters
        ----------
        model
            The name of the model to use.
        gateway
            The gateway to use for communication with the LLM. If None, a gateway is created that
            will utilize a local Ollama server.
        tokenizer
            The gateway to use for tokenization. This is used to log approximate token counts for
            the LLM calls. If None, tiktoken's `cl100k_base` tokenizer is used.
        tracer
            Optional tracer system to record LLM calls and responses.
        """
        self.model = model

        # Use null_tracer if no tracer is provided
        from mojentic.tracer import null_tracer
        self.tracer = tracer or null_tracer

        if tokenizer is None:
            self.tokenizer = TokenizerGateway()
        else:
            self.tokenizer = tokenizer
        if gateway is None:
            self.adapter = OllamaGateway()
        else:
            self.adapter = gateway

    def generate(self, messages: List[LLMMessage], tools=None,
                 config: Optional[CompletionConfig] = None,
                 temperature: Optional[float] = None, num_ctx: Optional[int] = None,
                 num_predict: Optional[int] = None, max_tokens: Optional[int] = None,
                 correlation_id: Optional[str] = None) -> str:
        """
        Generate a text response from the LLM.

        Parameters
        ----------
        messages : List[LLMMessage]
            A list of messages to send to the LLM.
        tools : List[Tool]
            A list of tools to use with the LLM. If a tool call is requested, the tool will be
            called and the output will be included in the response.
        config : Optional[CompletionConfig]
            Configuration object for LLM completion (recommended). If provided with individual
            kwargs, a DeprecationWarning is emitted.
        temperature : Optional[float]
            The temperature to use for the response. Deprecated: use config.
        num_ctx : Optional[int]
            The number of context tokens to use. Deprecated: use config.
        num_predict : Optional[int]
            The number of tokens to predict. Deprecated: use config.
        max_tokens : Optional[int]
            The maximum number of tokens to generate. Deprecated: use config.
        correlation_id : str
            UUID string that is copied from cause-to-effect for tracing events.

        Returns
        -------
        str
            The response from the LLM.
        """
        # Handle config vs individual kwargs
        if config is not None and any(
                param is not None for param in [temperature, num_ctx, num_predict, max_tokens]):
            warnings.warn(
                "Both config and individual kwargs provided. Using config and ignoring kwargs. "
                "Individual kwargs are deprecated, use config=CompletionConfig(...) instead.",
                DeprecationWarning,
                stacklevel=2
            )
        elif config is None:
            # Build config from individual kwargs
            config = CompletionConfig(
                temperature=temperature if temperature is not None else 1.0,
                num_ctx=num_ctx if num_ctx is not None else 32768,
                num_predict=num_predict if num_predict is not None else -1,
                max_tokens=max_tokens if max_tokens is not None else 16384
            )
        approximate_tokens = len(self.tokenizer.encode(self._content_to_count(messages)))
        logger.info(f"Requesting llm response with approx {approximate_tokens} tokens")

        # Convert messages to serializable dict for audit
        messages_for_tracer = [m.model_dump() for m in messages]

        # Record LLM call in tracer
        tools_for_tracer = [{"name": t.name, "description": t.description} for t in
                            tools] if tools else None
        self.tracer.record_llm_call(
            self.model,
            messages_for_tracer,
            config.temperature,
            tools=tools_for_tracer,
            source=type(self),
            correlation_id=correlation_id
        )

        # Measure call duration for audit
        start_time = time.time()

        result: LLMGatewayResponse = self.adapter.complete(
            model=self.model,
            messages=messages,
            tools=tools,
            config=config,
            temperature=config.temperature,
            num_ctx=config.num_ctx,
            num_predict=config.num_predict,
            max_tokens=config.max_tokens)

        call_duration_ms = (time.time() - start_time) * 1000

        # Record LLM response in tracer
        tool_calls_for_tracer = [tc.model_dump() for tc in
                                 result.tool_calls] if result.tool_calls else None
        self.tracer.record_llm_response(
            self.model,
            result.content,
            tool_calls=tool_calls_for_tracer,
            call_duration_ms=call_duration_ms,
            source=type(self),
            correlation_id=correlation_id
        )

        if result.tool_calls and tools is not None:
            logger.info("Tool call requested")
            for tool_call in result.tool_calls:
                if tool := next((t for t in tools if
                                 t.matches(tool_call.name)),
                                None):
                    logger.info('Calling function', function=tool_call.name)
                    logger.info('Arguments:', arguments=tool_call.arguments)

                    # Get the arguments before calling the tool
                    tool_arguments = tool_call.arguments

                    # Measure tool execution time
                    tool_start_time = time.time()

                    # Call the tool
                    output = tool.run(**tool_call.arguments)

                    tool_duration_ms = (time.time() - tool_start_time) * 1000

                    # Record tool call in tracer
                    self.tracer.record_tool_call(
                        tool_call.name,
                        tool_arguments,
                        output,
                        caller="LLMBroker",
                        call_duration_ms=tool_duration_ms,
                        source=type(self),
                        correlation_id=correlation_id
                    )

                    logger.info('Function output', output=output)
                    messages.append(LLMMessage(role=MessageRole.Assistant, tool_calls=[tool_call]))
                    messages.append(
                        LLMMessage(role=MessageRole.Tool, content=json.dumps(output),
                                   tool_calls=[tool_call]))
                    # {'role': 'tool', 'content': str(output), 'name': tool_call.name,
                    # 'tool_call_id': tool_call.id})
                    return self.generate(messages, tools, config=config,
                                         correlation_id=correlation_id)
                else:
                    logger.warning('Function not found', function=tool_call.name)
                    logger.info('Expected usage of missing function', expected_usage=tool_call)
                    # raise Exception('Unknown tool function requested:',
                    # requested_tool.function.name)

        return result.content

    def generate_stream(self, messages: List[LLMMessage], tools=None,
                        config: Optional[CompletionConfig] = None,
                        temperature: Optional[float] = None, num_ctx: Optional[int] = None,
                        num_predict: Optional[int] = None, max_tokens: Optional[int] = None,
                        correlation_id: Optional[str] = None) -> Iterator[str]:
        """
        Generate a streaming text response from the LLM.

        This method mirrors generate() but yields content chunks as they arrive from the LLM,
        providing a better user experience for long-running requests. When tool calls are
        detected, tools are executed and the LLM is called recursively, with the new response
        also being streamed.

        Parameters
        ----------
        messages : List[LLMMessage]
            A list of messages to send to the LLM.
        tools : List[Tool]
            A list of tools to use with the LLM. If a tool call is requested, the tool will be
            called and the output will be included in the response.
        config : Optional[CompletionConfig]
            Configuration object for LLM completion (recommended). If provided with individual
            kwargs, a DeprecationWarning is emitted.
        temperature : Optional[float]
            The temperature to use for the response. Deprecated: use config.
        num_ctx : Optional[int]
            The number of context tokens to use. Deprecated: use config.
        num_predict : Optional[int]
            The number of tokens to predict. Deprecated: use config.
        max_tokens : Optional[int]
            The maximum number of tokens to generate. Deprecated: use config.
        correlation_id : str
            UUID string that is copied from cause-to-effect for tracing events.

        Yields
        ------
        str
            Content chunks as they arrive from the LLM.
        """
        # Handle config vs individual kwargs
        if config is not None and any(
                param is not None for param in [temperature, num_ctx, num_predict, max_tokens]):
            warnings.warn(
                "Both config and individual kwargs provided. Using config and ignoring kwargs. "
                "Individual kwargs are deprecated, use config=CompletionConfig(...) instead.",
                DeprecationWarning,
                stacklevel=2
            )
        elif config is None:
            # Build config from individual kwargs
            config = CompletionConfig(
                temperature=temperature if temperature is not None else 1.0,
                num_ctx=num_ctx if num_ctx is not None else 32768,
                num_predict=num_predict if num_predict is not None else -1,
                max_tokens=max_tokens if max_tokens is not None else 16384
            )
        # Check if gateway supports streaming
        if not hasattr(self.adapter, 'complete_stream'):
            raise NotImplementedError(f"Gateway {type(self.adapter).__name__} does not support streaming")

        approximate_tokens = len(self.tokenizer.encode(self._content_to_count(messages)))
        logger.info(f"Requesting streaming llm response with approx {approximate_tokens} tokens")

        # Convert messages to serializable dict for audit
        messages_for_tracer = [m.model_dump() for m in messages]

        # Record LLM call in tracer
        tools_for_tracer = [{"name": t.name, "description": t.description} for t in
                            tools] if tools else None
        self.tracer.record_llm_call(
            self.model,
            messages_for_tracer,
            config.temperature,
            tools=tools_for_tracer,
            source=type(self),
            correlation_id=correlation_id
        )

        # Measure call duration for audit
        start_time = time.time()

        # Accumulate content and tool calls from stream
        accumulated_content = ""
        accumulated_tool_calls = []

        stream = self.adapter.complete_stream(
            model=self.model,
            messages=messages,
            tools=tools,
            config=config,
            temperature=config.temperature,
            num_ctx=config.num_ctx,
            num_predict=config.num_predict,
            max_tokens=config.max_tokens
        )

        for chunk in stream:
            # Handle content chunks
            if hasattr(chunk, 'content') and chunk.content:
                accumulated_content += chunk.content
                yield chunk.content

            # Handle tool calls if present
            if hasattr(chunk, 'tool_calls') and chunk.tool_calls:
                accumulated_tool_calls.extend(chunk.tool_calls)

        call_duration_ms = (time.time() - start_time) * 1000

        # Record LLM response in tracer
        tool_calls_for_tracer = [tc.model_dump() if hasattr(tc, 'model_dump') else tc for tc in
                                 accumulated_tool_calls] if accumulated_tool_calls else None
        self.tracer.record_llm_response(
            self.model,
            accumulated_content,
            tool_calls=tool_calls_for_tracer,
            call_duration_ms=call_duration_ms,
            source=type(self),
            correlation_id=correlation_id
        )

        # Process tool calls if any were accumulated
        if accumulated_tool_calls and tools is not None:
            logger.info("Tool call requested in streaming response")
            for tool_call in accumulated_tool_calls:
                # Handle both LLMToolCall objects and raw tool call data
                if hasattr(tool_call, 'name'):
                    tool_name = tool_call.name
                    tool_arguments = tool_call.arguments
                else:
                    # Handle ollama's tool call format
                    tool_name = tool_call.function.name
                    tool_arguments = tool_call.function.arguments

                if tool := next((t for t in tools if t.matches(tool_name)), None):
                    logger.info('Calling function', function=tool_name)
                    logger.info('Arguments:', arguments=tool_arguments)

                    # Measure tool execution time
                    tool_start_time = time.time()

                    # Call the tool
                    output = tool.run(**tool_arguments)

                    tool_duration_ms = (time.time() - tool_start_time) * 1000

                    # Record tool call in tracer
                    self.tracer.record_tool_call(
                        tool_name,
                        tool_arguments,
                        output,
                        caller="LLMBroker.generate_stream",
                        call_duration_ms=tool_duration_ms,
                        source=type(self),
                        correlation_id=correlation_id
                    )

                    logger.info('Function output', output=output)

                    # Convert to LLMToolCall if needed, preserving the ID if it exists
                    if not isinstance(tool_call, LLMToolCall):
                        # Extract ID if available from the tool_call object
                        tool_call_id = None
                        if hasattr(tool_call, 'id'):
                            tool_call_id = tool_call.id
                        elif hasattr(tool_call, 'function') and hasattr(tool_call.function, 'id'):
                            tool_call_id = tool_call.function.id

                        tool_call = LLMToolCall(id=tool_call_id, name=tool_name, arguments=tool_arguments)

                    messages.append(LLMMessage(role=MessageRole.Assistant, tool_calls=[tool_call]))
                    messages.append(
                        LLMMessage(role=MessageRole.Tool, content=json.dumps(output),
                                   tool_calls=[tool_call]))

                    # Recursively stream the response after tool execution
                    yield from self.generate_stream(
                        messages, tools, config=config, correlation_id=correlation_id
                    )
                    return  # Exit after recursive call
                else:
                    logger.warning('Function not found', function=tool_name)

    def _content_to_count(self, messages: List[LLMMessage]):
        content = ""
        for message in messages:
            if message.content:
                content += message.content
        return content

    def generate_object(self, messages: List[LLMMessage], object_model: Type[BaseModel],
                        config: Optional[CompletionConfig] = None,
                        temperature: Optional[float] = None, num_ctx: Optional[int] = None,
                        num_predict: Optional[int] = None, max_tokens: Optional[int] = None,
                        correlation_id: Optional[str] = None) -> BaseModel:
        """
        Generate a structured response from the LLM and return it as an object.

        Parameters
        ----------
        messages : List[LLMMessage]
            A list of messages to send to the LLM.
        object_model : Type[BaseModel]
            The class of the model to use for the structured response data.
        config : Optional[CompletionConfig]
            Configuration object for LLM completion (recommended). If provided with individual
            kwargs, a DeprecationWarning is emitted.
        temperature : Optional[float]
            The temperature to use for the response. Deprecated: use config.
        num_ctx : Optional[int]
            The number of context tokens to use. Deprecated: use config.
        num_predict : Optional[int]
            The number of tokens to predict. Deprecated: use config.
        max_tokens : Optional[int]
            The maximum number of tokens to generate. Deprecated: use config.
        correlation_id : str
            UUID string that is copied from cause-to-effect for tracing events.

        Returns
        -------
        BaseModel
            An instance of the model class provided containing the structured response data.
        """
        # Handle config vs individual kwargs
        if config is not None and any(
                param is not None for param in [temperature, num_ctx, num_predict, max_tokens]):
            warnings.warn(
                "Both config and individual kwargs provided. Using config and ignoring kwargs. "
                "Individual kwargs are deprecated, use config=CompletionConfig(...) instead.",
                DeprecationWarning,
                stacklevel=2
            )
        elif config is None:
            # Build config from individual kwargs
            config = CompletionConfig(
                temperature=temperature if temperature is not None else 1.0,
                num_ctx=num_ctx if num_ctx is not None else 32768,
                num_predict=num_predict if num_predict is not None else -1,
                max_tokens=max_tokens if max_tokens is not None else 16384
            )
        approximate_tokens = len(self.tokenizer.encode(self._content_to_count(messages)))
        logger.info(f"Requesting llm response with approx {approximate_tokens} tokens")

        # Convert messages to serializable dict for audit
        messages_for_tracer = [m.model_dump() for m in messages]

        # Record LLM call in tracer
        self.tracer.record_llm_call(
            self.model,
            messages_for_tracer,
            config.temperature,
            tools=None,
            source=type(self),
            correlation_id=correlation_id
        )

        # Measure call duration for audit
        start_time = time.time()

        result = self.adapter.complete(model=self.model, messages=messages,
                                       object_model=object_model,
                                       config=config,
                                       temperature=config.temperature, num_ctx=config.num_ctx,
                                       num_predict=config.num_predict, max_tokens=config.max_tokens)

        call_duration_ms = (time.time() - start_time) * 1000

        # Record LLM response in tracer with object representation
        # Convert object to string for tracer
        object_str = str(result.object.model_dump()) if hasattr(result.object,
                                                                "model_dump") else str(
            result.object)
        self.tracer.record_llm_response(
            self.model,
            f"Structured response: {object_str}",
            call_duration_ms=call_duration_ms,
            source=type(self),
            correlation_id=correlation_id
        )

        return result.object
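
The tool-use flow in `generate` above (ask the model, run the requested tool, append the call and its output to the conversation, then ask again) can be sketched with stand-ins that need no LLM. Everything in this block is hypothetical scaffolding: `ScriptedModel` fakes a model that requests one tool call, and tools are looked up in a plain dict rather than through `matches()` and `run()` as the real broker does.

```python
import json
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional


@dataclass
class SketchToolCall:
    name: str
    arguments: Dict[str, object]


@dataclass
class SketchMessage:
    role: str
    content: Optional[str] = None
    tool_calls: List[SketchToolCall] = field(default_factory=list)


class ScriptedModel:
    """Fake model: requests a tool first, then answers once a tool result is present."""

    def complete(self, messages: List[SketchMessage]) -> SketchMessage:
        tool_results = [m for m in messages if m.role == "tool"]
        if not tool_results:
            call = SketchToolCall("add", {"a": 2, "b": 3})
            return SketchMessage(role="assistant", tool_calls=[call])
        return SketchMessage(role="assistant",
                             content=f"The sum is {tool_results[-1].content}.")


def generate(model: ScriptedModel, messages: List[SketchMessage],
             tools: Dict[str, Callable[..., object]]) -> Optional[str]:
    result = model.complete(messages)
    for call in result.tool_calls:
        tool = tools.get(call.name)
        if tool is None:
            continue  # unknown tool: skip it and fall through to the plain content
        output = tool(**call.arguments)
        # Record both the request and the tool's JSON-encoded output, then ask again.
        messages.append(SketchMessage(role="assistant", tool_calls=[call]))
        messages.append(SketchMessage(role="tool", content=json.dumps(output),
                                      tool_calls=[call]))
        return generate(model, messages, tools)
    return result.content


history = [SketchMessage(role="user", content="What is 2 + 3?")]
answer = generate(ScriptedModel(), history, {"add": lambda a, b: a + b})
print(answer)        # The sum is 5.
print(len(history))  # 3
```

As in the listing above, the message list passed in is appended to in place, so the caller's history grows by two entries per executed tool call.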

__init__(model, gateway=None, tokenizer=None, tracer=None)

Create an instance of the LLMBroker.

Parameters:

  • model (str, required): The name of the model to use.
  • gateway (Optional[LLMGateway], default None): The gateway to use for communication with the LLM. If None, a gateway is created that will utilize a local Ollama server.
  • tokenizer (Optional[TokenizerGateway], default None): The gateway to use for tokenization. This is used to log approximate token counts for the LLM calls. If None, tiktoken's cl100k_base tokenizer is used.
  • tracer (Optional[TracerSystem], default None): Optional tracer system to record LLM calls and responses.

Source code in src/mojentic/llm/llm_broker.py
def __init__(self, model: str, gateway: Optional[LLMGateway] = None,
             tokenizer: Optional[TokenizerGateway] = None,
             tracer: Optional[TracerSystem] = None):
    """
    Create an instance of the LLMBroker.

    Parameters
    ----------
    model
        The name of the model to use.
    gateway
        The gateway to use for communication with the LLM. If None, a gateway is created that
        will utilize a local Ollama server.
    tokenizer
        The gateway to use for tokenization. This is used to log approximate token counts for
        the LLM calls. If None, tiktoken's `cl100k_base` tokenizer is used.
    tracer
        Optional tracer system to record LLM calls and responses.
    """
    self.model = model

    # Use null_tracer if no tracer is provided
    from mojentic.tracer import null_tracer
    self.tracer = tracer or null_tracer

    if tokenizer is None:
        self.tokenizer = TokenizerGateway()
    else:
        self.tokenizer = tokenizer
    if gateway is None:
        self.adapter = OllamaGateway()
    else:
        self.adapter = gateway

generate(messages, tools=None, config=None, temperature=None, num_ctx=None, num_predict=None, max_tokens=None, correlation_id=None)

Generate a text response from the LLM.

Parameters:

Name Type Description Default
messages LLMMessage

A list of messages to send to the LLM.

required
tools List[Tool]

A list of tools to use with the LLM. If a tool call is requested, the tool will be called and the output will be included in the response.

None
config Optional[CompletionConfig]

Configuration object for LLM completion (recommended). If both config and individual kwargs are provided, config is used and a DeprecationWarning is emitted.

None
temperature Optional[float]

The temperature to use for the response. Deprecated: use config.

None
num_ctx Optional[int]

The number of context tokens to use. Deprecated: use config.

None
num_predict Optional[int]

The number of tokens to predict. Deprecated: use config.

None
max_tokens Optional[int]

The maximum number of tokens to generate. Deprecated: use config.

None
correlation_id str

UUID string that is copied from cause to effect for tracing events.

None

Returns:

Type Description
str

The response from the LLM.
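
The precedence between config and the deprecated keyword arguments can be sketched in isolation. This is a minimal illustration, not the library's code: the `CompletionConfig` dataclass below is a hypothetical stand-in, and `resolve_config` is a name invented for this example. The default values are the ones that appear in the source listing for `generate`.

```python
import warnings
from dataclasses import dataclass
from typing import Optional


@dataclass
class CompletionConfig:
    """Hypothetical stand-in for mojentic's CompletionConfig (assumption)."""
    temperature: float = 1.0
    num_ctx: int = 32768
    num_predict: int = -1
    max_tokens: int = 16384


def resolve_config(config: Optional[CompletionConfig] = None,
                   temperature: Optional[float] = None,
                   num_ctx: Optional[int] = None,
                   num_predict: Optional[int] = None,
                   max_tokens: Optional[int] = None) -> CompletionConfig:
    """Return config if given; otherwise build one from the legacy kwargs."""
    legacy = [temperature, num_ctx, num_predict, max_tokens]
    if config is not None:
        if any(value is not None for value in legacy):
            # Both styles were supplied: config wins and the caller is warned.
            warnings.warn("Both config and individual kwargs provided; using config.",
                          DeprecationWarning, stacklevel=2)
        return config
    # No config: each missing kwarg falls back to its default.
    return CompletionConfig(
        temperature=temperature if temperature is not None else 1.0,
        num_ctx=num_ctx if num_ctx is not None else 32768,
        num_predict=num_predict if num_predict is not None else -1,
        max_tokens=max_tokens if max_tokens is not None else 16384,
    )
```

Calling `resolve_config(temperature=0.2)` yields a config with that temperature and default values elsewhere, while `resolve_config(CompletionConfig(temperature=0.5), temperature=0.9)` keeps 0.5 and emits the warning.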

Source code in src/mojentic/llm/llm_broker.py
def generate(self, messages: List[LLMMessage], tools=None,
             config: Optional[CompletionConfig] = None,
             temperature: Optional[float] = None, num_ctx: Optional[int] = None,
             num_predict: Optional[int] = None, max_tokens: Optional[int] = None,
             correlation_id: str = None) -> str:
    """
    Generate a text response from the LLM.

    Parameters
    ----------
    messages : List[LLMMessage]
        A list of messages to send to the LLM.
    tools : List[Tool]
        A list of tools to use with the LLM. If a tool call is requested, the tool will be
        called and the output will be included in the response.
    config : Optional[CompletionConfig]
        Configuration object for LLM completion (recommended). If both config and individual
        kwargs are provided, config is used and a DeprecationWarning is emitted.
    temperature : Optional[float]
        The temperature to use for the response. Deprecated: use config.
    num_ctx : Optional[int]
        The number of context tokens to use. Deprecated: use config.
    num_predict : Optional[int]
        The number of tokens to predict. Deprecated: use config.
    max_tokens : Optional[int]
        The maximum number of tokens to generate. Deprecated: use config.
    correlation_id : str
        UUID string that is copied from cause to effect for tracing events.

    Returns
    -------
    str
        The response from the LLM.
    """
    # Handle config vs individual kwargs
    if config is not None and any(
            param is not None for param in [temperature, num_ctx, num_predict, max_tokens]):
        warnings.warn(
            "Both config and individual kwargs provided. Using config and ignoring kwargs. "
            "Individual kwargs are deprecated, use config=CompletionConfig(...) instead.",
            DeprecationWarning,
            stacklevel=2
        )
    elif config is None:
        # Build config from individual kwargs
        config = CompletionConfig(
            temperature=temperature if temperature is not None else 1.0,
            num_ctx=num_ctx if num_ctx is not None else 32768,
            num_predict=num_predict if num_predict is not None else -1,
            max_tokens=max_tokens if max_tokens is not None else 16384
        )
    approximate_tokens = len(self.tokenizer.encode(self._content_to_count(messages)))
    logger.info(f"Requesting llm response with approx {approximate_tokens} tokens")

    # Convert messages to serializable dict for audit
    messages_for_tracer = [m.model_dump() for m in messages]

    # Record LLM call in tracer
    tools_for_tracer = [{"name": t.name, "description": t.description} for t in
                        tools] if tools else None
    self.tracer.record_llm_call(
        self.model,
        messages_for_tracer,
        config.temperature,
        tools=tools_for_tracer,
        source=type(self),
        correlation_id=correlation_id
    )

    # Measure call duration for audit
    start_time = time.time()

    result: LLMGatewayResponse = self.adapter.complete(
        model=self.model,
        messages=messages,
        tools=tools,
        config=config,
        temperature=config.temperature,
        num_ctx=config.num_ctx,
        num_predict=config.num_predict,
        max_tokens=config.max_tokens)

    call_duration_ms = (time.time() - start_time) * 1000

    # Record LLM response in tracer
    tool_calls_for_tracer = [tc.model_dump() for tc in
                             result.tool_calls] if result.tool_calls else None
    self.tracer.record_llm_response(
        self.model,
        result.content,
        tool_calls=tool_calls_for_tracer,
        call_duration_ms=call_duration_ms,
        source=type(self),
        correlation_id=correlation_id
    )

    if result.tool_calls and tools is not None:
        logger.info("Tool call requested")
        for tool_call in result.tool_calls:
            if tool := next((t for t in tools if
                             t.matches(tool_call.name)),
                            None):
                logger.info('Calling function', function=tool_call.name)
                logger.info('Arguments:', arguments=tool_call.arguments)

                # Get the arguments before calling the tool
                tool_arguments = tool_call.arguments

                # Measure tool execution time
                tool_start_time = time.time()

                # Call the tool
                output = tool.run(**tool_call.arguments)

                tool_duration_ms = (time.time() - tool_start_time) * 1000

                # Record tool call in tracer
                self.tracer.record_tool_call(
                    tool_call.name,
                    tool_arguments,
                    output,
                    caller="LLMBroker",
                    call_duration_ms=tool_duration_ms,
                    source=type(self),
                    correlation_id=correlation_id
                )

                logger.info('Function output', output=output)
                messages.append(LLMMessage(role=MessageRole.Assistant, tool_calls=[tool_call]))
                messages.append(
                    LLMMessage(role=MessageRole.Tool, content=json.dumps(output),
                               tool_calls=[tool_call]))
                # {'role': 'tool', 'content': str(output), 'name': tool_call.name,
                # 'tool_call_id': tool_call.id})
                return self.generate(messages, tools, config=config,
                                     correlation_id=correlation_id)
            else:
                logger.warn('Function not found', function=tool_call.name)
                logger.info('Expected usage of missing function', expected_usage=tool_call)
                # raise Exception('Unknown tool function requested:',
                # requested_tool.function.name)

    return result.content
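
The tool-handling branch of the listing above can be reduced to a small, self-contained sketch. Everything here is a stand-in chosen for illustration: `ToolCall`, `Reply`, `AddTool`, `fake_llm`, and `generate_with_tools` are hypothetical names, and messages are plain dicts rather than `LLMMessage` objects. The control flow follows the listing: run the first matching tool, append the call and its JSON-encoded output to the message list, then ask again.

```python
import json
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ToolCall:
    name: str
    arguments: Dict[str, object]


@dataclass
class Reply:
    content: Optional[str] = None
    tool_calls: List[ToolCall] = field(default_factory=list)


class AddTool:
    """Hypothetical tool exposing the matches()/run() pair used by the broker."""
    name = "add"

    def matches(self, name: str) -> bool:
        return name == self.name

    def run(self, a: int, b: int) -> int:
        return a + b


def fake_llm(messages: List[dict]) -> Reply:
    """Scripted stand-in for a gateway: request the tool once, then answer."""
    tool_results = [m for m in messages if m["role"] == "tool"]
    if not tool_results:
        return Reply(tool_calls=[ToolCall("add", {"a": 2, "b": 3})])
    return Reply(content=f"The sum is {json.loads(tool_results[-1]['content'])}.")


def generate_with_tools(messages: List[dict], tools: list) -> Optional[str]:
    reply = fake_llm(messages)
    for call in reply.tool_calls:
        tool = next((t for t in tools if t.matches(call.name)), None)
        if tool is None:
            continue  # unknown tool: skipped, falling through to reply.content
        output = tool.run(**call.arguments)
        messages.append({"role": "assistant", "tool_calls": [call.name]})
        messages.append({"role": "tool", "content": json.dumps(output)})
        # Re-ask with the tool output appended; only the first match is run per reply.
        return generate_with_tools(messages, tools)
    return reply.content
```

Two behaviours are worth noticing in this shape: the caller's message list is mutated in place, and a reply that requests only unknown tools returns its (possibly empty) content.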

generate_object(messages, object_model, config=None, temperature=None, num_ctx=None, num_predict=None, max_tokens=None, correlation_id=None)

Generate a structured response from the LLM and return it as an object.

Parameters:

Name Type Description Default
messages List[LLMMessage]

A list of messages to send to the LLM.

required
object_model Type[BaseModel]

The class of the model to use for the structured response data.

required
config Optional[CompletionConfig]

Configuration object for LLM completion (recommended). If both config and individual kwargs are provided, config is used and a DeprecationWarning is emitted.

None
temperature Optional[float]

The temperature to use for the response. Deprecated: use config.

None
num_ctx Optional[int]

The number of context tokens to use. Deprecated: use config.

None
num_predict Optional[int]

The number of tokens to predict. Deprecated: use config.

None
max_tokens Optional[int]

The maximum number of tokens to generate. Deprecated: use config.

None
correlation_id str

UUID string that is copied from cause to effect for tracing events.

None

Returns:

Type Description
BaseModel

An instance of the model class provided containing the structured response data.

Source code in src/mojentic/llm/llm_broker.py
def generate_object(self, messages: List[LLMMessage], object_model: Type[BaseModel],
                    config: Optional[CompletionConfig] = None,
                    temperature: Optional[float] = None, num_ctx: Optional[int] = None,
                    num_predict: Optional[int] = None, max_tokens: Optional[int] = None,
                    correlation_id: str = None) -> BaseModel:
    """
    Generate a structured response from the LLM and return it as an object.

    Parameters
    ----------
    messages : List[LLMMessage]
        A list of messages to send to the LLM.
    object_model : Type[BaseModel]
        The class of the model to use for the structured response data.
    config : Optional[CompletionConfig]
        Configuration object for LLM completion (recommended). If both config and individual
        kwargs are provided, config is used and a DeprecationWarning is emitted.
    temperature : Optional[float]
        The temperature to use for the response. Deprecated: use config.
    num_ctx : Optional[int]
        The number of context tokens to use. Deprecated: use config.
    num_predict : Optional[int]
        The number of tokens to predict. Deprecated: use config.
    max_tokens : Optional[int]
        The maximum number of tokens to generate. Deprecated: use config.
    correlation_id : str
        UUID string that is copied from cause to effect for tracing events.

    Returns
    -------
    BaseModel
        An instance of the model class provided containing the structured response data.
    """
    # Handle config vs individual kwargs
    if config is not None and any(
            param is not None for param in [temperature, num_ctx, num_predict, max_tokens]):
        warnings.warn(
            "Both config and individual kwargs provided. Using config and ignoring kwargs. "
            "Individual kwargs are deprecated, use config=CompletionConfig(...) instead.",
            DeprecationWarning,
            stacklevel=2
        )
    elif config is None:
        # Build config from individual kwargs
        config = CompletionConfig(
            temperature=temperature if temperature is not None else 1.0,
            num_ctx=num_ctx if num_ctx is not None else 32768,
            num_predict=num_predict if num_predict is not None else -1,
            max_tokens=max_tokens if max_tokens is not None else 16384
        )
    approximate_tokens = len(self.tokenizer.encode(self._content_to_count(messages)))
    logger.info(f"Requesting llm response with approx {approximate_tokens} tokens")

    # Convert messages to serializable dict for audit
    messages_for_tracer = [m.model_dump() for m in messages]

    # Record LLM call in tracer
    self.tracer.record_llm_call(
        self.model,
        messages_for_tracer,
        config.temperature,
        tools=None,
        source=type(self),
        correlation_id=correlation_id
    )

    # Measure call duration for audit
    start_time = time.time()

    result = self.adapter.complete(model=self.model, messages=messages,
                                   object_model=object_model,
                                   config=config,
                                   temperature=config.temperature, num_ctx=config.num_ctx,
                                   num_predict=config.num_predict, max_tokens=config.max_tokens)

    call_duration_ms = (time.time() - start_time) * 1000

    # Record LLM response in tracer with object representation
    # Convert object to string for tracer
    object_str = str(result.object.model_dump()) if hasattr(result.object,
                                                            "model_dump") else str(
        result.object)
    self.tracer.record_llm_response(
        self.model,
        f"Structured response: {object_str}",
        call_duration_ms=call_duration_ms,
        source=type(self),
        correlation_id=correlation_id
    )

    return result.object
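
The idea behind structured output, turning a JSON reply into an instance of a caller-supplied class, can be shown with the standard library alone. This sketch is an assumption-laden illustration: it uses a dataclass where Mojentic expects a pydantic `BaseModel`, and `Sentiment` and `parse_structured` are hypothetical names. In the library, validation is the gateway's job, and the broker simply returns `result.object`.

```python
import json
from dataclasses import dataclass, fields
from typing import Type, TypeVar

T = TypeVar("T")


@dataclass
class Sentiment:
    """Hypothetical response shape; a pydantic BaseModel in real Mojentic code."""
    label: str
    confidence: float


def parse_structured(raw_json: str, object_model: Type[T]) -> T:
    """Build an object_model instance from JSON text, rejecting missing or extra keys."""
    data = json.loads(raw_json)
    expected = {f.name for f in fields(object_model)}
    if set(data) != expected:
        raise ValueError(f"expected keys {sorted(expected)}, got {sorted(data)}")
    return object_model(**data)
```

For instance, `parse_structured('{"label": "positive", "confidence": 0.9}', Sentiment)` returns a `Sentiment` instance, while a reply missing `confidence` raises `ValueError`.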

generate_stream(messages, tools=None, config=None, temperature=None, num_ctx=None, num_predict=None, max_tokens=None, correlation_id=None)

Generate a streaming text response from the LLM.

This method mirrors generate() but yields content chunks as they arrive from the LLM, providing a better user experience for long-running requests. When tool calls are detected, tools are executed and the LLM is called recursively, with the new response also being streamed.

Parameters:

Name Type Description Default
messages List[LLMMessage]

A list of messages to send to the LLM.

required
tools List[Tool]

A list of tools to use with the LLM. If a tool call is requested, the tool will be called and the output will be included in the response.

None
config Optional[CompletionConfig]

Configuration object for LLM completion (recommended). If both config and individual kwargs are provided, config is used and a DeprecationWarning is emitted.

None
temperature Optional[float]

The temperature to use for the response. Deprecated: use config.

None
num_ctx Optional[int]

The number of context tokens to use. Deprecated: use config.

None
num_predict Optional[int]

The number of tokens to predict. Deprecated: use config.

None
max_tokens Optional[int]

The maximum number of tokens to generate. Deprecated: use config.

None
correlation_id str

UUID string that is copied from cause to effect for tracing events.

None

Yields:

Type Description
str

Content chunks as they arrive from the LLM.

Source code in src/mojentic/llm/llm_broker.py
def generate_stream(self, messages: List[LLMMessage], tools=None,
                    config: Optional[CompletionConfig] = None,
                    temperature: Optional[float] = None, num_ctx: Optional[int] = None,
                    num_predict: Optional[int] = None, max_tokens: Optional[int] = None,
                    correlation_id: str = None) -> Iterator[str]:
    """
    Generate a streaming text response from the LLM.

    This method mirrors generate() but yields content chunks as they arrive from the LLM,
    providing a better user experience for long-running requests. When tool calls are
    detected, tools are executed and the LLM is called recursively, with the new response
    also being streamed.

    Parameters
    ----------
    messages : List[LLMMessage]
        A list of messages to send to the LLM.
    tools : List[Tool]
        A list of tools to use with the LLM. If a tool call is requested, the tool will be
        called and the output will be included in the response.
    config : Optional[CompletionConfig]
        Configuration object for LLM completion (recommended). If both config and individual
        kwargs are provided, config is used and a DeprecationWarning is emitted.
    temperature : Optional[float]
        The temperature to use for the response. Deprecated: use config.
    num_ctx : Optional[int]
        The number of context tokens to use. Deprecated: use config.
    num_predict : Optional[int]
        The number of tokens to predict. Deprecated: use config.
    max_tokens : Optional[int]
        The maximum number of tokens to generate. Deprecated: use config.
    correlation_id : str
        UUID string that is copied from cause to effect for tracing events.

    Yields
    ------
    str
        Content chunks as they arrive from the LLM.
    """
    # Handle config vs individual kwargs
    if config is not None and any(
            param is not None for param in [temperature, num_ctx, num_predict, max_tokens]):
        warnings.warn(
            "Both config and individual kwargs provided. Using config and ignoring kwargs. "
            "Individual kwargs are deprecated, use config=CompletionConfig(...) instead.",
            DeprecationWarning,
            stacklevel=2
        )
    elif config is None:
        # Build config from individual kwargs
        config = CompletionConfig(
            temperature=temperature if temperature is not None else 1.0,
            num_ctx=num_ctx if num_ctx is not None else 32768,
            num_predict=num_predict if num_predict is not None else -1,
            max_tokens=max_tokens if max_tokens is not None else 16384
        )
    # Check if gateway supports streaming
    if not hasattr(self.adapter, 'complete_stream'):
        raise NotImplementedError(f"Gateway {type(self.adapter).__name__} does not support streaming")

    approximate_tokens = len(self.tokenizer.encode(self._content_to_count(messages)))
    logger.info(f"Requesting streaming llm response with approx {approximate_tokens} tokens")

    # Convert messages to serializable dict for audit
    messages_for_tracer = [m.model_dump() for m in messages]

    # Record LLM call in tracer
    tools_for_tracer = [{"name": t.name, "description": t.description} for t in
                        tools] if tools else None
    self.tracer.record_llm_call(
        self.model,
        messages_for_tracer,
        config.temperature,
        tools=tools_for_tracer,
        source=type(self),
        correlation_id=correlation_id
    )

    # Measure call duration for audit
    start_time = time.time()

    # Accumulate content and tool calls from stream
    accumulated_content = ""
    accumulated_tool_calls = []

    stream = self.adapter.complete_stream(
        model=self.model,
        messages=messages,
        tools=tools,
        config=config,
        temperature=config.temperature,
        num_ctx=config.num_ctx,
        num_predict=config.num_predict,
        max_tokens=config.max_tokens
    )

    for chunk in stream:
        # Handle content chunks
        if hasattr(chunk, 'content') and chunk.content:
            accumulated_content += chunk.content
            yield chunk.content

        # Handle tool calls if present
        if hasattr(chunk, 'tool_calls') and chunk.tool_calls:
            accumulated_tool_calls.extend(chunk.tool_calls)

    call_duration_ms = (time.time() - start_time) * 1000

    # Record LLM response in tracer
    tool_calls_for_tracer = [tc.model_dump() if hasattr(tc, 'model_dump') else tc for tc in
                             accumulated_tool_calls] if accumulated_tool_calls else None
    self.tracer.record_llm_response(
        self.model,
        accumulated_content,
        tool_calls=tool_calls_for_tracer,
        call_duration_ms=call_duration_ms,
        source=type(self),
        correlation_id=correlation_id
    )

    # Process tool calls if any were accumulated
    if accumulated_tool_calls and tools is not None:
        logger.info("Tool call requested in streaming response")
        for tool_call in accumulated_tool_calls:
            # Handle both LLMToolCall objects and raw tool call data
            if hasattr(tool_call, 'name'):
                tool_name = tool_call.name
                tool_arguments = tool_call.arguments
            else:
                # Handle ollama's tool call format
                tool_name = tool_call.function.name
                tool_arguments = tool_call.function.arguments

            if tool := next((t for t in tools if t.matches(tool_name)), None):
                logger.info('Calling function', function=tool_name)
                logger.info('Arguments:', arguments=tool_arguments)

                # Measure tool execution time
                tool_start_time = time.time()

                # Call the tool
                output = tool.run(**tool_arguments)

                tool_duration_ms = (time.time() - tool_start_time) * 1000

                # Record tool call in tracer
                self.tracer.record_tool_call(
                    tool_name,
                    tool_arguments,
                    output,
                    caller="LLMBroker.generate_stream",
                    call_duration_ms=tool_duration_ms,
                    source=type(self),
                    correlation_id=correlation_id
                )

                logger.info('Function output', output=output)

                # Convert to LLMToolCall if needed, preserving the ID if it exists
                if not isinstance(tool_call, LLMToolCall):
                    # Extract ID if available from the tool_call object
                    tool_call_id = None
                    if hasattr(tool_call, 'id'):
                        tool_call_id = tool_call.id
                    elif hasattr(tool_call, 'function') and hasattr(tool_call.function, 'id'):
                        tool_call_id = tool_call.function.id

                    tool_call = LLMToolCall(id=tool_call_id, name=tool_name, arguments=tool_arguments)

                messages.append(LLMMessage(role=MessageRole.Assistant, tool_calls=[tool_call]))
                messages.append(
                    LLMMessage(role=MessageRole.Tool, content=json.dumps(output),
                               tool_calls=[tool_call]))

                # Recursively stream the response after tool execution
                yield from self.generate_stream(
                    messages, tools, config=config, correlation_id=correlation_id
                )
                return  # Exit after recursive call
            else:
                logger.warn('Function not found', function=tool_name)

mojentic.llm.ChatSession

This class is responsible for managing the state of a conversation with the LLM.

Source code in src/mojentic/llm/chat_session.py
class ChatSession:
    """
    This class is responsible for managing the state of a conversation with the LLM.
    """

    messages: List[SizedLLMMessage] = []

    def __init__(self,
                 llm: LLMBroker,
                 system_prompt: str = "You are a helpful assistant.",
                 tools: Optional[List[LLMTool]] = None,
                 max_context: int = 32768,
                 tokenizer_gateway: TokenizerGateway = None,
                 config: Optional[CompletionConfig] = None,
                 temperature: float = 1.0):
        """
        Create an instance of the ChatSession.

        Parameters
        ----------
        llm : LLMBroker
            The broker to use for generating responses.
        system_prompt : str, optional
            The prompt to use for the system messages. Defaults to "You are a helpful assistant."
        tools : List[LLMTool], optional
            The tools you want to make available to the LLM. Defaults to None.
        max_context : int, optional
            The maximum number of tokens to keep in the context. Defaults to 32768.
        tokenizer_gateway : TokenizerGateway, optional
            The gateway to use for tokenization. If None, a default `TokenizerGateway()` is created.
        config : Optional[CompletionConfig], optional
            Configuration object for LLM completion. If None, one is created from temperature and max_context.
        temperature : float, optional
            The temperature to use for the response. Defaults to 1.0. Deprecated: use config.
        """

        self.llm = llm
        self.system_prompt = system_prompt
        self.tools = tools
        self.max_context = max_context

        # Use config if provided, otherwise build from individual kwargs
        if config is not None:
            self.config = config
        else:
            self.config = CompletionConfig(
                temperature=temperature,
                num_ctx=max_context
            )

        if tokenizer_gateway is None:
            self.tokenizer_gateway = TokenizerGateway()
        else:
            self.tokenizer_gateway = tokenizer_gateway

        self.messages = []
        self.insert_message(LLMMessage(role=MessageRole.System, content=self.system_prompt))

    def send(self, query):
        """
        Send a query to the LLM and return the response. Also records the query and response in the ongoing chat
        session.

        Parameters
        ----------
        query : str
            The query to send to the LLM.

        Returns
        -------
        str
            The response from the LLM.
        """
        self.insert_message(LLMMessage(role=MessageRole.User, content=query))
        response = self.llm.generate(self.messages, tools=self.tools, config=self.config)
        self._ensure_all_messages_are_sized()
        self.insert_message(LLMMessage(role=MessageRole.Assistant, content=response))
        return response

    def send_stream(self, query) -> Iterator[str]:
        """
        Send a query to the LLM and yield response chunks as they arrive. Records the query and
        the full assembled response in the ongoing chat session after the stream is consumed.

        Parameters
        ----------
        query : str
            The query to send to the LLM.

        Yields
        ------
        str
            Content chunks from the LLM response as they arrive.
        """
        self.insert_message(LLMMessage(role=MessageRole.User, content=query))
        accumulated = []
        for chunk in self.llm.generate_stream(self.messages, tools=self.tools, config=self.config):
            accumulated.append(chunk)
            yield chunk
        self._ensure_all_messages_are_sized()
        self.insert_message(LLMMessage(role=MessageRole.Assistant, content="".join(accumulated)))

    def insert_message(self, message: LLMMessage):
        """
        Add a message onto the end of the chat session. If the total token count exceeds the max context, the oldest
        messages are removed.

        Parameters
        ----------
        message : LLMMessage
            The message to add to the chat session.
        """
        self.messages.append(self._build_sized_message(message))
        total_length = sum([msg.token_length for msg in self.messages])
        while total_length > self.max_context:
            total_length -= self.messages.pop(1).token_length

    def _build_sized_message(self, message):
        if message.content is None:
            return SizedLLMMessage(**message.model_dump(), token_length=0)
        else:
            new_message_length = len(self.tokenizer_gateway.encode(message.content))
            new_message = SizedLLMMessage(**message.model_dump(), token_length=new_message_length)
            return new_message

    def _ensure_all_messages_are_sized(self):
        for i, message in enumerate(self.messages):
            if not isinstance(message, SizedLLMMessage):
                self.messages[i] = self._build_sized_message(message)

__init__(llm, system_prompt='You are a helpful assistant.', tools=None, max_context=32768, tokenizer_gateway=None, config=None, temperature=1.0)

Create an instance of the ChatSession.

Parameters:

Name Type Description Default
llm LLMBroker

The broker to use for generating responses.

required
system_prompt str

The prompt to use for the system messages. Defaults to "You are a helpful assistant."

'You are a helpful assistant.'
tools List[LLMTool]

The tools you want to make available to the LLM. Defaults to None.

None
max_context int

The maximum number of tokens to keep in the context. Defaults to 32768.

32768
tokenizer_gateway TokenizerGateway

The gateway to use for tokenization. If None, a default TokenizerGateway() is created.

None
config Optional[CompletionConfig]

Configuration object for LLM completion. If None, one is created from temperature and max_context.

None
temperature float

The temperature to use for the response. Defaults to 1.0. Deprecated: use config.

1.0
Source code in src/mojentic/llm/chat_session.py
def __init__(self,
             llm: LLMBroker,
             system_prompt: str = "You are a helpful assistant.",
             tools: Optional[List[LLMTool]] = None,
             max_context: int = 32768,
             tokenizer_gateway: TokenizerGateway = None,
             config: Optional[CompletionConfig] = None,
             temperature: float = 1.0):
    """
    Create an instance of the ChatSession.

    Parameters
    ----------
    llm : LLMBroker
        The broker to use for generating responses.
    system_prompt : str, optional
        The prompt to use for the system messages. Defaults to "You are a helpful assistant."
    tools : List[LLMTool], optional
        The tools you want to make available to the LLM. Defaults to None.
    max_context : int, optional
        The maximum number of tokens to keep in the context. Defaults to 32768.
    tokenizer_gateway : TokenizerGateway, optional
        The gateway to use for tokenization. If None, a default `TokenizerGateway()` is created.
    config : Optional[CompletionConfig], optional
        Configuration object for LLM completion. If None, one is created from temperature and max_context.
    temperature : float, optional
        The temperature to use for the response. Defaults to 1.0. Deprecated: use config.
    """

    self.llm = llm
    self.system_prompt = system_prompt
    self.tools = tools
    self.max_context = max_context

    # Use config if provided, otherwise build from individual kwargs
    if config is not None:
        self.config = config
    else:
        self.config = CompletionConfig(
            temperature=temperature,
            num_ctx=max_context
        )

    if tokenizer_gateway is None:
        self.tokenizer_gateway = TokenizerGateway()
    else:
        self.tokenizer_gateway = tokenizer_gateway

    self.messages = []
    self.insert_message(LLMMessage(role=MessageRole.System, content=self.system_prompt))

insert_message(message)

Add a message onto the end of the chat session. If the total token count exceeds the max context, the oldest messages are removed.

Parameters:

Name Type Description Default
message LLMMessage

The message to add to the chat session.

required
Source code in src/mojentic/llm/chat_session.py
def insert_message(self, message: LLMMessage):
    """
    Add a message onto the end of the chat session. If the total token count exceeds the max context, the oldest
    messages are removed.

    Parameters
    ----------
    message : LLMMessage
        The message to add to the chat session.
    """
    self.messages.append(self._build_sized_message(message))
    total_length = sum([msg.token_length for msg in self.messages])
    while total_length > self.max_context:
        total_length -= self.messages.pop(1).token_length
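
The eviction rule is easiest to see with token counts written out by hand. In this sketch, messages are `(text, token_length)` tuples standing in for `SizedLLMMessage`, and the function takes the history as an argument instead of living on a class. One deliberate difference from the listing above is labeled in the code: a length guard stops eviction once only the system prompt remains.

```python
from typing import List, Tuple

Message = Tuple[str, int]  # (text, token_length); hypothetical stand-in for SizedLLMMessage


def insert_message(history: List[Message], message: Message, max_context: int) -> None:
    """Append, then evict from index 1 so the system prompt at index 0 survives."""
    history.append(message)
    total = sum(length for _, length in history)
    # The len() guard is an addition in this sketch, not part of the listing above:
    # it stops eviction once only the system prompt is left.
    while total > max_context and len(history) > 1:
        total -= history.pop(1)[1]


history: List[Message] = [("system", 5)]
insert_message(history, ("first", 4), max_context=12)   # total 9, nothing evicted
insert_message(history, ("second", 4), max_context=12)  # total 13, "first" evicted
```

Because eviction always starts at index 1, a message larger than the whole budget is itself removed right after being appended, leaving only the system prompt.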

send(query)

Send a query to the LLM and return the response. Also records the query and response in the ongoing chat session.

Parameters:

Name Type Description Default
query str

The query to send to the LLM.

required

Returns:

Type Description
str

The response from the LLM.

Source code in src/mojentic/llm/chat_session.py
def send(self, query):
    """
    Send a query to the LLM and return the response. Also records the query and response in the ongoing chat
    session.

    Parameters
    ----------
    query : str
        The query to send to the LLM.

    Returns
    -------
    str
        The response from the LLM.
    """
    self.insert_message(LLMMessage(role=MessageRole.User, content=query))
    response = self.llm.generate(self.messages, tools=self.tools, config=self.config)
    self._ensure_all_messages_are_sized()
    self.insert_message(LLMMessage(role=MessageRole.Assistant, content=response))
    return response

send_stream(query)

Send a query to the LLM and yield response chunks as they arrive. Records the query and the full assembled response in the ongoing chat session after the stream is consumed.

Parameters:

Name Type Description Default
query str

The query to send to the LLM.

required

Yields:

Type Description
str

Content chunks from the LLM response as they arrive.

Source code in src/mojentic/llm/chat_session.py
def send_stream(self, query) -> Iterator[str]:
    """
    Send a query to the LLM and yield response chunks as they arrive. Records the query and
    the full assembled response in the ongoing chat session after the stream is consumed.

    Parameters
    ----------
    query : str
        The query to send to the LLM.

    Yields
    ------
    str
        Content chunks from the LLM response as they arrive.
    """
    self.insert_message(LLMMessage(role=MessageRole.User, content=query))
    accumulated = []
    for chunk in self.llm.generate_stream(self.messages, tools=self.tools, config=self.config):
        accumulated.append(chunk)
        yield chunk
    self._ensure_all_messages_are_sized()
    self.insert_message(LLMMessage(role=MessageRole.Assistant, content="".join(accumulated)))
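
Because send_stream is a generator, the assistant message is appended only after the caller has consumed every chunk. The following is a minimal sketch of that timing. `StreamingSessionSketch` and its canned chunk list are hypothetical stand-ins written for this illustration, not the mojentic `ChatSession`.

```python
from typing import Iterator, List


class StreamingSessionSketch:
    """Hypothetical stand-in for ChatSession; not the mojentic class."""

    def __init__(self, chunks: List[str]):
        self._chunks = chunks          # canned chunks standing in for the LLM stream
        self.messages: List[dict] = []

    def send_stream(self, query: str) -> Iterator[str]:
        self.messages.append({"role": "user", "content": query})
        accumulated = []
        for chunk in self._chunks:
            accumulated.append(chunk)
            yield chunk
        # Reached only once the caller has consumed every chunk.
        self.messages.append({"role": "assistant", "content": "".join(accumulated)})


session = StreamingSessionSketch(["Hel", "lo", "!"])
stream = session.send_stream("Say hello")

first = next(stream)                 # the generator body starts running here
roles_mid_stream = [m["role"] for m in session.messages]

rest = list(stream)                  # drain the remaining chunks
roles_after = [m["role"] for m in session.messages]
```

In this sketch, a caller that stops iterating early never reaches the final append, so the history would hold the query without a matching response.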

mojentic.llm.gateways.LLMGateway

This is an abstract class from which specific LLM gateways are derived.

To create a new gateway, inherit from this class and implement the complete method.
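
As a rough illustration of that pattern, the sketch below subclasses a simplified base class and overrides `complete`. `GatewaySketch`, `MessageSketch`, `ResponseSketch`, and `EchoGateway` are hypothetical stand-ins invented here so the example runs without mojentic installed. A real gateway would inherit from `LLMGateway` and return an `LLMGatewayResponse`.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class MessageSketch:            # hypothetical stand-in for LLMMessage
    role: str
    content: str


@dataclass
class ResponseSketch:           # hypothetical stand-in for LLMGatewayResponse
    content: Optional[str] = None
    object: Any = None
    tool_calls: List[Any] = field(default_factory=list)


class GatewaySketch:            # hypothetical stand-in for LLMGateway
    def complete(self, model: str, messages: List[MessageSketch], **kwargs) -> ResponseSketch:
        raise NotImplementedError

    def get_available_models(self) -> List[str]:
        raise NotImplementedError


class EchoGateway(GatewaySketch):
    """Toy gateway that echoes the last message back instead of calling a model."""

    def complete(self, model, messages, **kwargs):
        last = messages[-1].content if messages else ""
        return ResponseSketch(content=f"[{model}] {last}")

    def get_available_models(self):
        return ["echo"]


gateway = EchoGateway()
reply = gateway.complete(model="echo", messages=[MessageSketch("user", "ping")])
```

A gateway of this kind can be handy in tests, since it exercises the calling code without a network round trip.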

Source code in src/mojentic/llm/gateways/llm_gateway.py
class LLMGateway:
    """
    This is an abstract class from which specific LLM gateways are derived.

    To create a new gateway, inherit from this class and implement the `complete` method.
    """

    def complete(self,
                 model: str,
                 messages: List[LLMMessage],
                 object_model: Optional[Type[BaseModel]] = None,
                 tools: Optional[List[LLMTool]] = None,
                 config: Optional['CompletionConfig'] = None,
                 temperature: float = 1.0,
                 num_ctx: int = 32768, max_tokens: int = 16384,
                 num_predict: int = -1) -> LLMGatewayResponse:
        """
        Complete the LLM request.

        Parameters
        ----------
        model : str
            The name of the model to use, as the underlying service identifies it (for example, as it appears
            in `ollama list`).
        messages : List[LLMMessage]
            A list of messages to send to the LLM.
        object_model : Optional[BaseModel]
            The model to use for validating the response.
        tools : Optional[List[LLMTool]]
            A list of tools to use with the LLM. If a tool call is requested, the tool will be called and the output
            will be included in the response.
        config : Optional[CompletionConfig]
            Configuration object for LLM completion (recommended over individual kwargs).
        temperature : float
            The temperature to use for the response. Defaults to 1.0. (Deprecated: use config)
        num_ctx : int
            The number of context tokens to use. Defaults to 32768. (Deprecated: use config)
        max_tokens : int
            The maximum number of tokens to generate. Defaults to 16384. (Deprecated: use config)
        num_predict : int
            The number of tokens to predict. Defaults to no limit. (Deprecated: use config)

        Returns
        -------
        LLMGatewayResponse
            The response from the underlying LLM service.
        """
        raise NotImplementedError

    def get_available_models(self) -> List[str]:
        """
        Get the list of available models.

        Returns
        -------
        List[str]
            The list of available models.
        """
        raise NotImplementedError

    def calculate_embeddings(self, text: str, model: str = None) -> List[float]:
        """
        Calculate embeddings for the given text using the specified model.

        Parameters
        ----------
        text : str
            The text to calculate embeddings for.
        model : str, optional
            The name of the model to use for embeddings. Default value depends on the implementation.

        Returns
        -------
        List[float]
            The embeddings for the text.
        """
        raise NotImplementedError

calculate_embeddings(text, model=None)

Calculate embeddings for the given text using the specified model.

Parameters:

Name Type Description Default
text str

The text to calculate embeddings for.

required
model str

The name of the model to use for embeddings. Default value depends on the implementation.

None

Returns:

Type Description
List[float]

The embeddings for the text.

Source code in src/mojentic/llm/gateways/llm_gateway.py
def calculate_embeddings(self, text: str, model: str = None) -> List[float]:
    """
    Calculate embeddings for the given text using the specified model.

    Parameters
    ----------
    text : str
        The text to calculate embeddings for.
    model : str, optional
        The name of the model to use for embeddings. Default value depends on the implementation.

    Returns
    -------
    List[float]
        The embeddings for the text.
    """
    raise NotImplementedError

complete(model, messages, object_model=None, tools=None, config=None, temperature=1.0, num_ctx=32768, max_tokens=16384, num_predict=-1)

Complete the LLM request.

Parameters:

Name Type Description Default
model str

The name of the model to use, as the underlying service identifies it (for example, as it appears in ollama list).

required
messages List[LLMMessage]

A list of messages to send to the LLM.

required
object_model Optional[BaseModel]

The model to use for validating the response.

None
tools Optional[List[LLMTool]]

A list of tools to use with the LLM. If a tool call is requested, the tool will be called and the output will be included in the response.

None
config Optional[CompletionConfig]

Configuration object for LLM completion (recommended over individual kwargs).

None
temperature float

The temperature to use for the response. Defaults to 1.0. (Deprecated: use config)

1.0
num_ctx int

The number of context tokens to use. Defaults to 32768. (Deprecated: use config)

32768
max_tokens int

The maximum number of tokens to generate. Defaults to 16384. (Deprecated: use config)

16384
num_predict int

The number of tokens to predict. Defaults to no limit. (Deprecated: use config)

-1

Returns:

Type Description
LLMGatewayResponse

The response from the underlying LLM service.

Source code in src/mojentic/llm/gateways/llm_gateway.py
def complete(self,
             model: str,
             messages: List[LLMMessage],
             object_model: Optional[Type[BaseModel]] = None,
             tools: Optional[List[LLMTool]] = None,
             config: Optional['CompletionConfig'] = None,
             temperature: float = 1.0,
             num_ctx: int = 32768, max_tokens: int = 16384,
             num_predict: int = -1) -> LLMGatewayResponse:
    """
    Complete the LLM request.

    Parameters
    ----------
    model : str
        The name of the model to use, as the underlying service identifies it (for example, as it appears
        in `ollama list`).
    messages : List[LLMMessage]
        A list of messages to send to the LLM.
    object_model : Optional[BaseModel]
        The model to use for validating the response.
    tools : Optional[List[LLMTool]]
        A list of tools to use with the LLM. If a tool call is requested, the tool will be called and the output
        will be included in the response.
    config : Optional[CompletionConfig]
        Configuration object for LLM completion (recommended over individual kwargs).
    temperature : float
        The temperature to use for the response. Defaults to 1.0. (Deprecated: use config)
    num_ctx : int
        The number of context tokens to use. Defaults to 32768. (Deprecated: use config)
    max_tokens : int
        The maximum number of tokens to generate. Defaults to 16384. (Deprecated: use config)
    num_predict : int
        The number of tokens to predict. Defaults to no limit. (Deprecated: use config)

    Returns
    -------
    LLMGatewayResponse
        The response from the underlying LLM service.
    """
    raise NotImplementedError

get_available_models()

Get the list of available models.

Returns:

Type Description
List[str]

The list of available models.

Source code in src/mojentic/llm/gateways/llm_gateway.py
def get_available_models(self) -> List[str]:
    """
    Get the list of available models.

    Returns
    -------
    List[str]
        The list of available models.
    """
    raise NotImplementedError

mojentic.llm.gateways.OllamaGateway

Bases: LLMGateway

This class is a gateway to the Ollama LLM service.

Parameters:

Name Type Description Default
host str

The Ollama host to connect to. Defaults to "http://localhost:11434".

'http://localhost:11434'
headers dict

The headers to send with the request. Defaults to an empty dict.

{}
Source code in src/mojentic/llm/gateways/ollama.py
class OllamaGateway(LLMGateway):
    """
    This class is a gateway to the Ollama LLM service.

    Parameters
    ----------
    host : str, optional
        The Ollama host to connect to. Defaults to "http://localhost:11434".
    headers : dict, optional
        The headers to send with the request. Defaults to an empty dict.
    """

    def __init__(self, host="http://localhost:11434", headers={}, timeout=None):
        self.client = Client(host=host, headers=headers, timeout=timeout)

    def _extract_options_from_args(self, args):
        # Extract config if present, otherwise use individual kwargs
        config = args.get('config', None)
        if config:
            options = Options(
                temperature=config.temperature,
                num_ctx=config.num_ctx,
            )
            if config.num_predict > 0:
                options.num_predict = config.num_predict
            if config.max_tokens:
                options.num_predict = config.max_tokens
        else:
            options = Options(
                temperature=args.get('temperature', 1.0),
                num_ctx=args.get('num_ctx', 32768),
            )
            if args.get('num_predict', 0) > 0:
                options.num_predict = args['num_predict']
            if 'max_tokens' in args:
                options.num_predict = args['max_tokens']
        return options

    def complete(self, **args) -> LLMGatewayResponse:
        """
        Complete the LLM request by delegating to the Ollama service.

        Keyword Arguments
        ----------------
        model : str
            The name of the model to use, as appears in `ollama list`.
        messages : List[LLMMessage]
            A list of messages to send to the LLM.
        object_model : Optional[BaseModel]
            The model to use for validating the response.
        tools : Optional[List[LLMTool]]
            A list of tools to use with the LLM. If a tool call is requested, the tool will be called and the output
            will be included in the response.
        temperature : float, optional
            The temperature to use for the response. Defaults to 1.0.
        num_ctx : int, optional
            The number of context tokens to use. Defaults to 32768.
        max_tokens : int, optional
            The maximum number of tokens to generate. Defaults to 16384.
        num_predict : int, optional
            The number of tokens to predict. Defaults to no limit.

        Returns
        -------
        LLMGatewayResponse
            The response from the Ollama service.
        """
        logger.info("Delegating to Ollama for completion", **args)

        options = self._extract_options_from_args(args)

        ollama_args = {
            'model': args['model'],
            'messages': adapt_messages_to_ollama(args['messages']),
            'options': options
        }

        # Handle reasoning effort - if config has reasoning_effort set, enable thinking
        config = args.get('config', None)
        if config and config.reasoning_effort is not None:
            ollama_args['think'] = True
            logger.info("Enabling extended thinking for Ollama", reasoning_effort=config.reasoning_effort)

        if 'object_model' in args and args['object_model'] is not None:
            ollama_args['format'] = args['object_model'].model_json_schema()

        if 'tools' in args and args['tools'] is not None:
            ollama_args['tools'] = [t.descriptor for t in args['tools']]

        response: ChatResponse = self.client.chat(**ollama_args)

        object = None
        tool_calls = []

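        # Only validate when an object model was actually supplied (the key may be present with a None value)
        if args.get('object_model') is not None: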
            try:
                object = args['object_model'].model_validate_json(response.message.content)
            except Exception as e:
                logger.error("Failed to validate model in", error=str(e), response=response.message.content,
                             object_model=args['object_model'])

        if response.message.tool_calls is not None:
            tool_calls = [LLMToolCall(name=t.function.name,
                                      arguments={str(k): str(t.function.arguments[k]) for k in t.function.arguments})
                          for t in response.message.tool_calls]

        # Extract thinking content if present
        thinking = getattr(response.message, 'thinking', None)

        return LLMGatewayResponse(
            content=response.message.content,
            object=object,
            tool_calls=tool_calls,
            thinking=thinking
        )

    def complete_stream(self, **args) -> Iterator[StreamingResponse]:
        """
        Stream the LLM response from Ollama service.

        Keyword Arguments
        ----------------
        model : str
            The name of the model to use, as appears in `ollama list`.
        messages : List[LLMMessage]
            A list of messages to send to the LLM.
        tools : Optional[List[LLMTool]]
            A list of tools to use with the LLM. If a tool call is requested, the tool will be called and the output
            will be included in the response.
        temperature : float, optional
            The temperature to use for the response. Defaults to 1.0.
        num_ctx : int, optional
            The number of context tokens to use. Defaults to 32768.
        max_tokens : int, optional
            The maximum number of tokens to generate. Defaults to 16384.
        num_predict : int, optional
            The number of tokens to predict. Defaults to no limit.

        Returns
        -------
        Iterator[StreamingResponse]
            An iterator of StreamingResponse objects containing response chunks.
        """
        logger.info("Delegating to Ollama for streaming completion", **args)

        options = self._extract_options_from_args(args)
        ollama_args = {
            'model': args['model'],
            'messages': adapt_messages_to_ollama(args['messages']),
            'options': options,
            'stream': True
        }

        # Handle reasoning effort - if config has reasoning_effort set, enable thinking
        config = args.get('config', None)
        if config and config.reasoning_effort is not None:
            ollama_args['think'] = True
            logger.info("Enabling extended thinking for Ollama streaming", reasoning_effort=config.reasoning_effort)

        # Enable tool support if tools are provided
        if 'tools' in args and args['tools'] is not None:
            ollama_args['tools'] = [t.descriptor for t in args['tools']]

        stream = self.client.chat(**ollama_args)

        for chunk in stream:
            if chunk.message:
                # Yield content chunks as they arrive
                if chunk.message.content:
                    yield StreamingResponse(content=chunk.message.content)

                # Yield thinking chunks when they arrive
                if hasattr(chunk.message, 'thinking') and chunk.message.thinking:
                    yield StreamingResponse(thinking=chunk.message.thinking)

                # Yield tool calls when they arrive
                if chunk.message.tool_calls:
                    yield StreamingResponse(tool_calls=chunk.message.tool_calls)

    def get_available_models(self) -> List[str]:
        """
        Get the list of available local models, sorted alphabetically.

        Returns
        -------
        List[str]
            The list of available models, sorted alphabetically.
        """
        return sorted([m.model for m in self.client.list().models])

    def pull_model(self, model: str) -> None:
        """
        Pull the model from the Ollama service.

        Parameters
        ----------
        model : str
            The name of the model to pull.
        """
        self.client.pull(model)

    def calculate_embeddings(self, text: str, model: str = "mxbai-embed-large") -> List[float]:
        """
        Calculate embeddings for the given text using the specified model.

        Parameters
        ----------
        text : str
            The text to calculate embeddings for.
        model : str, optional
            The name of the model to use for embeddings. Defaults to "mxbai-embed-large".

        Returns
        -------
        list
            The embeddings for the text.
        """
        logger.debug("calculate_embeddings", text=text, model=model)
        embed = self.client.embeddings(model=model, prompt=text)
        return embed.embedding

calculate_embeddings(text, model='mxbai-embed-large')

Calculate embeddings for the given text using the specified model.

Parameters:

Name Type Description Default
text str

The text to calculate embeddings for.

required
model str

The name of the model to use for embeddings. Defaults to "mxbai-embed-large".

'mxbai-embed-large'

Returns:

Type Description
list

The embeddings for the text.

Source code in src/mojentic/llm/gateways/ollama.py
def calculate_embeddings(self, text: str, model: str = "mxbai-embed-large") -> List[float]:
    """
    Calculate embeddings for the given text using the specified model.

    Parameters
    ----------
    text : str
        The text to calculate embeddings for.
    model : str, optional
        The name of the model to use for embeddings. Defaults to "mxbai-embed-large".

    Returns
    -------
    list
        The embeddings for the text.
    """
    logger.debug("calculate_embeddings", text=text, model=model)
    embed = self.client.embeddings(model=model, prompt=text)
    return embed.embedding

complete(**args)

Complete the LLM request by delegating to the Ollama service.

Keyword Arguments

model : str
    The name of the model to use, as appears in ollama list.
messages : List[LLMMessage]
    A list of messages to send to the LLM.
object_model : Optional[BaseModel]
    The model to use for validating the response.
tools : Optional[List[LLMTool]]
    A list of tools to use with the LLM. If a tool call is requested, the tool will be called and the output will be included in the response.
temperature : float, optional
    The temperature to use for the response. Defaults to 1.0.
num_ctx : int, optional
    The number of context tokens to use. Defaults to 32768.
max_tokens : int, optional
    The maximum number of tokens to generate. Defaults to 16384.
num_predict : int, optional
    The number of tokens to predict. Defaults to no limit.

Returns:

Type Description
LLMGatewayResponse

The response from the Ollama service.

Source code in src/mojentic/llm/gateways/ollama.py
def complete(self, **args) -> LLMGatewayResponse:
    """
    Complete the LLM request by delegating to the Ollama service.

    Keyword Arguments
    ----------------
    model : str
        The name of the model to use, as appears in `ollama list`.
    messages : List[LLMMessage]
        A list of messages to send to the LLM.
    object_model : Optional[BaseModel]
        The model to use for validating the response.
    tools : Optional[List[LLMTool]]
        A list of tools to use with the LLM. If a tool call is requested, the tool will be called and the output
        will be included in the response.
    temperature : float, optional
        The temperature to use for the response. Defaults to 1.0.
    num_ctx : int, optional
        The number of context tokens to use. Defaults to 32768.
    max_tokens : int, optional
        The maximum number of tokens to generate. Defaults to 16384.
    num_predict : int, optional
        The number of tokens to predict. Defaults to no limit.

    Returns
    -------
    LLMGatewayResponse
        The response from the Ollama service.
    """
    logger.info("Delegating to Ollama for completion", **args)

    options = self._extract_options_from_args(args)

    ollama_args = {
        'model': args['model'],
        'messages': adapt_messages_to_ollama(args['messages']),
        'options': options
    }

    # Handle reasoning effort - if config has reasoning_effort set, enable thinking
    config = args.get('config', None)
    if config and config.reasoning_effort is not None:
        ollama_args['think'] = True
        logger.info("Enabling extended thinking for Ollama", reasoning_effort=config.reasoning_effort)

    if 'object_model' in args and args['object_model'] is not None:
        ollama_args['format'] = args['object_model'].model_json_schema()

    if 'tools' in args and args['tools'] is not None:
        ollama_args['tools'] = [t.descriptor for t in args['tools']]

    response: ChatResponse = self.client.chat(**ollama_args)

    object = None
    tool_calls = []

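    # Only validate when an object model was actually supplied (the key may be present with a None value)
    if args.get('object_model') is not None: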
        try:
            object = args['object_model'].model_validate_json(response.message.content)
        except Exception as e:
            logger.error("Failed to validate model in", error=str(e), response=response.message.content,
                         object_model=args['object_model'])

    if response.message.tool_calls is not None:
        tool_calls = [LLMToolCall(name=t.function.name,
                                  arguments={str(k): str(t.function.arguments[k]) for k in t.function.arguments})
                      for t in response.message.tool_calls]

    # Extract thinking content if present
    thinking = getattr(response.message, 'thinking', None)

    return LLMGatewayResponse(
        content=response.message.content,
        object=object,
        tool_calls=tool_calls,
        thinking=thinking
    )
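
The option handling that complete delegates to `_extract_options_from_args` follows a precedence order that is easy to miss: a `config` object, when present, replaces the individual keyword arguments, and `max_tokens` overwrites `num_predict` when both are given. The sketch below mirrors those rules with a plain dict in place of Ollama's `Options`. `ConfigSketch` and `resolve_options` are hypothetical names, and the field defaults on `ConfigSketch` are assumptions rather than the real `CompletionConfig` defaults.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ConfigSketch:              # hypothetical stand-in for CompletionConfig
    temperature: float = 1.0     # assumed default
    num_ctx: int = 32768         # assumed default
    max_tokens: Optional[int] = 16384   # assumed default
    num_predict: int = -1        # assumed default


def resolve_options(args: dict) -> dict:
    """Mirror the precedence rules of _extract_options_from_args using a plain dict."""
    config = args.get("config")
    if config:
        options = {"temperature": config.temperature, "num_ctx": config.num_ctx}
        if config.num_predict > 0:
            options["num_predict"] = config.num_predict
        if config.max_tokens:
            options["num_predict"] = config.max_tokens   # max_tokens wins when both are set
    else:
        options = {
            "temperature": args.get("temperature", 1.0),
            "num_ctx": args.get("num_ctx", 32768),
        }
        if args.get("num_predict", 0) > 0:
            options["num_predict"] = args["num_predict"]
        if "max_tokens" in args:
            options["num_predict"] = args["max_tokens"]
    return options


# Individual kwargs: max_tokens overrides num_predict.
from_kwargs = resolve_options({"temperature": 0.2, "num_predict": 100, "max_tokens": 50})

# A config object takes over entirely; the loose temperature kwarg is ignored.
from_config = resolve_options({
    "config": ConfigSketch(temperature=0.5, max_tokens=None, num_predict=64),
    "temperature": 0.9,
})

# No arguments: only temperature and num_ctx are set, so generation length is left unlimited.
defaults = resolve_options({})
```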

complete_stream(**args)

Stream the LLM response from Ollama service.

Keyword Arguments

model : str
    The name of the model to use, as appears in ollama list.
messages : List[LLMMessage]
    A list of messages to send to the LLM.
tools : Optional[List[LLMTool]]
    A list of tools to use with the LLM. If a tool call is requested, the tool will be called and the output will be included in the response.
temperature : float, optional
    The temperature to use for the response. Defaults to 1.0.
num_ctx : int, optional
    The number of context tokens to use. Defaults to 32768.
max_tokens : int, optional
    The maximum number of tokens to generate. Defaults to 16384.
num_predict : int, optional
    The number of tokens to predict. Defaults to no limit.

Returns:

Type Description
Iterator[StreamingResponse]

An iterator of StreamingResponse objects containing response chunks.

Source code in src/mojentic/llm/gateways/ollama.py
def complete_stream(self, **args) -> Iterator[StreamingResponse]:
    """
    Stream the LLM response from Ollama service.

    Keyword Arguments
    ----------------
    model : str
        The name of the model to use, as appears in `ollama list`.
    messages : List[LLMMessage]
        A list of messages to send to the LLM.
    tools : Optional[List[LLMTool]]
        A list of tools to use with the LLM. If a tool call is requested, the tool will be called and the output
        will be included in the response.
    temperature : float, optional
        The temperature to use for the response. Defaults to 1.0.
    num_ctx : int, optional
        The number of context tokens to use. Defaults to 32768.
    max_tokens : int, optional
        The maximum number of tokens to generate. Defaults to 16384.
    num_predict : int, optional
        The number of tokens to predict. Defaults to no limit.

    Returns
    -------
    Iterator[StreamingResponse]
        An iterator of StreamingResponse objects containing response chunks.
    """
    logger.info("Delegating to Ollama for streaming completion", **args)

    options = self._extract_options_from_args(args)
    ollama_args = {
        'model': args['model'],
        'messages': adapt_messages_to_ollama(args['messages']),
        'options': options,
        'stream': True
    }

    # Handle reasoning effort - if config has reasoning_effort set, enable thinking
    config = args.get('config', None)
    if config and config.reasoning_effort is not None:
        ollama_args['think'] = True
        logger.info("Enabling extended thinking for Ollama streaming", reasoning_effort=config.reasoning_effort)

    # Enable tool support if tools are provided
    if 'tools' in args and args['tools'] is not None:
        ollama_args['tools'] = [t.descriptor for t in args['tools']]

    stream = self.client.chat(**ollama_args)

    for chunk in stream:
        if chunk.message:
            # Yield content chunks as they arrive
            if chunk.message.content:
                yield StreamingResponse(content=chunk.message.content)

            # Yield thinking chunks when they arrive
            if hasattr(chunk.message, 'thinking') and chunk.message.thinking:
                yield StreamingResponse(thinking=chunk.message.thinking)

            # Yield tool calls when they arrive
            if chunk.message.tool_calls:
                yield StreamingResponse(tool_calls=chunk.message.tool_calls)
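
Each yielded item carries only one kind of payload (content, thinking, or tool calls), so a consumer typically checks which field is set. The following sketch folds such a stream into its three parts. `ChunkSketch` is a hypothetical stand-in for `StreamingResponse` with the three fields used above, and the fake stream is invented sample data.

```python
from dataclasses import dataclass
from typing import Any, Iterable, List, Optional, Tuple


@dataclass
class ChunkSketch:               # hypothetical stand-in for StreamingResponse
    content: Optional[str] = None
    thinking: Optional[str] = None
    tool_calls: Optional[List[Any]] = None


def collect(stream: Iterable[ChunkSketch]) -> Tuple[str, str, List[Any]]:
    """Fold a chunk stream into (content, thinking, tool_calls)."""
    content, thinking, tool_calls = [], [], []
    for chunk in stream:
        if chunk.content:
            content.append(chunk.content)
        if chunk.thinking:
            thinking.append(chunk.thinking)
        if chunk.tool_calls:
            tool_calls.extend(chunk.tool_calls)
    return "".join(content), "".join(thinking), tool_calls


# Invented sample data standing in for what complete_stream might yield.
fake_stream = [
    ChunkSketch(thinking="User wants a greeting. "),
    ChunkSketch(content="Hi"),
    ChunkSketch(content=" there"),
    ChunkSketch(tool_calls=[{"name": "get_time", "arguments": {}}]),
]
text, reasoning, calls = collect(fake_stream)
```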

get_available_models()

Get the list of available local models, sorted alphabetically.

Returns:

Type Description
List[str]

The list of available models, sorted alphabetically.

Source code in src/mojentic/llm/gateways/ollama.py
def get_available_models(self) -> List[str]:
    """
    Get the list of available local models, sorted alphabetically.

    Returns
    -------
    List[str]
        The list of available models, sorted alphabetically.
    """
    return sorted([m.model for m in self.client.list().models])

pull_model(model)

Pull the model from the Ollama service.

Parameters:

Name Type Description Default
model str

The name of the model to pull.

required
Source code in src/mojentic/llm/gateways/ollama.py
def pull_model(self, model: str) -> None:
    """
    Pull the model from the Ollama service.

    Parameters
    ----------
    model : str
        The name of the model to pull.
    """
    self.client.pull(model)

mojentic.llm.gateways.OpenAIGateway

Bases: LLMGateway

This class is a gateway to the OpenAI LLM service.

Parameters:

Name Type Description Default
api_key str

The OpenAI API key to use. If not provided, defaults to the value of the OPENAI_API_KEY environment variable.

None
base_url str

The base URL for the OpenAI API. If not provided, defaults to the value of the OPENAI_API_ENDPOINT environment variable, or None if not set.

None
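
The fallback order described above (explicit argument first, then environment variable) can be sketched in isolation. `resolve_openai_settings` is a hypothetical helper written for this illustration and is not part of mojentic. The key and URL values are placeholders.

```python
import os
from typing import Optional, Tuple


def resolve_openai_settings(api_key: Optional[str] = None,
                            base_url: Optional[str] = None) -> Tuple[Optional[str], Optional[str]]:
    """Hypothetical helper mirroring the constructor's fallback rules."""
    if api_key is None:
        api_key = os.getenv("OPENAI_API_KEY")
    if base_url is None:
        base_url = os.getenv("OPENAI_API_ENDPOINT")
    return api_key, base_url


os.environ["OPENAI_API_KEY"] = "env-key"            # placeholder value, not a real key
os.environ.pop("OPENAI_API_ENDPOINT", None)         # make sure the endpoint is unset

from_env = resolve_openai_settings()
explicit = resolve_openai_settings(api_key="explicit-key", base_url="http://localhost:8000/v1")
```

With the endpoint variable unset, the base URL stays None, which leaves the choice of endpoint to the OpenAI client.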
Source code in src/mojentic/llm/gateways/openai.py
class OpenAIGateway(LLMGateway):
    """
    This class is a gateway to the OpenAI LLM service.

    Parameters
    ----------
    api_key : str, optional
        The OpenAI API key to use. If not provided, defaults to the value of the
        OPENAI_API_KEY environment variable.
    base_url : str, optional
        The base URL for the OpenAI API. If not provided, defaults to the value of the
        OPENAI_API_ENDPOINT environment variable, or None if not set.
    """

    def __init__(self, api_key: Optional[str] = None, base_url: Optional[str] = None):
        if api_key is None:
            api_key = os.getenv("OPENAI_API_KEY")
        if base_url is None:
            base_url = os.getenv("OPENAI_API_ENDPOINT")
        self.client = OpenAI(api_key=api_key, base_url=base_url)
        self.model_registry = get_model_registry()

    def _is_reasoning_model(self, model: str) -> bool:
        """
        Determine if a model is a reasoning model that requires max_completion_tokens.

        Parameters
        ----------
        model : str
            The model name to classify.

        Returns
        -------
        bool
            True if the model is a reasoning model, False if it's a chat model.
        """
        return self.model_registry.is_reasoning_model(model)

    def _adapt_parameters_for_model(self, model: str, args: dict) -> dict:
        """
        Adapt parameters based on the model type and capabilities.

        Parameters
        ----------
        model : str
            The model name.
        args : dict
            The original arguments.

        Returns
        -------
        dict
            The adapted arguments with correct parameter names for the model type.
        """
        adapted_args = args.copy()
        capabilities = self.model_registry.get_model_capabilities(model)

        logger.debug("Adapting parameters for model",
                     model=model,
                     model_type=capabilities.model_type.value,
                     supports_tools=capabilities.supports_tools,
                     supports_streaming=capabilities.supports_streaming)

        # Handle token limit parameter conversion
        if 'max_tokens' in adapted_args:
            token_param = capabilities.get_token_limit_param()
            if token_param != 'max_tokens':
                # Convert max_tokens to max_completion_tokens for reasoning models
                adapted_args[token_param] = adapted_args.pop('max_tokens')
                logger.info("Converted token limit parameter for model",
                            model=model,
                            from_param='max_tokens',
                            to_param=token_param,
                            value=adapted_args[token_param])

        # Validate tool usage for models that don't support tools
        if 'tools' in adapted_args and adapted_args['tools'] and not capabilities.supports_tools:
            logger.warning("Model does not support tools, removing tool configuration",
                           model=model,
                           num_tools=len(adapted_args['tools']))
            adapted_args['tools'] = None  # Set to None instead of removing the key

        # Handle temperature restrictions for specific models
        if 'temperature' in adapted_args:
            temperature = adapted_args['temperature']

            # Check if model supports temperature parameter at all
            if capabilities.supported_temperatures == []:
                # Model doesn't support temperature parameter at all - remove it
                logger.warning("Model does not support temperature parameter at all",
                               model=model,
                               requested_temperature=temperature)
                adapted_args.pop('temperature', None)
            elif not capabilities.supports_temperature(temperature):
                # Model supports temperature but not this specific value - use default
                default_temp = 1.0
                logger.warning(
                    "Model does not support requested temperature, using default",
                    model=model,
                    requested_temperature=temperature,
                    default_temperature=default_temp,
                    supported_temperatures=capabilities.supported_temperatures)
                adapted_args['temperature'] = default_temp

        # Handle reasoning_effort for reasoning models
        if 'reasoning_effort' in adapted_args and adapted_args['reasoning_effort'] is not None:
            if capabilities.model_type == ModelType.REASONING:
                # Keep reasoning_effort for reasoning models
                logger.info("Adding reasoning_effort parameter for reasoning model",
                            model=model,
                            reasoning_effort=adapted_args['reasoning_effort'])
            else:
                # Warn and remove for non-reasoning models
                logger.warning("Model does not support reasoning_effort, ignoring parameter",
                               model=model,
                               requested_reasoning_effort=adapted_args['reasoning_effort'])
                adapted_args.pop('reasoning_effort', None)

        return adapted_args

    def _validate_model_parameters(self, model: str, args: dict) -> None:
        """
        Validate that the parameters are compatible with the model.

        Parameters
        ----------
        model : str
            The model name.
        args : dict
            The arguments to validate.
        """
        capabilities = self.model_registry.get_model_capabilities(model)

        # Warning for tools on reasoning models that don't support them
        if (capabilities.model_type == ModelType.REASONING and
                not capabilities.supports_tools and
                'tools' in args and args['tools']):
            logger.warning(
                "Reasoning model may not support tools",
                model=model,
                num_tools=len(args['tools']))

        # Validate token limits (check both possible parameter names)
        token_value = args.get('max_tokens') or args.get('max_completion_tokens')
        if token_value and capabilities.max_output_tokens:
            if token_value > capabilities.max_output_tokens:
                logger.warning(
                    "Requested token limit exceeds model maximum",
                    model=model,
                    requested=token_value,
                    max_allowed=capabilities.max_output_tokens)

    def complete(self, **kwargs) -> LLMGatewayResponse:
        """
        Complete the LLM request by delegating to the OpenAI service.

        Keyword Arguments
        ----------------
        model : str
            The name of the model to use.
        messages : List[LLMMessage]
            A list of messages to send to the LLM.
        object_model : Optional[Type[BaseModel]]
            The model to use for validating the response.
        tools : Optional[List[LLMTool]]
            A list of tools to offer to the LLM. Tool calls requested by the model are returned in the
            response's tool_calls; this method does not execute them.
        temperature : float, optional
            The temperature to use for the response. Defaults to 1.0.
        num_ctx : int, optional
            The number of context tokens to use. Defaults to 32768.
        max_tokens : int, optional
            The maximum number of tokens to generate. Defaults to 16384.
        num_predict : int, optional
            The number of tokens to predict. Defaults to no limit.

        Returns
        -------
        LLMGatewayResponse
            The response from the OpenAI service.
        """
        # Extract parameters from kwargs with defaults
        model = kwargs.get('model')
        messages = kwargs.get('messages')
        object_model = kwargs.get('object_model', None)
        tools = kwargs.get('tools', None)
        config = kwargs.get('config', None)

        # Use config if provided, otherwise use individual kwargs
        if config:
            temperature = config.temperature
            num_ctx = config.num_ctx
            max_tokens = config.max_tokens
            num_predict = config.num_predict
            reasoning_effort = config.reasoning_effort
        else:
            temperature = kwargs.get('temperature', 1.0)
            num_ctx = kwargs.get('num_ctx', 32768)
            max_tokens = kwargs.get('max_tokens', 16384)
            num_predict = kwargs.get('num_predict', -1)
            reasoning_effort = None

        if not model:
            raise ValueError("'model' parameter is required")
        if not messages:
            raise ValueError("'messages' parameter is required")

        # Convert parameters to dict for processing
        args = {
            'model': model,
            'messages': messages,
            'object_model': object_model,
            'tools': tools,
            'temperature': temperature,
            'num_ctx': num_ctx,
            'max_tokens': max_tokens,
            'num_predict': num_predict,
            'reasoning_effort': reasoning_effort
        }

        # Adapt parameters based on model type
        try:
            adapted_args = self._adapt_parameters_for_model(model, args)
        except Exception as e:
            logger.error("Failed to adapt parameters for model",
                         model=model,
                         error=str(e))
            raise

        # Validate parameters after adaptation
        self._validate_model_parameters(model, adapted_args)

        openai_args = {
            'model': adapted_args['model'],
            'messages': adapt_messages_to_openai(adapted_args['messages']),
        }

        # Add temperature if specified
        if 'temperature' in adapted_args:
            openai_args['temperature'] = adapted_args['temperature']

        completion = self.client.chat.completions.create

        if adapted_args['object_model'] is not None:
            completion = self.client.beta.chat.completions.parse
            openai_args['response_format'] = adapted_args['object_model']

        if adapted_args.get('tools') is not None:
            openai_args['tools'] = [t.descriptor for t in adapted_args['tools']]

        # Handle both max_tokens (for chat models) and max_completion_tokens (for reasoning models)
        if 'max_tokens' in adapted_args:
            openai_args['max_tokens'] = adapted_args['max_tokens']
        elif 'max_completion_tokens' in adapted_args:
            openai_args['max_completion_tokens'] = adapted_args['max_completion_tokens']

        # Add reasoning_effort if present in adapted args
        if 'reasoning_effort' in adapted_args and adapted_args['reasoning_effort'] is not None:
            openai_args['reasoning_effort'] = adapted_args['reasoning_effort']

        logger.debug("Making OpenAI API call",
                     model=openai_args['model'],
                     has_tools='tools' in openai_args,
                     has_object_model='response_format' in openai_args,
                     has_reasoning_effort='reasoning_effort' in openai_args,
                     token_param='max_completion_tokens' if 'max_completion_tokens' in openai_args else 'max_tokens')

        try:
            response = completion(**openai_args)
        except BadRequestError as e:
            # Enhanced error handling for parameter issues
            if "max_tokens" in str(e) and "max_completion_tokens" in str(e):
                logger.error(
                    "Parameter error detected - model may require different token parameter",
                    model=model,
                    error=str(e),
                    suggestion="This model may be a reasoning model requiring max_completion_tokens")
            raise e
        except Exception as e:
            logger.error("OpenAI API call failed",
                         model=model,
                         error=str(e))
            raise e

        object = None
        tool_calls: List[LLMToolCall] = []

        if adapted_args.get('object_model') is not None:
            try:
                response_content = response.choices[0].message.content
                if response_content is not None:
                    object = adapted_args['object_model'].model_validate_json(response_content)
                else:
                    logger.error(
                        "No response content available for object validation",
                        object_model=adapted_args['object_model'])
            except Exception as e:
                response_content = (response.choices[0].message.content
                                    if response.choices else "No response content")
                logger.error("Failed to validate model",
                             error=str(e),
                             response=response_content,
                             object_model=adapted_args['object_model'])

        if response.choices[0].message.tool_calls is not None:
            for t in response.choices[0].message.tool_calls:
                arguments = {}
                args_dict = json.loads(t.function.arguments)
                for k in args_dict:
                    arguments[str(k)] = str(args_dict[k])
                tool_call = LLMToolCall(id=t.id, name=t.function.name, arguments=arguments)
                tool_calls.append(tool_call)

        return LLMGatewayResponse(
            content=response.choices[0].message.content,
            object=object,
            tool_calls=tool_calls,
        )

    def complete_stream(self, **kwargs) -> Iterator[StreamingResponse]:
        """
        Stream the LLM response from OpenAI service.

        OpenAI streams tool call arguments incrementally, so we need to accumulate them
        and yield complete tool calls only when the stream finishes.

        Keyword Arguments
        ----------------
        model : str
            The name of the model to use.
        messages : List[LLMMessage]
            A list of messages to send to the LLM.
        tools : Optional[List[LLMTool]]
            A list of tools to use with the LLM. Tool calls will be accumulated and yielded when complete.
        temperature : float, optional
            The temperature to use for the response. Defaults to 1.0.
        num_ctx : int, optional
            The number of context tokens to use. Defaults to 32768.
        max_tokens : int, optional
            The maximum number of tokens to generate. Defaults to 16384.
        num_predict : int, optional
            The number of tokens to predict. Defaults to no limit.

        Returns
        -------
        Iterator[StreamingResponse]
            An iterator of StreamingResponse objects containing response chunks.
        """
        # Extract parameters from kwargs with defaults
        model = kwargs.get('model')
        messages = kwargs.get('messages')
        object_model = kwargs.get('object_model', None)
        tools = kwargs.get('tools', None)
        config = kwargs.get('config', None)

        # Use config if provided, otherwise use individual kwargs
        if config:
            temperature = config.temperature
            num_ctx = config.num_ctx
            max_tokens = config.max_tokens
            num_predict = config.num_predict
            reasoning_effort = config.reasoning_effort
        else:
            temperature = kwargs.get('temperature', 1.0)
            num_ctx = kwargs.get('num_ctx', 32768)
            max_tokens = kwargs.get('max_tokens', 16384)
            num_predict = kwargs.get('num_predict', -1)
            reasoning_effort = None

        if not model:
            raise ValueError("'model' parameter is required")
        if not messages:
            raise ValueError("'messages' parameter is required")

        # Convert parameters to dict for processing
        args = {
            'model': model,
            'messages': messages,
            'object_model': object_model,
            'tools': tools,
            'temperature': temperature,
            'num_ctx': num_ctx,
            'max_tokens': max_tokens,
            'num_predict': num_predict,
            'reasoning_effort': reasoning_effort
        }

        # Adapt parameters based on model type
        try:
            adapted_args = self._adapt_parameters_for_model(model, args)
        except Exception as e:
            logger.error("Failed to adapt parameters for model",
                         model=model,
                         error=str(e))
            raise

        # Validate parameters after adaptation
        self._validate_model_parameters(model, adapted_args)

        # Check if model supports streaming
        capabilities = self.model_registry.get_model_capabilities(model)
        if not capabilities.supports_streaming:
            raise NotImplementedError(f"Model {model} does not support streaming")

        # Structured output doesn't work with streaming
        if adapted_args['object_model'] is not None:
            raise NotImplementedError("Streaming with structured output (object_model) is not supported")

        openai_args = {
            'model': adapted_args['model'],
            'messages': adapt_messages_to_openai(adapted_args['messages']),
            'stream': True,
        }

        # Add temperature if specified
        if 'temperature' in adapted_args:
            openai_args['temperature'] = adapted_args['temperature']

        if adapted_args.get('tools') is not None:
            openai_args['tools'] = [t.descriptor for t in adapted_args['tools']]

        # Handle both max_tokens (for chat models) and max_completion_tokens (for reasoning models)
        if 'max_tokens' in adapted_args:
            openai_args['max_tokens'] = adapted_args['max_tokens']
        elif 'max_completion_tokens' in adapted_args:
            openai_args['max_completion_tokens'] = adapted_args['max_completion_tokens']

        # Add reasoning_effort if present in adapted args
        if 'reasoning_effort' in adapted_args and adapted_args['reasoning_effort'] is not None:
            openai_args['reasoning_effort'] = adapted_args['reasoning_effort']

        logger.debug("Making OpenAI streaming API call",
                     model=openai_args['model'],
                     has_tools='tools' in openai_args,
                     has_reasoning_effort='reasoning_effort' in openai_args,
                     token_param='max_completion_tokens' if 'max_completion_tokens' in openai_args else 'max_tokens')

        try:
            stream = self.client.chat.completions.create(**openai_args)
        except BadRequestError as e:
            if "max_tokens" in str(e) and "max_completion_tokens" in str(e):
                logger.error(
                    "Parameter error detected - model may require different token parameter",
                    model=model,
                    error=str(e),
                    suggestion="This model may be a reasoning model requiring max_completion_tokens")
            raise e
        except Exception as e:
            logger.error("OpenAI streaming API call failed",
                         model=model,
                         error=str(e))
            raise e

        # Accumulate tool calls as they stream in
        # OpenAI streams tool arguments incrementally, indexed by tool call index
        tool_calls_accumulator: Dict[int, Dict] = {}

        for chunk in stream:
            if not chunk.choices:
                continue

            delta = chunk.choices[0].delta
            finish_reason = chunk.choices[0].finish_reason

            # Yield content chunks as they arrive
            if delta.content:
                yield StreamingResponse(content=delta.content)

            # Accumulate tool call chunks
            if delta.tool_calls:
                for tool_call_delta in delta.tool_calls:
                    index = tool_call_delta.index

                    # Initialize accumulator for this tool call if needed
                    if index not in tool_calls_accumulator:
                        tool_calls_accumulator[index] = {
                            'id': None,
                            'name': None,
                            'arguments': ''
                        }

                    # First chunk has id and name
                    if tool_call_delta.id:
                        tool_calls_accumulator[index]['id'] = tool_call_delta.id

                    if tool_call_delta.function.name:
                        tool_calls_accumulator[index]['name'] = tool_call_delta.function.name

                    # All chunks may have argument fragments
                    if tool_call_delta.function.arguments:
                        tool_calls_accumulator[index]['arguments'] += tool_call_delta.function.arguments

            # When stream is complete, yield accumulated tool calls
            if finish_reason == 'tool_calls' and tool_calls_accumulator:
                # Parse and yield complete tool calls
                complete_tool_calls = []
                for index in sorted(tool_calls_accumulator.keys()):
                    tc = tool_calls_accumulator[index]
                    try:
                        # Parse the accumulated JSON arguments
                        args_dict = json.loads(tc['arguments'])
                        # Convert to string values as per LLMToolCall format
                        arguments = {str(k): str(v) for k, v in args_dict.items()}

                        tool_call = LLMToolCall(
                            id=tc['id'],
                            name=tc['name'],
                            arguments=arguments
                        )
                        complete_tool_calls.append(tool_call)
                    except json.JSONDecodeError as e:
                        logger.error("Failed to parse tool call arguments",
                                     tool_name=tc['name'],
                                     arguments=tc['arguments'],
                                     error=str(e))

                if complete_tool_calls:
                    # Convert to the format expected by ollama's tool calls for compatibility
                    # We need to create mock objects that match ollama's structure
                    from types import SimpleNamespace
                    ollama_format_calls = []
                    for tc in complete_tool_calls:
                        ollama_format_calls.append(SimpleNamespace(
                            id=tc.id,  # Include ID for proper OpenAI message formatting
                            function=SimpleNamespace(
                                name=tc.name,
                                arguments=tc.arguments
                            )
                        ))
                    yield StreamingResponse(tool_calls=ollama_format_calls)

    def get_available_models(self) -> list[str]:
        """
        Get the list of available OpenAI models, sorted alphabetically.

        Returns
        -------
        list[str]
            The list of available models, sorted alphabetically.
        """
        return sorted([m.id for m in self.client.models.list()])

    def calculate_embeddings(self, text: str, model: str = "text-embedding-3-large") -> List[float]:
        """
        Calculate embeddings for the given text using the specified OpenAI model.

        Parameters
        ----------
        text : str
            The text to calculate embeddings for.
        model : str, optional
            The name of the OpenAI embeddings model to use. Defaults to "text-embedding-3-large".

        Returns
        -------
        list
            The embeddings for the text.
        """
        logger.debug("calculate_embeddings", text=text, model=model)

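        # Materialize the chunks so each embedding can be weighted by its chunk's token count
        chunks = list(self._chunked_tokens(text, 8191))
        embeddings = [self.client.embeddings.create(model=model, input=chunk).data[0].embedding
                      for chunk in chunks]
        # len(embedding) is the same for every chunk, so it would weight all chunks equally
        lengths = [len(chunk) for chunk in chunks]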

        average = np.average(embeddings, axis=0, weights=lengths)
        average = average / np.linalg.norm(average)
        average = average.tolist()

        return average

    def _batched(self, iterable: Iterable, n: int):
        """Batch data into tuples of length n. The last batch may be shorter."""
        # batched('ABCDEFG', 3) --> ABC DEF G
        if n < 1:
            raise ValueError('n must be at least one')
        it = iter(iterable)
        while batch := tuple(islice(it, n)):
            yield batch

    def _chunked_tokens(self, text, chunk_length):
        tokenizer = TokenizerGateway()
        tokens = tokenizer.encode(text)
        chunks_iterator = self._batched(tokens, chunk_length)
        yield from chunks_iterator

calculate_embeddings(text, model='text-embedding-3-large')

Calculate embeddings for the given text using the specified OpenAI model.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| text | str | The text to calculate embeddings for. | required |
| model | str | The name of the OpenAI embeddings model to use. Defaults to "text-embedding-3-large". | 'text-embedding-3-large' |

Returns:

| Type | Description |
| --- | --- |
| list | The embeddings for the text. |

Source code in src/mojentic/llm/gateways/openai.py
def calculate_embeddings(self, text: str, model: str = "text-embedding-3-large") -> List[float]:
    """
    Calculate embeddings for the given text using the specified OpenAI model.

    Parameters
    ----------
    text : str
        The text to calculate embeddings for.
    model : str, optional
        The name of the OpenAI embeddings model to use. Defaults to "text-embedding-3-large".

    Returns
    -------
    list
        The embeddings for the text.
    """
    logger.debug("calculate_embeddings", text=text, model=model)

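    # Materialize the chunks so each embedding can be weighted by its chunk's token count
    chunks = list(self._chunked_tokens(text, 8191))
    embeddings = [self.client.embeddings.create(model=model, input=chunk).data[0].embedding
                  for chunk in chunks]
    # len(embedding) is the same for every chunk, so it would weight all chunks equally
    lengths = [len(chunk) for chunk in chunks]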

    average = np.average(embeddings, axis=0, weights=lengths)
    average = average / np.linalg.norm(average)
    average = average.tolist()

    return average
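
The chunk-and-average approach described above can be sketched with the standard library alone. This is a minimal illustration, not the gateway's implementation: `combine_chunk_embeddings` and `fake_embed` are hypothetical names, `fake_embed` stands in for the real embeddings API call, and weighting each chunk by its token count is an assumption about the intended behavior.

```python
import math
from itertools import islice
from typing import Callable, Iterable, Iterator, List, Sequence, Tuple


def batched(iterable: Iterable[int], n: int) -> Iterator[Tuple[int, ...]]:
    """Yield tuples of at most n items; the last batch may be shorter."""
    if n < 1:
        raise ValueError("n must be at least one")
    it = iter(iterable)
    while batch := tuple(islice(it, n)):
        yield batch


def combine_chunk_embeddings(
    tokens: Sequence[int],
    chunk_length: int,
    embed: Callable[[Tuple[int, ...]], List[float]],
) -> List[float]:
    """Embed each chunk, average weighted by chunk size, then L2-normalize."""
    chunks = list(batched(tokens, chunk_length))
    if not chunks:
        raise ValueError("tokens must not be empty")
    vectors = [embed(chunk) for chunk in chunks]
    weights = [len(chunk) for chunk in chunks]  # token count per chunk
    total = sum(weights)
    dims = len(vectors[0])
    average = [
        sum(w * v[i] for w, v in zip(weights, vectors)) / total
        for i in range(dims)
    ]
    norm = math.sqrt(sum(x * x for x in average))
    return [x / norm for x in average]


def fake_embed(chunk: Tuple[int, ...]) -> List[float]:
    """Hypothetical stand-in for an embeddings API call."""
    return [float(len(chunk)), float(sum(chunk))]


vector = combine_chunk_embeddings(list(range(1, 6)), 2, fake_embed)
```

Because the final chunk is usually shorter than the others, weighting by token count keeps a short trailing chunk from contributing as much as a full one. The sketch does not guard against an all-zero average, which would make the normalization divide by zero.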

complete(**kwargs)

Complete the LLM request by delegating to the OpenAI service.

Keyword Arguments

model : str
    The name of the model to use.
messages : List[LLMMessage]
    A list of messages to send to the LLM.
object_model : Optional[Type[BaseModel]]
    The model to use for validating the response.
tools : Optional[List[LLMTool]]
    A list of tools to offer to the LLM. Tool calls requested by the model are returned in the response's tool_calls; this method does not execute them.
temperature : float, optional
    The temperature to use for the response. Defaults to 1.0.
num_ctx : int, optional
    The number of context tokens to use. Defaults to 32768.
max_tokens : int, optional
    The maximum number of tokens to generate. Defaults to 16384.
num_predict : int, optional
    The number of tokens to predict. Defaults to no limit.

Returns:

| Type | Description |
| --- | --- |
| LLMGatewayResponse | The response from the OpenAI service. |

Source code in src/mojentic/llm/gateways/openai.py
def complete(self, **kwargs) -> LLMGatewayResponse:
    """
    Complete the LLM request by delegating to the OpenAI service.

    Keyword Arguments
    ----------------
    model : str
        The name of the model to use.
    messages : List[LLMMessage]
        A list of messages to send to the LLM.
    object_model : Optional[Type[BaseModel]]
        The model to use for validating the response.
    tools : Optional[List[LLMTool]]
        A list of tools to offer to the LLM. Tool calls requested by the model are returned in the
        response's tool_calls; this method does not execute them.
    temperature : float, optional
        The temperature to use for the response. Defaults to 1.0.
    num_ctx : int, optional
        The number of context tokens to use. Defaults to 32768.
    max_tokens : int, optional
        The maximum number of tokens to generate. Defaults to 16384.
    num_predict : int, optional
        The number of tokens to predict. Defaults to no limit.

    Returns
    -------
    LLMGatewayResponse
        The response from the OpenAI service.
    """
    # Extract parameters from kwargs with defaults
    model = kwargs.get('model')
    messages = kwargs.get('messages')
    object_model = kwargs.get('object_model', None)
    tools = kwargs.get('tools', None)
    config = kwargs.get('config', None)

    # Use config if provided, otherwise use individual kwargs
    if config:
        temperature = config.temperature
        num_ctx = config.num_ctx
        max_tokens = config.max_tokens
        num_predict = config.num_predict
        reasoning_effort = config.reasoning_effort
    else:
        temperature = kwargs.get('temperature', 1.0)
        num_ctx = kwargs.get('num_ctx', 32768)
        max_tokens = kwargs.get('max_tokens', 16384)
        num_predict = kwargs.get('num_predict', -1)
        reasoning_effort = None

    if not model:
        raise ValueError("'model' parameter is required")
    if not messages:
        raise ValueError("'messages' parameter is required")

    # Convert parameters to dict for processing
    args = {
        'model': model,
        'messages': messages,
        'object_model': object_model,
        'tools': tools,
        'temperature': temperature,
        'num_ctx': num_ctx,
        'max_tokens': max_tokens,
        'num_predict': num_predict,
        'reasoning_effort': reasoning_effort
    }

    # Adapt parameters based on model type
    try:
        adapted_args = self._adapt_parameters_for_model(model, args)
    except Exception as e:
        logger.error("Failed to adapt parameters for model",
                     model=model,
                     error=str(e))
        raise

    # Validate parameters after adaptation
    self._validate_model_parameters(model, adapted_args)

    openai_args = {
        'model': adapted_args['model'],
        'messages': adapt_messages_to_openai(adapted_args['messages']),
    }

    # Add temperature if specified
    if 'temperature' in adapted_args:
        openai_args['temperature'] = adapted_args['temperature']

    completion = self.client.chat.completions.create

    if adapted_args['object_model'] is not None:
        completion = self.client.beta.chat.completions.parse
        openai_args['response_format'] = adapted_args['object_model']

    if adapted_args.get('tools') is not None:
        openai_args['tools'] = [t.descriptor for t in adapted_args['tools']]

    # Handle both max_tokens (for chat models) and max_completion_tokens (for reasoning models)
    if 'max_tokens' in adapted_args:
        openai_args['max_tokens'] = adapted_args['max_tokens']
    elif 'max_completion_tokens' in adapted_args:
        openai_args['max_completion_tokens'] = adapted_args['max_completion_tokens']

    # Add reasoning_effort if present in adapted args
    if 'reasoning_effort' in adapted_args and adapted_args['reasoning_effort'] is not None:
        openai_args['reasoning_effort'] = adapted_args['reasoning_effort']

    logger.debug("Making OpenAI API call",
                 model=openai_args['model'],
                 has_tools='tools' in openai_args,
                 has_object_model='response_format' in openai_args,
                 has_reasoning_effort='reasoning_effort' in openai_args,
                 token_param='max_completion_tokens' if 'max_completion_tokens' in openai_args else 'max_tokens')

    try:
        response = completion(**openai_args)
    except BadRequestError as e:
        # Enhanced error handling for parameter issues
        if "max_tokens" in str(e) and "max_completion_tokens" in str(e):
            logger.error(
                "Parameter error detected - model may require different token parameter",
                model=model,
                error=str(e),
                suggestion="This model may be a reasoning model requiring max_completion_tokens")
        raise e
    except Exception as e:
        logger.error("OpenAI API call failed",
                     model=model,
                     error=str(e))
        raise e

    object = None
    tool_calls: List[LLMToolCall] = []

    if adapted_args.get('object_model') is not None:
        try:
            response_content = response.choices[0].message.content
            if response_content is not None:
                object = adapted_args['object_model'].model_validate_json(response_content)
            else:
                logger.error(
                    "No response content available for object validation",
                    object_model=adapted_args['object_model'])
        except Exception as e:
            response_content = (response.choices[0].message.content
                                if response.choices else "No response content")
            logger.error("Failed to validate model",
                         error=str(e),
                         response=response_content,
                         object_model=adapted_args['object_model'])

    if response.choices[0].message.tool_calls is not None:
        for t in response.choices[0].message.tool_calls:
            arguments = {}
            args_dict = json.loads(t.function.arguments)
            for k in args_dict:
                arguments[str(k)] = str(args_dict[k])
            tool_call = LLMToolCall(id=t.id, name=t.function.name, arguments=arguments)
            tool_calls.append(tool_call)

    return LLMGatewayResponse(
        content=response.choices[0].message.content,
        object=object,
        tool_calls=tool_calls,
    )
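
Before the API call, `complete` adapts the request to the model: the token limit may be renamed, tools may be nulled out, temperature may be dropped or reset, and `reasoning_effort` is kept only for reasoning models. The sketch below mirrors those rules in a standalone function. `Capabilities` and `adapt` are hypothetical names for illustration, and treating `supported_temperatures=None` as "any value is allowed" is an assumption; the real capability record comes from the model registry.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class Capabilities:
    """Hypothetical stand-in for the registry's capability record."""
    token_limit_param: str = "max_tokens"
    supports_tools: bool = True
    supported_temperatures: Optional[List[float]] = None  # assumed: None = any value
    is_reasoning: bool = False


def adapt(args: Dict[str, Any], caps: Capabilities,
          default_temp: float = 1.0) -> Dict[str, Any]:
    adapted = dict(args)  # shallow copy; the caller's dict is left untouched

    # Rename the token limit when the model expects a different parameter name.
    if "max_tokens" in adapted and caps.token_limit_param != "max_tokens":
        adapted[caps.token_limit_param] = adapted.pop("max_tokens")

    # Null out tools for models that cannot use them (the key is kept).
    if adapted.get("tools") and not caps.supports_tools:
        adapted["tools"] = None

    # Drop temperature entirely, or fall back to the default value.
    if "temperature" in adapted:
        allowed = caps.supported_temperatures
        if allowed == []:
            adapted.pop("temperature")
        elif allowed is not None and adapted["temperature"] not in allowed:
            adapted["temperature"] = default_temp

    # reasoning_effort is only forwarded to reasoning models.
    if adapted.get("reasoning_effort") is not None and not caps.is_reasoning:
        adapted.pop("reasoning_effort")

    return adapted


request = {"model": "example-model", "max_tokens": 1000, "temperature": 0.2,
           "tools": ["search"], "reasoning_effort": None}
reasoning_caps = Capabilities(token_limit_param="max_completion_tokens",
                              supports_tools=False,
                              supported_temperatures=[1.0],
                              is_reasoning=True)
adapted = adapt(request, reasoning_caps)
```

With these hypothetical capabilities, the request ends up with `max_completion_tokens` in place of `max_tokens`, `tools` set to `None`, and the temperature reset to the default of 1.0.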

complete_stream(**kwargs)

Stream the LLM response from OpenAI service.

OpenAI streams tool call arguments incrementally, so we need to accumulate them and yield complete tool calls only when the stream finishes.

Keyword Arguments

model : str
    The name of the model to use.
messages : List[LLMMessage]
    A list of messages to send to the LLM.
tools : Optional[List[LLMTool]]
    A list of tools to use with the LLM. Tool calls will be accumulated and yielded when complete.
temperature : float, optional
    The temperature to use for the response. Defaults to 1.0.
num_ctx : int, optional
    The number of context tokens to use. Defaults to 32768.
max_tokens : int, optional
    The maximum number of tokens to generate. Defaults to 16384.
num_predict : int, optional
    The number of tokens to predict. Defaults to no limit.

Returns:

| Type | Description |
| --- | --- |
| Iterator[StreamingResponse] | An iterator of StreamingResponse objects containing response chunks. |
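
The accumulation step can be illustrated in isolation. The sketch below is a simplified model of it, not the gateway's code: `accumulate_tool_calls` and `fragment` are hypothetical names, and the fragments carry `name` and `arguments` directly instead of nesting them under a `function` attribute as the OpenAI client does.

```python
import json
from types import SimpleNamespace
from typing import Dict, Iterable, List


def accumulate_tool_calls(deltas: Iterable[SimpleNamespace]) -> List[dict]:
    """Merge incremental tool-call fragments, keyed by their index."""
    pending: Dict[int, dict] = {}
    for delta in deltas:
        slot = pending.setdefault(
            delta.index, {"id": None, "name": None, "arguments": ""})
        if delta.id:            # usually present only on the first fragment
            slot["id"] = delta.id
        if delta.name:
            slot["name"] = delta.name
        if delta.arguments:     # JSON text arrives in arbitrary pieces
            slot["arguments"] += delta.arguments

    calls = []
    for index in sorted(pending):
        slot = pending[index]
        parsed = json.loads(slot["arguments"])  # only valid once complete
        calls.append({
            "id": slot["id"],
            "name": slot["name"],
            # Values are stringified, matching the LLMToolCall convention above.
            "arguments": {str(k): str(v) for k, v in parsed.items()},
        })
    return calls


def fragment(index, id=None, name=None, arguments=None):
    """Build a simplified tool-call delta for the example."""
    return SimpleNamespace(index=index, id=id, name=name, arguments=arguments)


# Two tool calls whose argument JSON is split across interleaved fragments.
stream = [
    fragment(0, id="call_a", name="get_weather", arguments='{"ci'),
    fragment(1, id="call_b", name="get_time", arguments='{"tz": '),
    fragment(0, arguments='ty": "Paris", "days": 3}'),
    fragment(1, arguments='"UTC"}'),
]
calls = accumulate_tool_calls(stream)
```

Keying the accumulator by index is what lets fragments from several parallel tool calls interleave safely. Parsing is deferred until the end because a partial argument string is not valid JSON.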

Source code in src/mojentic/llm/gateways/openai.py
def complete_stream(self, **kwargs) -> Iterator[StreamingResponse]:
    """
    Stream the LLM response from OpenAI service.

    OpenAI streams tool call arguments incrementally, so we need to accumulate them
    and yield complete tool calls only when the stream finishes.

    Keyword Arguments
    ----------------
    model : str
        The name of the model to use.
    messages : List[LLMMessage]
        A list of messages to send to the LLM.
    tools : Optional[List[LLMTool]]
        A list of tools to use with the LLM. Tool calls will be accumulated and yielded when complete.
    temperature : float, optional
        The temperature to use for the response. Defaults to 1.0.
    num_ctx : int, optional
        The number of context tokens to use. Defaults to 32768.
    max_tokens : int, optional
        The maximum number of tokens to generate. Defaults to 16384.
    num_predict : int, optional
        The number of tokens to predict. Defaults to no limit.

    Returns
    -------
    Iterator[StreamingResponse]
        An iterator of StreamingResponse objects containing response chunks.
    """
    # Extract parameters from kwargs with defaults
    model = kwargs.get('model')
    messages = kwargs.get('messages')
    object_model = kwargs.get('object_model', None)
    tools = kwargs.get('tools', None)
    config = kwargs.get('config', None)

    # Use config if provided, otherwise use individual kwargs
    if config:
        temperature = config.temperature
        num_ctx = config.num_ctx
        max_tokens = config.max_tokens
        num_predict = config.num_predict
        reasoning_effort = config.reasoning_effort
    else:
        temperature = kwargs.get('temperature', 1.0)
        num_ctx = kwargs.get('num_ctx', 32768)
        max_tokens = kwargs.get('max_tokens', 16384)
        num_predict = kwargs.get('num_predict', -1)
        reasoning_effort = None

    if not model:
        raise ValueError("'model' parameter is required")
    if not messages:
        raise ValueError("'messages' parameter is required")

    # Convert parameters to dict for processing
    args = {
        'model': model,
        'messages': messages,
        'object_model': object_model,
        'tools': tools,
        'temperature': temperature,
        'num_ctx': num_ctx,
        'max_tokens': max_tokens,
        'num_predict': num_predict,
        'reasoning_effort': reasoning_effort
    }

    # Adapt parameters based on model type
    try:
        adapted_args = self._adapt_parameters_for_model(model, args)
    except Exception as e:
        logger.error("Failed to adapt parameters for model",
                     model=model,
                     error=str(e))
        raise

    # Validate parameters after adaptation
    self._validate_model_parameters(model, adapted_args)

    # Check if model supports streaming
    capabilities = self.model_registry.get_model_capabilities(model)
    if not capabilities.supports_streaming:
        raise NotImplementedError(f"Model {model} does not support streaming")

    # Structured output doesn't work with streaming
    if adapted_args['object_model'] is not None:
        raise NotImplementedError("Streaming with structured output (object_model) is not supported")

    openai_args = {
        'model': adapted_args['model'],
        'messages': adapt_messages_to_openai(adapted_args['messages']),
        'stream': True,
    }

    # Add temperature if specified
    if 'temperature' in adapted_args:
        openai_args['temperature'] = adapted_args['temperature']

    if adapted_args.get('tools') is not None:
        openai_args['tools'] = [t.descriptor for t in adapted_args['tools']]

    # Handle both max_tokens (for chat models) and max_completion_tokens (for reasoning models)
    if 'max_tokens' in adapted_args:
        openai_args['max_tokens'] = adapted_args['max_tokens']
    elif 'max_completion_tokens' in adapted_args:
        openai_args['max_completion_tokens'] = adapted_args['max_completion_tokens']

    # Add reasoning_effort if present in adapted args
    if 'reasoning_effort' in adapted_args and adapted_args['reasoning_effort'] is not None:
        openai_args['reasoning_effort'] = adapted_args['reasoning_effort']

    logger.debug("Making OpenAI streaming API call",
                 model=openai_args['model'],
                 has_tools='tools' in openai_args,
                 has_reasoning_effort='reasoning_effort' in openai_args,
                 token_param='max_completion_tokens' if 'max_completion_tokens' in openai_args else 'max_tokens')

    try:
        stream = self.client.chat.completions.create(**openai_args)
    except BadRequestError as e:
        if "max_tokens" in str(e) and "max_completion_tokens" in str(e):
            logger.error(
                "Parameter error detected - model may require different token parameter",
                model=model,
                error=str(e),
                suggestion="This model may be a reasoning model requiring max_completion_tokens")
        raise
    except Exception as e:
        logger.error("OpenAI streaming API call failed",
                     model=model,
                     error=str(e))
        raise

    # Accumulate tool calls as they stream in
    # OpenAI streams tool arguments incrementally, indexed by tool call index
    tool_calls_accumulator: Dict[int, Dict] = {}

    for chunk in stream:
        if not chunk.choices:
            continue

        delta = chunk.choices[0].delta
        finish_reason = chunk.choices[0].finish_reason

        # Yield content chunks as they arrive
        if delta.content:
            yield StreamingResponse(content=delta.content)

        # Accumulate tool call chunks
        if delta.tool_calls:
            for tool_call_delta in delta.tool_calls:
                index = tool_call_delta.index

                # Initialize accumulator for this tool call if needed
                if index not in tool_calls_accumulator:
                    tool_calls_accumulator[index] = {
                        'id': None,
                        'name': None,
                        'arguments': ''
                    }

                # First chunk has id and name
                if tool_call_delta.id:
                    tool_calls_accumulator[index]['id'] = tool_call_delta.id

                # The function payload is optional on a delta, so guard before reading it
                function = tool_call_delta.function

                if function and function.name:
                    tool_calls_accumulator[index]['name'] = function.name

                # All chunks may have argument fragments
                if function and function.arguments:
                    tool_calls_accumulator[index]['arguments'] += function.arguments

        # When stream is complete, yield accumulated tool calls
        if finish_reason == 'tool_calls' and tool_calls_accumulator:
            # Parse and yield complete tool calls
            complete_tool_calls = []
            for index in sorted(tool_calls_accumulator.keys()):
                tc = tool_calls_accumulator[index]
                try:
                    # Parse the accumulated JSON arguments
                    args_dict = json.loads(tc['arguments'])
                    # Convert to string values as per LLMToolCall format
                    arguments = {str(k): str(v) for k, v in args_dict.items()}

                    tool_call = LLMToolCall(
                        id=tc['id'],
                        name=tc['name'],
                        arguments=arguments
                    )
                    complete_tool_calls.append(tool_call)
                except json.JSONDecodeError as e:
                    logger.error("Failed to parse tool call arguments",
                                 tool_name=tc['name'],
                                 arguments=tc['arguments'],
                                 error=str(e))

            if complete_tool_calls:
                # Convert to the format expected by ollama's tool calls for compatibility
                # We need to create mock objects that match ollama's structure
                from types import SimpleNamespace
                ollama_format_calls = []
                for tc in complete_tool_calls:
                    ollama_format_calls.append(SimpleNamespace(
                        id=tc.id,  # Include ID for proper OpenAI message formatting
                        function=SimpleNamespace(
                            name=tc.name,
                            arguments=tc.arguments
                        )
                    ))
                yield StreamingResponse(tool_calls=ollama_format_calls)
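
The accumulation step in the listing above can be tried in isolation. This is a simplified sketch rather than the gateway's code: `delta` and `accumulate` are hypothetical helpers, the fragments are made-up data shaped like the deltas the loop reads (`index`, `id`, `function.name`, `function.arguments`), and the parsed arguments are left as plain JSON values instead of being converted to strings.

```python
import json
from types import SimpleNamespace
from typing import Dict, List


def delta(index, id=None, name=None, arguments=None):
    """Build a hypothetical tool-call delta shaped like the ones read above."""
    return SimpleNamespace(
        index=index,
        id=id,
        function=SimpleNamespace(name=name, arguments=arguments),
    )


def accumulate(deltas) -> List[dict]:
    """Merge argument fragments per index, then parse each call's JSON once."""
    acc: Dict[int, dict] = {}
    for d in deltas:
        slot = acc.setdefault(d.index, {"id": None, "name": None, "arguments": ""})
        if d.id:
            slot["id"] = d.id
        if d.function.name:
            slot["name"] = d.function.name
        if d.function.arguments:
            slot["arguments"] += d.function.arguments

    calls = []
    for index in sorted(acc):
        slot = acc[index]
        # Parsing only works once every fragment for this index has arrived
        parsed = json.loads(slot["arguments"])
        calls.append({"id": slot["id"], "name": slot["name"], "arguments": parsed})
    return calls


# Two calls whose fragments arrive interleaved (made-up data)
fragments = [
    delta(0, id="call_a", name="add", arguments='{"x": 1'),
    delta(1, id="call_b", name="echo", arguments='{"text": '),
    delta(0, arguments=', "y": 2}'),
    delta(1, arguments='"hi"}'),
]
result = accumulate(fragments)
print(result)
```

Keying the accumulator by `index` is what keeps interleaved fragments from different calls apart; no single fragment is valid JSON on its own, which is why parsing waits until the end.
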

get_available_models()

Get the list of available OpenAI models, sorted alphabetically.

Returns:

list[str]
    The list of available models, sorted alphabetically.

Source code in src/mojentic/llm/gateways/openai.py
def get_available_models(self) -> list[str]:
    """
    Get the list of available OpenAI models, sorted alphabetically.

    Returns
    -------
    list[str]
        The list of available models, sorted alphabetically.
    """
    return sorted([m.id for m in self.client.models.list()])

mojentic.llm.gateways.models.LLMMessage

Bases: BaseModel

A message to be sent to the LLM. These would accumulate during a chat session with an LLM.

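Parameters:

role : MessageRole, default <MessageRole.User: 'user'>
    The role of the message in the conversation.
content : str | None, default None
    The content of the message.
object : BaseModel | None, default None
    The object representation of the message.
tool_calls : List[LLMToolCall] | None, default None
    A list of tool calls to be made available to the LLM.
image_paths : List[str] | None, default None
    A list of file paths to images to be included with the message. You must use an image-capable model to process images.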
Source code in src/mojentic/llm/gateways/models.py
class LLMMessage(BaseModel):
    """
    A message to be sent to the LLM. These would accumulate during a chat session with an LLM.

    Attributes
    ----------
    role : MessageRole
        The role of the message in the conversation.
    content : Optional[str]
        The content of the message.
    object : Optional[BaseModel]
        The object representation of the message.
    tool_calls : Optional[List[LLMToolCall]]
        A list of tool calls to be made available to the LLM.
    image_paths : Optional[List[str]]
        A list of file paths to images to be included with the message.
        Note: You must use an image-capable model to process images.
    """
    role: MessageRole = MessageRole.User
    content: Optional[str] = None
    object: Optional[BaseModel] = None
    tool_calls: Optional[List[LLMToolCall]] = None
    image_paths: Optional[List[str]] = None

mojentic.llm.gateways.models.LLMToolCall

Bases: BaseModel

A tool call to be made available to the LLM.

Parameters:

id : str | None, default None
    The identifier of the tool call.
name : str, required
    The name of the tool call.
arguments : dict[str, str], required
    The arguments for the tool call.
Source code in src/mojentic/llm/gateways/models.py
class LLMToolCall(BaseModel):
    """
    A tool call to be made available to the LLM.

    Attributes
    ----------
    id : Optional[str]
        The identifier of the tool call.
    name : str
        The name of the tool call.
    arguments : dict[str, str]
        The arguments for the tool call.
    """
    id: Optional[str] = None
    name: str
    arguments: dict[str, str]
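
Because `arguments` is typed `dict[str, str]`, non-string JSON values have to be turned into strings before they fit. The sketch below applies the same `str(k): str(v)` comprehension that the OpenAI streaming listing on this page uses, on a made-up JSON payload, to show what a tool implementation might receive. Converting the values back (`int`, comparison with `"True"`, `ast.literal_eval`) is one possible approach on the tool side, not something the library is known to do.

```python
import ast
import json

# Made-up JSON arguments, as a model might emit them
raw = '{"city": "Paris", "days": 3, "metric": true, "tags": ["a", "b"]}'
parsed = json.loads(raw)

# Same coercion as in the streaming listing: every key and value becomes a str
arguments = {str(k): str(v) for k, v in parsed.items()}
print(arguments)

# Values are Python string forms, not JSON text, so convert accordingly
days = int(arguments["days"])
metric = arguments["metric"] == "True"  # str(True) is "True", not "true"
tags = ast.literal_eval(arguments["tags"])  # str(list) is a Python literal
print(days, metric, tags)
```

Note that booleans and lists come through as their Python string forms, so a tool that expects JSON text such as `true` would need to account for the difference.
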

mojentic.llm.gateways.models.LLMGatewayResponse

Bases: BaseModel

The response from the LLM gateway, abstracting you from the quirks of a specific LLM.

Parameters:

content : str | dict[str, str] | None, default None
    The content of the response.
object : BaseModel | None, default None
    Parsed response object.
tool_calls : List[LLMToolCall], default empty list (default_factory=list)
    List of requested tool calls from the LLM.
thinking : str | None, default None
    Model thinking/reasoning trace (populated by some providers).
Source code in src/mojentic/llm/gateways/models.py
class LLMGatewayResponse(BaseModel):
    """
    The response from the LLM gateway, abstracting you from the quirks of a specific LLM.

    Attributes
    ----------
    content : Optional[Union[str, dict[str, str]]]
        The content of the response.
    object : Optional[BaseModel]
        Parsed response object.
    tool_calls : List[LLMToolCall]
        List of requested tool calls from the LLM.
    thinking : Optional[str]
        Model thinking/reasoning trace (populated by some providers).
    """
    content: Optional[Union[str, dict[str, str]]] = Field(None, description="The content of the response.")
    object: Optional[BaseModel] = Field(None, description="Parsed response object")
    tool_calls: List[LLMToolCall] = Field(default_factory=list,
                                          description="List of requested tool calls from the LLM.")
    thinking: Optional[str] = Field(None, description="Model thinking/reasoning trace (populated by some providers)")
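
Code that receives a gateway response usually has to decide which of these fields to act on. The sketch below shows one way to do that, using plain dataclasses as stand-ins for `LLMGatewayResponse` and `LLMToolCall` so it runs without the library. The `describe` function and the priority order (tool calls, then parsed object, then text) are assumptions made for illustration, not documented behaviour.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class ToolCall:
    """Stand-in for LLMToolCall."""
    name: str
    arguments: Dict[str, str]
    id: Optional[str] = None


@dataclass
class GatewayResponse:
    """Stand-in for LLMGatewayResponse with the same four fields."""
    content: Optional[str] = None
    object: Optional[Any] = None
    tool_calls: List[ToolCall] = field(default_factory=list)
    thinking: Optional[str] = None


def describe(response: GatewayResponse) -> str:
    """Pick the field to act on; the priority order here is an assumption."""
    if response.tool_calls:
        names = ", ".join(call.name for call in response.tool_calls)
        return f"tool calls requested: {names}"
    if response.object is not None:
        return f"structured object: {response.object!r}"
    return f"text: {response.content or ''}"


text_reply = GatewayResponse(content="Hello!")
tool_reply = GatewayResponse(tool_calls=[ToolCall(name="add", arguments={"x": "1"})])
print(describe(text_reply))
print(describe(tool_reply))
```

Since `tool_calls` defaults to an empty list rather than `None`, a plain truthiness check is enough to tell whether the model asked for any tools.
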