Pipeline a microservizi Go: RSS -> SQLite queue -> LLM analyzer -> dispatcher.
rss-reader: poll feed RSS, deduplica item e pubblica payload raw inanalyzer_queue.job-analyzer: consumaanalyzer_queue, analizza con LLM locale (llama.cpp), salva payload analizzato e accoda sudispatch_queue.scrapling-sidecar: endpoint HTTP (/extract) per source-enrichment avanzato delle pagine remote quandoANALYZER_SOURCE_EXTRACTORusascrapling/hybrid.message-dispatcher: consumadispatch_queue, renderizza template markdown e invia a file o Telegram.web-admin: UI per feed management, monitor code, runtime analyzer, requeue item e ispezione payload analizzati.test-rss: mini-sito di test che espone feed RSS (/feed.rss) e pagine annuncio locali (/jobs/...) generate a intervallo configurabile.otel-collector: riceve metriche/log/tracce via OTLP da tutti i componenti e inoltra verso backend osservabilità.prometheus,loki,tempo: backend locali per metriche, log e tracce.grafana: dashboard e correlazione tra metriche/log/tracce.
- Queue raw:
analyzer_queue(QUEUED -> LEASED -> DONE). - Queue analyzed:
dispatch_queue(QUEUED -> LEASED -> DONE). - Stato business item:
rss_items.status(NEW,ANALYZED,DISPATCHED,FAILED). - Le code sono implementate via SQLite condiviso, senza broker esterno.
Per i diagrammi completi delle state machine/lifecycle:
cp .env.example .env
cp .env.secrets.example .env.secrets
docker compose --profile dev up --buildOutput file sink di default:
/data/outbox/messages.mdnel volumerwct-data.
Profilo produzione (llama.cpp):
docker compose --profile prod up --buildotel-collector viene avviato sempre (anche senza obs) e inoltra i segnali OTLP a Grafana Cloud.
Stack observability locale opzionale (profilo separato):
docker compose --profile obs up -dPer avviare tutto insieme (prod + observability):
docker compose --profile prod --profile obs up --buildEndpoint observability locale (solo profilo obs):
- Grafana:
http://localhost:${GRAFANA_PORT:-3000}(defaultadmin/admin) - Prometheus:
http://localhost:9090 - Loki:
http://localhost:3100 - Tempo:
http://localhost:3200
LLM_MODEL,LLM_ENDPOINT,LLM_TIMEOUT,LLM_MAX_TOKENSLLM_TIMEOUT_MAX: timeout massimo hard per richiesta LLM.LLM_TIMEOUT_PER_1K_CHARS: budget extra per prompt lunghi (ogni ~1000 caratteri).LLM_TIMEOUT_PER_256_TOKENS: budget extra per risposte lunghe (ogni 256max_tokensrichiesti).LLM_THINKING_ENABLED=true|false: inoltraenable_thinkingalla/v1/chat/completionsquando supportato.ANALYZER_LLM_STRICT_JSON=true|false(defaulttrue): forza structured output con JSON Schema (response_format/json_schema) verso endpoint OpenAI-compatible (es.llama.cpp).ANALYZER_PROMPT_TEMPLATE: prompt principale custom (Gotext/template) con variabiliAllowedCategories,Title,URL,Description,SourcePageText,LinksCSV,ImagesCSV.ANALYZER_COMPACT_PROMPT_TEMPLATE: prompt fallback compatto custom con le stesse variabili.ANALYZER_MAX_PARALLEL_JOBS(default1): massimo numero di job in parallelo per singolo processojob-analyzer(attualmente forzato a1in strict sequential mode).ANALYZER_SOURCE_EXTRACTOR=off|basic|scrapling|hybrid(defaultbasic): strategia di source-enrichment della pagina linkata nel job.ANALYZER_SOURCE_MIN_CHARS_FOR_BASIC(default220): soglia minima testo per attivare fallback inhybrid.ANALYZER_SCRAPLING_ENDPOINT(defaulthttp://scrapling-sidecar:8088): endpoint sidecar Scrapling.ANALYZER_SCRAPLING_TIMEOUT(default8s): timeout chiamata Scrapling.ANALYZER_SCRAPLING_MAX_CHARS(default10000): clamp testo estratto da Scrapling.ANALYZER_MAX_DELIVERY_ATTEMPTS(default3): soglia anti-poison item suanalyzer_queue.LLM_PARALLEL_THREADS(default-1): numero thread CPU perllama.cpp(-t, auto-detect quando-1).LLAMA_ENABLE_METRICS=true|false(defaulttrue): abilita endpoint Prometheus/metricsdillama.cpp(--metrics).
Tutti i componenti (rss-reader, job-analyzer, message-dispatcher, web-admin, test-rss, llm-mock, scrapling-sidecar) sono instrumentati con OpenTelemetry SDK e inviano segnali all'OTel Collector locale.
Variabili principali:
OTEL_EXPORTER_OTLP_ENDPOINT(defaultotel-collector:4317)OTEL_EXPORTER_OTLP_INSECURE(defaulttrue)SERVICE_VERSION(defaultdev)DEPLOY_ENV(defaultlocal)
Forwarding Grafana Cloud (collector):
- endpoint OTLP:
https://otlp-gateway-prod-eu-west-0.grafana.net/otlp - credenziali in
.env.secrets:GRAFANA_INSTANCE_IDGRAFANA_TOKEN
- metriche host abilitate via receiver
hostmetrics(CPU, memoria, disco/filesystem, rete, paging, process count). - metriche
llama.cppcollezionate via receiver Prometheus del collector (job-analyzer:8080/metrics,llama-cpp:8080/metricsquando disponibile).
Metriche pipeline FSM esportate (via OTel Collector):
rwct_pipeline_queue_items{queue,pipeline_stage,state}: elementi nelle code SQLite (analyzer_queue/dispatch_queue) per stato (QUEUED,LEASED,DONE).rwct_pipeline_items_by_status{status}: elementirss_itemsper stato business (NEW,ANALYZED,DISPATCHED,FAILED).rwct_pipeline_items_by_queue_state{queue_state}: stato derivato usato nel monitor (not_enqueued_raw,queued_raw,inflight_raw,processed_raw,not_enqueued_analyzed,queued_analyzed,dispatched,failed,unknown).
Metriche GenAI (analyzer):
gen_ai.client.operation.duration{gen_ai.operation.name,gen_ai.provider.name,gen_ai.request.model,error.type}.gen_ai.client.token.usage{gen_ai.operation.name,gen_ai.provider.name,gen_ai.request.model,gen_ai.token.type}.
Span GenAI (analyzer):
- span client LLM con attributi
gen_ai.request.*,gen_ai.response.*,gen_ai.usage.*,error.typeeserver.*.
Dashboard provisionata automaticamente:
RWCT Observability Overview(Grafana folderRWCT)- file:
observability/grafana/dashboards/rwct-overview.json RWCT Pipeline FSM(Grafana folderRWCT)- file:
observability/grafana/dashboards/rwct-pipeline-fsm.json
Per usare Telegram impostare:
DESTINATION_MODE=telegramTELEGRAM_BOT_TOKEN(in.env.secrets)TELEGRAM_CHAT_ID(in.env.secrets)TELEGRAM_TEMPLATE_FILE(es./app/configs/message.telegram.tmpl.md)
Opzionali:
TELEGRAM_THREAD_IDTELEGRAM_PARSE_MODETELEGRAM_DISABLE_WEB_PAGE_PREVIEW
Template:
DISPATCH_TEMPLATE_FILE: template Gotext/templateusato per destinazioni non-Telegram.TELEGRAM_TEMPLATE_FILE: template Gotext/templatededicato a Telegram (fallback suDISPATCH_TEMPLATE_FILEse non impostato).
- URL:
http://localhost:8090(WEB_ADMIN_PORTper override) - Funzioni principali:
- layout con sidebar sinistra collassabile e registry viste estendibile.
- vista
Feeds & Items: gestione feed (add/remove/enable/disable/force poll) e azioni item. - vista
Pipeline Board: lane Kanban FIFO perraw_backlog,raw_inflight,analyzed_backlog,analyzed_inflight,failed,anomalies. - toggle auto-refresh board (default 5s).
- monitor queue e pipeline con metriche di stuck detector.
- pannello analyzer runtime (modello attivo, thinking, timeout, max tokens, rate).
- requeue intelligente:
- se
analyzed_payload_jsonesiste: enqueue indispatch_queue(mode=analyzed). - se manca: enqueue in
analyzer_queue(mode=raw). - visualizzazione JSON analizzato.
go test ./...