-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmodal_extract.py
More file actions
339 lines (275 loc) · 12.1 KB
/
modal_extract.py
File metadata and controls
339 lines (275 loc) · 12.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
"""
Modal app for parallel insight extraction using vLLM.
Runs Qwen2.5-7B on multiple GPU instances for fast parallel processing.
Usage:
modal run insights_first/modal_extract.py --db lennys_full.db
"""
import modal
# Define the Modal app
app = modal.App("insight-extractor")
# Model to use - Qwen2.5-7B-Instruct is a good balance of quality/speed
MODEL_ID = "Qwen/Qwen2.5-7B-Instruct"
# Create a volume to cache the model weights
model_volume = modal.Volume.from_name("qwen-model-cache", create_if_missing=True)
# Define the container image with vLLM
vllm_image = (
modal.Image.debian_slim(python_version="3.11")
.pip_install(
"vllm>=0.6.0",
"torch",
"transformers",
"huggingface_hub",
)
)
# Prompt template sent to the model for every chunk. Placeholders filled in
# extract_batch: {chunk_text} = transcript excerpt, {context} = episode title.
# NOTE: the parser in _parse_response depends on the exact field labels below
# (INSIGHT:, NOVELTY:, SPECIFICITY:, EVIDENCE_TYPE:, SPEAKER_CREDIBILITY:,
# NO_INSIGHT) — keep them in sync if the template changes.
EXTRACTION_PROMPT = """You are an insight extractor analyzing podcast transcripts. Your job is to identify genuinely novel, non-obvious insights — the kind that make a reader stop and think "I didn't know that" or "I never thought about it that way."
## What IS an insight (high bar):
An insight must be SPECIFIC + NON-OBVIOUS + ACTIONABLE/PERSPECTIVE-SHIFTING.
EXAMPLES OF GENUINE INSIGHTS:
- "Companies should fire their best customers first when pivoting, because they'll hold you back with feature requests for the old product" — Specific, counterintuitive, actionable
- "The best PMs spend 80% of their time on problems they're NOT going to solve, because rigorous prioritization means saying no" — Counterintuitive mechanism
- "Startups die from indigestion, not starvation — too many initiatives kill more companies than too few" — Memorable framing of non-obvious truth
## What is NOT an insight (even if it sounds good):
NOT INSIGHTS — too generic/obvious:
- "Hire great people" — Everyone knows this
- "Focus on what matters" — Vague truism
- "Listen to your customers" — Standard advice
- "Culture is important" — No mechanism or specificity
NOT INSIGHTS — just descriptions or process:
- "We did X, then Y, then Z" — Story without extractable lesson
- "At Company X, we had this process..." — Process description, not insight
## Calibration Guide:
Ask yourself: "If I told this to a smart, well-read business person, would they:
- Already know this? → NO_INSIGHT
- Find it mildly interesting but obvious on reflection? → NO_INSIGHT
- Genuinely learn something or change how they think? → INSIGHT
## Instructions:
If the excerpt contains a genuine insight, respond with:
INSIGHT: [The core claim in 1-2 specific sentences. Must stand alone without context.]
NOVELTY: [1-10] How surprising is this to a well-read business person? (7+ = genuinely surprising)
SPECIFICITY: [1-10] How concrete and actionable is this? (7+ = clear mechanism or action)
EVIDENCE_TYPE: [anecdote | data | framework | observation | contrarian_claim]
SPEAKER_CREDIBILITY: [Why this person's view matters on this topic]
If NO genuine insight exists, respond ONLY with:
NO_INSIGHT
## Excerpt:
{chunk_text}
## Context:
Speaker/Episode: {context}"""
@app.cls(
    gpu="A10G",  # Good balance of cost/performance
    image=vllm_image,
    volumes={"/model-cache": model_volume},
    timeout=600,
    scaledown_window=300,  # Keep warm for 5 min
)
class InsightExtractor:
    """GPU worker: loads Qwen once per container, then batch-scores chunks."""

    @modal.enter()
    def load_model(self):
        """Load the model when the container starts (weights cached on the volume)."""
        from vllm import LLM, SamplingParams

        self.llm = LLM(
            model=MODEL_ID,
            download_dir="/model-cache",
            trust_remote_code=True,
            max_model_len=4096,
            gpu_memory_utilization=0.9,
        )
        # Low temperature for consistent structured output; the stop strings
        # prevent the model from generating a fresh "## Excerpt:" section.
        self.sampling_params = SamplingParams(
            temperature=0.3,
            max_tokens=400,
            stop=["## Excerpt:", "## Context:"],
        )

    @modal.method()
    def extract_batch(self, chunks: list[dict]) -> list[dict]:
        """
        Extract insights from a batch of chunks.

        Args:
            chunks: List of dicts with keys: chunk_id, document_id, content,
                title (optionally video_url and timestamp_start).

        Returns:
            List of result dicts, one per input chunk, in input order.
        """
        # One prompt per chunk, in the same order as the input.
        prompts = [
            EXTRACTION_PROMPT.format(
                chunk_text=chunk["content"],
                context=chunk["title"],
            )
            for chunk in chunks
        ]

        # Run batch inference; vLLM returns outputs aligned with prompts.
        outputs = self.llm.generate(prompts, self.sampling_params)

        # Parse each raw completion into a structured result.
        return [
            self._parse_response(output.outputs[0].text.strip(), chunk)
            for chunk, output in zip(chunks, outputs)
        ]

    def _parse_response(self, response_text: str, chunk: dict) -> dict:
        """Parse an LLM response into a structured result dict.

        Parsing failures are recorded in the "error" field rather than
        raised, so one malformed response cannot kill a whole batch.
        """
        import re

        result = {
            "chunk_id": chunk["chunk_id"],
            "document_id": chunk["document_id"],
            "episode_title": chunk["title"],
            "has_insight": False,
            "insight": None,
            "raw_response": response_text,
            "error": None,
        }

        # Check for NO_INSIGHT anywhere in the response — models sometimes
        # wrap it in extra prose instead of replying with it alone.
        if "NO_INSIGHT" in response_text.upper():
            return result

        try:
            insight_text = ""
            novelty = 5  # mid-scale defaults in case a field is omitted
            specificity = 5
            evidence_type = "observation"
            speaker_credibility = ""
            for line in response_text.split("\n"):
                line = line.strip()
                # Prefix matching is case-insensitive; the slices use the
                # known prefix lengths ("INSIGHT:"==8, "EVIDENCE_TYPE:"==14,
                # "SPEAKER_CREDIBILITY:"==20) so the original casing survives.
                if line.upper().startswith("INSIGHT:"):
                    insight_text = line[8:].strip()
                elif line.upper().startswith("NOVELTY:"):
                    match = re.search(r'(\d+)', line)
                    if match:
                        novelty = int(match.group(1))
                elif line.upper().startswith("SPECIFICITY:"):
                    match = re.search(r'(\d+)', line)
                    if match:
                        specificity = int(match.group(1))
                elif line.upper().startswith("EVIDENCE_TYPE:"):
                    evidence_type = line[14:].strip().lower()
                elif line.upper().startswith("SPEAKER_CREDIBILITY:"):
                    speaker_credibility = line[20:].strip()

            # Filter out template copies and low-quality extractions.
            is_template = "[The core claim" in insight_text or "[1-2 specific sentences" in insight_text
            is_too_short = len(insight_text) < 20
            if insight_text and not is_template and not is_too_short:
                # Build timestamp URL if video_url and timestamp are available.
                video_url = chunk.get("video_url", "")
                timestamp_start = chunk.get("timestamp_start", "")
                timestamp_url = self._build_timestamp_url(video_url, timestamp_start)

                result["has_insight"] = True
                result["insight"] = {
                    "chunk_id": chunk["chunk_id"],
                    "document_id": chunk["document_id"],
                    "episode_title": chunk["title"],
                    "insight_text": insight_text,
                    "novelty_score": novelty,
                    "specificity_score": specificity,
                    "evidence_type": evidence_type,
                    "speaker_credibility": speaker_credibility,
                    "raw_chunk": chunk["content"],
                    "video_url": video_url,
                    "timestamp_start": timestamp_start,
                    "timestamp_url": timestamp_url,
                }
        except Exception as e:
            result["error"] = str(e)

        return result

    def _build_timestamp_url(self, video_url: str, timestamp: str) -> str:
        """Build a YouTube URL with a t=<seconds> parameter.

        Accepts "HH:MM:SS" or "MM:SS", with optional fractional seconds
        (e.g. "00:01:30.500"). Returns the bare URL unchanged when either
        input is missing, the timestamp is malformed, or it resolves to 0.
        """
        if not video_url or not timestamp:
            return video_url or ""
        parts = timestamp.split(':')
        # Parse as floats so fractional-second timestamps don't raise;
        # the original int() parsing crashed on e.g. "00:01:30.500".
        try:
            if len(parts) == 3:
                hours, minutes, seconds = (float(p) for p in parts)
                total_seconds = int(hours * 3600 + minutes * 60 + seconds)
            elif len(parts) == 2:
                minutes, seconds = (float(p) for p in parts)
                total_seconds = int(minutes * 60 + seconds)
            else:
                return video_url
        except ValueError:
            # Malformed component — fall back to the plain URL instead of
            # bubbling an exception up through the batch pipeline.
            return video_url
        if total_seconds == 0:
            return video_url
        # Append with '&' if the URL already has a query string, else '?'.
        separator = '&' if '?' in video_url else '?'
        return f"{video_url}{separator}t={total_seconds}"
