forked from ultraworkers/claw-code
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathquery_engine.py
More file actions
193 lines (173 loc) · 7.58 KB
/
query_engine.py
File metadata and controls
193 lines (173 loc) · 7.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
from __future__ import annotations

import json
from collections.abc import Iterator
from dataclasses import dataclass, field
from uuid import uuid4

from .commands import build_command_backlog
from .models import PermissionDenial, UsageSummary
from .port_manifest import PortManifest, build_port_manifest
from .session_store import StoredSession, load_session, save_session
from .tools import build_tool_backlog
from .transcript import TranscriptStore
@dataclass(frozen=True)
class QueryEngineConfig:
    """Immutable limits and output options for a query engine session.

    Instances are frozen: build a new config instead of mutating one.
    """

    # Once this many messages are stored, further prompts are refused.
    max_turns: int = 8
    # Combined input + output token count above which a turn is flagged
    # with the 'max_budget_reached' stop reason.
    max_budget_tokens: int = 2000
    # Number of most recent messages kept when the history is compacted.
    compact_after_turns: int = 12
    # When True, turn output is rendered as JSON instead of plain lines.
    structured_output: bool = False
    # Upper bound on JSON rendering attempts for structured output.
    structured_retry_limit: int = 2
@dataclass(frozen=True)
class TurnResult:
    """Immutable record describing the outcome of one submitted prompt."""

    # The prompt text exactly as it was submitted.
    prompt: str
    # Rendered turn output (plain summary lines or a JSON document).
    output: str
    # Names of the commands matched for this prompt.
    matched_commands: tuple[str, ...]
    # Names of the tools matched for this prompt.
    matched_tools: tuple[str, ...]
    # Permission denials reported for this turn.
    permission_denials: tuple[PermissionDenial, ...]
    # Usage totals of the session at the time the result was produced.
    usage: UsageSummary
    # Why the turn ended, e.g. 'completed', 'max_turns_reached' or
    # 'max_budget_reached'.
    stop_reason: str
