-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmodal_extract_pg.py
More file actions
304 lines (245 loc) · 10.9 KB
/
modal_extract_pg.py
File metadata and controls
304 lines (245 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
"""
Modal app for parallel insight extraction from Paul Graham essays.

Runs Qwen2.5-7B on multiple GPU instances for fast parallel processing.

Usage:
    modal run insights_first/modal_extract_pg.py --db pg_essays.db
"""
import modal

# Define the Modal app; the name identifies this deployment in the Modal dashboard.
app = modal.App("pg-insight-extractor")

# Model to use - Qwen2.5-7B-Instruct is a good balance of quality/speed.
MODEL_ID = "Qwen/Qwen2.5-7B-Instruct"

# Persistent volume so model weights are downloaded once and reused across runs.
model_volume = modal.Volume.from_name("qwen-model-cache", create_if_missing=True)

# Container image with vLLM and its dependencies for GPU inference.
vllm_image = (
    modal.Image.debian_slim(python_version="3.11")
    .pip_install(
        "vllm>=0.6.0",
        "torch",
        "transformers",
        "huggingface_hub",
    )
)
# Extraction prompt adapted for essays - more focused on ideas and philosophy
# than business tactics. Placeholders filled at call time: {chunk_text} is the
# excerpt under analysis; {context} is "Paul Graham - <essay title>".
# NOTE: the "## Excerpt:"/"## Essay:" headers double as vLLM stop strings
# (see SamplingParams in InsightExtractor), so keep them in sync.
EXTRACTION_PROMPT = """You are an insight extractor analyzing Paul Graham's essays. Your job is to identify genuinely novel, non-obvious insights — the kind that make a reader stop and think "I didn't know that" or "I never thought about it that way."
## What IS an insight (high bar):
An insight must be SPECIFIC + NON-OBVIOUS + THOUGHT-PROVOKING.
EXAMPLES OF GENUINE INSIGHTS:
- "The best ideas have an element of surprise because they challenge our model of the world — which means they were initially filtered out by our common sense" — Counterintuitive mechanism about innovation
- "Startups die from indigestion, not starvation — too many initiatives kill more companies than too few" — Memorable framing of non-obvious truth
- "Writing doesn't just communicate ideas, it generates them — the act of writing forces you to figure out what you actually think" — Deep insight about the writing process
## What is NOT an insight (even if it sounds good):
NOT INSIGHTS — too generic/obvious:
- "Work hard" — Everyone knows this
- "Focus on what matters" — Vague truism
- "Follow your passion" — Standard advice
- "Read more books" — No mechanism or specificity
NOT INSIGHTS — just descriptions:
- "Paul Graham started Y Combinator" — Biographical fact, not insight
- "Startups need funding" — Obvious statement
## Calibration Guide:
Ask yourself: "If I told this to a smart, intellectually curious person, would they:
- Already know this? → NO_INSIGHT
- Find it mildly interesting but obvious on reflection? → NO_INSIGHT
- Genuinely learn something or change how they think? → INSIGHT
## Instructions:
If the excerpt contains a genuine insight, respond with:
INSIGHT: [The core claim in 1-2 specific sentences. Must stand alone without context.]
NOVELTY: [1-10] How surprising is this to a well-read, intellectually curious person? (7+ = genuinely surprising)
SPECIFICITY: [1-10] How concrete and applicable is this? (7+ = clear mechanism or principle)
EVIDENCE_TYPE: [anecdote | data | framework | observation | contrarian_claim]
CONTEXT: [Brief note on why Paul Graham's perspective matters here - his experience founding startups, writing software, observing patterns at YC, etc.]
If NO genuine insight exists, respond ONLY with:
NO_INSIGHT
## Excerpt:
{chunk_text}
## Essay:
{context}"""
@app.cls(
    gpu="A10G",  # Good balance of cost/performance
    image=vllm_image,
    volumes={"/model-cache": model_volume},
    timeout=600,
    scaledown_window=300,  # Keep warm for 5 min
)
class InsightExtractor:
    """Modal GPU container that batch-extracts insights from essay chunks via vLLM."""

    @modal.enter()
    def load_model(self):
        """Load the model when container starts (runs once per container)."""
        from vllm import LLM, SamplingParams

        self.llm = LLM(
            model=MODEL_ID,
            download_dir="/model-cache",
            trust_remote_code=True,
            max_model_len=4096,
            gpu_memory_utilization=0.9,
        )
        # Stop strings mirror the prompt's section headers so generation
        # halts if the model starts echoing the template.
        self.sampling_params = SamplingParams(
            temperature=0.3,
            max_tokens=400,
            stop=["## Excerpt:", "## Essay:"],
        )

    @modal.method()
    def extract_batch(self, chunks: list[dict]) -> list[dict]:
        """
        Extract insights from a batch of chunks.

        Args:
            chunks: List of dicts with keys: chunk_id, document_id, content, title

        Returns:
            List of result dicts (one per chunk, order matching the input).
        """
        # One prompt per chunk, filled from the shared extraction template.
        prompts = [
            EXTRACTION_PROMPT.format(
                chunk_text=item["content"],
                context=f"Paul Graham - {item['title']}",
            )
            for item in chunks
        ]
        # Single batched vLLM call amortizes GPU overhead across the batch.
        outputs = self.llm.generate(prompts, self.sampling_params)
        # vLLM preserves input order, so chunks and outputs pair up directly.
        return [
            self._parse_response(out.outputs[0].text.strip(), item)
            for item, out in zip(chunks, outputs)
        ]

    def _parse_response(self, response_text: str, chunk: dict) -> dict:
        """Parse an LLM response into a structured result dict."""
        import re

        result = {
            "chunk_id": chunk["chunk_id"],
            "document_id": chunk["document_id"],
            "essay_title": chunk["title"],
            "has_insight": False,
            "insight": None,
            "raw_response": response_text,
            "error": None,
        }
        # A NO_INSIGHT marker anywhere in the response means "nothing found".
        if "NO_INSIGHT" in response_text.upper():
            return result
        try:
            # Defaults used when the model omits a field.
            fields = {
                "insight": "",
                "novelty": 5,
                "specificity": 5,
                "evidence_type": "observation",
                "context": "",
            }
            for raw_line in response_text.split("\n"):
                raw_line = raw_line.strip()
                tag = raw_line.upper()
                if tag.startswith("INSIGHT:"):
                    fields["insight"] = raw_line[8:].strip()
                elif tag.startswith("NOVELTY:"):
                    found = re.search(r'(\d+)', raw_line)
                    if found:
                        fields["novelty"] = int(found.group(1))
                elif tag.startswith("SPECIFICITY:"):
                    found = re.search(r'(\d+)', raw_line)
                    if found:
                        fields["specificity"] = int(found.group(1))
                elif tag.startswith("EVIDENCE_TYPE:"):
                    fields["evidence_type"] = raw_line[14:].strip().lower()
                elif tag.startswith("CONTEXT:"):
                    fields["context"] = raw_line[8:].strip()

            claim = fields["insight"]
            # Reject verbatim template copies and trivially short extractions.
            looks_like_template = (
                "[The core claim" in claim
                or "[1-2 specific sentences" in claim
            )
            if claim and not looks_like_template and len(claim) >= 20:
                result["has_insight"] = True
                result["insight"] = {
                    "chunk_id": chunk["chunk_id"],
                    "document_id": chunk["document_id"],
                    "essay_title": chunk["title"],
                    "essay_url": chunk.get("url", ""),
                    "insight_text": claim,
                    "novelty_score": fields["novelty"],
                    "specificity_score": fields["specificity"],
                    "evidence_type": fields["evidence_type"],
                    "context": fields["context"],
                    "raw_chunk": chunk["content"],
                }
        except Exception as e:
            result["error"] = str(e)
        return result
@app.local_entrypoint()
def main(db: str = "pg_essays.db", batch_size: int = 50, max_chunks: int = 0, concurrency: int = 10):
    """
    Run parallel insight extraction on Paul Graham essays.

    Args:
        db: Path to SQLite database
        batch_size: Chunks per batch (sent to single GPU)
        max_chunks: Max chunks to process (0 = all)
        concurrency: Number of parallel containers (informational only:
            Modal autoscales containers for .map(); this value is printed
            but not enforced here)
    """
    import json
    import os
    import sqlite3
    import time
    from datetime import datetime

    # Load chunks from database
    print(f"Loading chunks from {db}...")
    conn = sqlite3.connect(db)
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        # For essays, we don't need to filter by timestamp or ads
        cursor.execute("""
            SELECT c.id as chunk_id, c.document_id, c.content,
                   d.title, d.url
            FROM chunks c
            JOIN documents d ON c.document_id = d.id
        """)
        chunks = [dict(row) for row in cursor.fetchall()]
    finally:
        # Always release the connection, even if the query fails.
        conn.close()

    if max_chunks > 0:
        chunks = chunks[:max_chunks]

    if not chunks:
        # Guard: an empty DB would otherwise cause ZeroDivisionError in the
        # summary percentages below.
        print("No chunks found - nothing to do.")
        return

    print(f"Loaded {len(chunks)} chunks from {len(set(c['document_id'] for c in chunks))} essays")

    # Split into fixed-size batches; each batch is one remote GPU call.
    batches = [chunks[i:i + batch_size] for i in range(0, len(chunks), batch_size)]
    print(f"Split into {len(batches)} batches of ~{batch_size} chunks")

    # Process in parallel (Modal fans batches out across containers).
    print(f"Processing with {concurrency} parallel containers...")
    extractor = InsightExtractor()
    start_time = time.time()

    # Use Modal's map for parallel execution; order_outputs=False streams
    # results back as soon as each batch finishes.
    all_results = []
    for batch_results in extractor.extract_batch.map(batches, order_outputs=False):
        all_results.extend(batch_results)
        insights_so_far = sum(1 for r in all_results if r["has_insight"])
        print(f" Progress: {len(all_results)}/{len(chunks)} chunks, {insights_so_far} insights found")
    elapsed = time.time() - start_time

    # Summary
    insights_found = sum(1 for r in all_results if r["has_insight"])
    errors = sum(1 for r in all_results if r["error"])
    print(f"\n{'='*50}")
    print("EXTRACTION COMPLETE")
    print(f"{'='*50}")
    print(f"Total chunks: {len(all_results)}")
    print(f"Insights found: {insights_found} ({insights_found/len(all_results)*100:.1f}%)")
    print(f"Errors: {errors}")
    print(f"Time: {elapsed:.1f}s ({elapsed/60:.1f} min)")
    if elapsed > 0:
        # Guard against division by zero on a near-instant (e.g. cached) run.
        print(f"Speed: {len(all_results)/elapsed:.1f} chunks/sec")

    # Save results; create the output directory first so open() cannot fail
    # with FileNotFoundError on a fresh checkout.
    output_file = f"insights_first/data/pg_extraction_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump({
            "metadata": {
                "timestamp": datetime.now().isoformat(),
                "model": MODEL_ID,
                "source": "Paul Graham Essays",
                "total_chunks": len(all_results),
                "elapsed_seconds": elapsed,
            },
            "results": all_results,
        }, f, indent=2, ensure_ascii=False)
    print(f"\nResults saved to: {output_file}")

    # Show a handful of sample insights for quick manual sanity-checking.
    sample_insights = [r for r in all_results if r["has_insight"]][:5]
    if sample_insights:
        print(f"\n{'='*50}")
        print("SAMPLE INSIGHTS")
        print(f"{'='*50}")
        for i, r in enumerate(sample_insights, 1):
            ins = r["insight"]
            print(f"\n{i}. [{ins['essay_title']}] (N={ins['novelty_score']}, S={ins['specificity_score']})")
            print(f" {ins['insight_text'][:120]}...")