@app.local_entrypoint()
def main(db: str = "lennys_full.db", batch_size: int = 50, max_chunks: int = 0, concurrency: int = 10):
    """
    Run parallel insight extraction.

    Args:
        db: Path to SQLite database
        batch_size: Chunks per batch (sent to single GPU)
        max_chunks: Max chunks to process (0 = all)
        concurrency: Number of parallel containers. NOTE(review): this value
            is only printed — actual fan-out of .map() is governed by Modal's
            autoscaler / the class config, not this argument.
    """
    import sqlite3
    import json
    import time
    from datetime import datetime
    from pathlib import Path

    # Load chunks from database, skipping the first 90 seconds of each
    # episode (intros) and chunks that look like sponsor reads.
    print(f"Loading chunks from {db}...")
    conn = sqlite3.connect(db)
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    cursor.execute("""
        SELECT c.id as chunk_id, c.document_id, c.content, c.timestamp_start,
               d.title, d.video_url
        FROM chunks c
        JOIN documents d ON c.document_id = d.id
        WHERE c.timestamp_start > '00:01:30'
          AND c.content NOT LIKE '%brought to you by%'
          AND c.content NOT LIKE '%sponsored by%'
          AND c.content NOT LIKE '%eppo.com%'
          AND c.content NOT LIKE '%vanta.com%'
          AND c.content NOT LIKE '%workos%'
    """)
    chunks = [dict(row) for row in cursor.fetchall()]
    conn.close()

    if max_chunks > 0:
        chunks = chunks[:max_chunks]
    print(f"Loaded {len(chunks)} chunks")

    if not chunks:
        # Nothing matched the filters — bail out early instead of dividing
        # by zero in the summary below.
        print("No chunks to process; exiting.")
        return

    # Split into batches; each batch is one remote call to a GPU container.
    batches = [chunks[i:i + batch_size] for i in range(0, len(chunks), batch_size)]
    print(f"Split into {len(batches)} batches of ~{batch_size} chunks")

    # Process in parallel via Modal's .map() fan-out.
    print(f"Processing with {concurrency} parallel containers...")
    extractor = InsightExtractor()

    start_time = time.time()
    all_results = []
    # order_outputs=False streams results as they finish, for live progress.
    for batch_results in extractor.extract_batch.map(batches, order_outputs=False):
        all_results.extend(batch_results)
        insights_so_far = sum(1 for r in all_results if r["has_insight"])
        print(f"  Progress: {len(all_results)}/{len(chunks)} chunks, {insights_so_far} insights found")
    elapsed = time.time() - start_time

    # Summary
    insights_found = sum(1 for r in all_results if r["has_insight"])
    errors = sum(1 for r in all_results if r["error"])
    print(f"\n{'='*50}")
    print("EXTRACTION COMPLETE")
    print(f"{'='*50}")
    print(f"Total chunks: {len(all_results)}")
    # Guard divisions: all_results/elapsed could be 0 if every batch failed
    # or the run finished faster than the clock resolution.
    if all_results:
        print(f"Insights found: {insights_found} ({insights_found/len(all_results)*100:.1f}%)")
    else:
        print(f"Insights found: {insights_found}")
    print(f"Errors: {errors}")
    print(f"Time: {elapsed:.1f}s ({elapsed/60:.1f} min)")
    if elapsed > 0:
        print(f"Speed: {len(all_results)/elapsed:.1f} chunks/sec")

    # Save results; create the output directory if missing (previously this
    # crashed with FileNotFoundError on a fresh checkout).
    output_dir = Path("insights_first/data")
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / f"modal_extraction_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump({
            "metadata": {
                "timestamp": datetime.now().isoformat(),
                "model": MODEL_ID,
                "total_chunks": len(all_results),
                "elapsed_seconds": elapsed,
            },
            "results": all_results,
        }, f, indent=2, ensure_ascii=False)
    print(f"\nResults saved to: {output_file}")

    # Show sample insights
    sample_insights = [r for r in all_results if r["has_insight"]][:5]
    if sample_insights:
        print(f"\n{'='*50}")
        print("SAMPLE INSIGHTS")
        print(f"{'='*50}")
        for i, r in enumerate(sample_insights, 1):
            ins = r["insight"]
            print(f"\n{i}. [N={ins['novelty_score']}, S={ins['specificity_score']}]")
            print(f"   {ins['insight_text'][:100]}...")