@dataclass
class QueryEnginePort:
    """Stateful query engine facade for a single session.

    Tracks the submitted prompts, the permission denials reported with
    them, the accumulated token usage and a transcript store, and can
    persist or restore that state through the session store helpers.
    """

    manifest: PortManifest
    config: QueryEngineConfig = field(default_factory=QueryEngineConfig)
    session_id: str = field(default_factory=lambda: uuid4().hex)
    mutable_messages: list[str] = field(default_factory=list)
    permission_denials: list[PermissionDenial] = field(default_factory=list)
    total_usage: UsageSummary = field(default_factory=UsageSummary)
    transcript_store: TranscriptStore = field(default_factory=TranscriptStore)

    @classmethod
    def from_workspace(cls) -> 'QueryEnginePort':
        """Create a fresh engine whose manifest comes from `build_port_manifest()`."""
        return cls(manifest=build_port_manifest())

    @classmethod
    def from_saved_session(cls, session_id: str) -> 'QueryEnginePort':
        """Rebuild an engine from the session stored under `session_id`.

        The stored messages seed both the message list and a transcript
        store that is created already marked as flushed; the stored token
        counts seed the usage totals.
        """
        stored = load_session(session_id)
        transcript = TranscriptStore(entries=list(stored.messages), flushed=True)
        return cls(
            manifest=build_port_manifest(),
            session_id=stored.session_id,
            mutable_messages=list(stored.messages),
            total_usage=UsageSummary(stored.input_tokens, stored.output_tokens),
            transcript_store=transcript,
        )

    def submit_message(
        self,
        prompt: str,
        matched_commands: tuple[str, ...] = (),
        matched_tools: tuple[str, ...] = (),
        denied_tools: tuple[PermissionDenial, ...] = (),
    ) -> TurnResult:
        """Process one prompt and record it in the session state.

        Args:
            prompt: The user prompt for this turn.
            matched_commands: Names of commands matched for the prompt.
            matched_tools: Names of tools matched for the prompt.
            denied_tools: Permission denials reported for the prompt.

        Returns:
            A `TurnResult` whose `stop_reason` is 'max_turns_reached' when
            the stored message count already hit `config.max_turns` (the
            prompt is then NOT recorded), 'max_budget_reached' when the
            projected token total exceeds `config.max_budget_tokens` (the
            prompt is still recorded), and 'completed' otherwise.
        """
        if len(self.mutable_messages) >= self.config.max_turns:
            # Refuse the turn without touching any session state.
            output = f'Max turns reached before processing prompt: {prompt}'
            return TurnResult(
                prompt=prompt,
                output=output,
                matched_commands=matched_commands,
                matched_tools=matched_tools,
                permission_denials=denied_tools,
                usage=self.total_usage,
                stop_reason='max_turns_reached',
            )

        summary_lines = [
            f'Prompt: {prompt}',
            f'Matched commands: {", ".join(matched_commands) if matched_commands else "none"}',
            f'Matched tools: {", ".join(matched_tools) if matched_tools else "none"}',
            f'Permission denials: {len(denied_tools)}',
        ]
        output = self._format_output(summary_lines)

        # The usage is only committed to the session further below.
        projected_usage = self.total_usage.add_turn(prompt, output)
        stop_reason = 'completed'
        if projected_usage.input_tokens + projected_usage.output_tokens > self.config.max_budget_tokens:
            stop_reason = 'max_budget_reached'

        self.mutable_messages.append(prompt)
        self.transcript_store.append(prompt)
        self.permission_denials.extend(denied_tools)
        self.total_usage = projected_usage
        self.compact_messages_if_needed()
        return TurnResult(
            prompt=prompt,
            output=output,
            matched_commands=matched_commands,
            matched_tools=matched_tools,
            permission_denials=denied_tools,
            usage=self.total_usage,
            stop_reason=stop_reason,
        )

    def stream_submit_message(
        self,
        prompt: str,
        matched_commands: tuple[str, ...] = (),
        matched_tools: tuple[str, ...] = (),
        denied_tools: tuple[PermissionDenial, ...] = (),
    ) -> Iterator[dict[str, object]]:
        """Submit a prompt and yield the turn as a sequence of event dicts.

        Events are yielded in this order: 'message_start', then optional
        'command_match', 'tool_match' and 'permission_denial' events (each
        only when the matching argument is non-empty), then
        'message_delta' carrying the turn output and finally
        'message_stop' with usage, stop reason and transcript size.

        The prompt is submitted lazily, i.e. only once the consumer has
        advanced the generator past the preliminary events.
        """
        yield {'type': 'message_start', 'session_id': self.session_id, 'prompt': prompt}
        if matched_commands:
            yield {'type': 'command_match', 'commands': matched_commands}
        if matched_tools:
            yield {'type': 'tool_match', 'tools': matched_tools}
        if denied_tools:
            yield {'type': 'permission_denial', 'denials': [denial.tool_name for denial in denied_tools]}
        result = self.submit_message(prompt, matched_commands, matched_tools, denied_tools)
        yield {'type': 'message_delta', 'text': result.output}
        yield {
            'type': 'message_stop',
            'usage': {'input_tokens': result.usage.input_tokens, 'output_tokens': result.usage.output_tokens},
            'stop_reason': result.stop_reason,
            'transcript_size': len(self.transcript_store.entries),
        }

    def compact_messages_if_needed(self) -> None:
        """Trim the message list to the newest `config.compact_after_turns` items.

        The list is trimmed in place. A limit of zero (or a negative
        limit, which is treated as zero) drops every stored message.
        """
        keep = max(self.config.compact_after_turns, 0)
        # Delete from the front rather than slicing with `[-keep:]`:
        # `messages[-0:]` is the whole list, so a limit of 0 would
        # otherwise keep everything instead of nothing.
        overflow = len(self.mutable_messages) - keep
        if overflow > 0:
            del self.mutable_messages[:overflow]
        # NOTE(review): TranscriptStore.compact is defined elsewhere;
        # confirm how it treats a limit of 0.
        self.transcript_store.compact(keep)

    def replay_user_messages(self) -> tuple[str, ...]:
        """Return whatever the transcript store's `replay()` yields."""
        return self.transcript_store.replay()

    def flush_transcript(self) -> None:
        """Flush the transcript store."""
        self.transcript_store.flush()

    def persist_session(self) -> str:
        """Flush the transcript, save the session and return its path as a string."""
        self.flush_transcript()
        path = save_session(
            StoredSession(
                session_id=self.session_id,
                messages=tuple(self.mutable_messages),
                input_tokens=self.total_usage.input_tokens,
                output_tokens=self.total_usage.output_tokens,
            )
        )
        return str(path)

    def _format_output(self, summary_lines: list[str]) -> str:
        """Render summary lines as JSON or as newline-joined plain text.

        JSON is used when `config.structured_output` is enabled.
        """
        if self.config.structured_output:
            payload = {
                'summary': summary_lines,
                'session_id': self.session_id,
            }
            return self._render_structured_output(payload)
        return '\n'.join(summary_lines)

    def _render_structured_output(self, payload: dict[str, object]) -> str:
        """Serialize `payload` to indented JSON, retrying with a fallback.

        After a failed attempt the payload is replaced by a minimal
        fallback document for the next attempt.

        Raises:
            RuntimeError: If every attempt fails; chained to the last
                serialization error.
        """
        last_error: Exception | None = None
        # Always try at least once: a retry limit of 0 (or below) must not
        # turn every structured render into an unconditional failure.
        attempts = max(1, self.config.structured_retry_limit)
        for _ in range(attempts):
            try:
                return json.dumps(payload, indent=2)
            except (TypeError, ValueError) as exc:  # pragma: no cover - defensive branch
                last_error = exc
                payload = {'summary': ['structured output retry'], 'session_id': self.session_id}
        raise RuntimeError('structured output rendering failed') from last_error

    def render_summary(self) -> str:
        """Build a Markdown-style summary of the workspace and this session.

        Lists the manifest, at most ten summary lines each from the
        command and tool backlogs, and the session counters and limits.
        """
        command_backlog = build_command_backlog()
        tool_backlog = build_tool_backlog()
        sections = [
            '# Python Porting Workspace Summary',
            '',
            self.manifest.to_markdown(),
            '',
            f'Command surface: {len(command_backlog.modules)} mirrored entries',
            *command_backlog.summary_lines()[:10],
            '',
            f'Tool surface: {len(tool_backlog.modules)} mirrored entries',
            *tool_backlog.summary_lines()[:10],
            '',
            f'Session id: {self.session_id}',
            f'Conversation turns stored: {len(self.mutable_messages)}',
            f'Permission denials tracked: {len(self.permission_denials)}',
            f'Usage totals: in={self.total_usage.input_tokens} out={self.total_usage.output_tokens}',
            f'Max turns: {self.config.max_turns}',
            f'Max budget tokens: {self.config.max_budget_tokens}',
            f'Transcript flushed: {self.transcript_store.flushed}',
        ]
        return '\n'.join(sections)