Strategia de paralelizare Am încercat pe cât posibil să am cât mai multă paralelizare la un moment de timp, adică folosind tehnica de la laborator cu împărtirea muncii pe un vector folosind formulele pentru start si end. O să explic in cele ce urmeaza etapele pipeline-ului de paralelizare.
Initial, thread-ul main parseaza argumentele, respectiv fisierele de intrare, initializand cativa vectori si alte structuri de date utile (ex: ConcurrentHashMap). Se citesc categoriile, limbile si cuvintele in engleza care trebuiesc evitate, in acelasi timp incarcandu-se vectorii care urmeaza sa fie paralelizati. Dupa aceste sarcini, thread-ul main lanseaza thread-urile worker si se pune in asteptare apeland join pentru fiecare.
Thread-urile urmeaza urmatoarea secventa de operatii: elimina articolele duplicate folosind 2 CHM-uri (pe post de Set, folosind ca valoare un camp Boolean – este acest articol unic sau nu?). Fiecare thread isi parseaza subsecventa lui de articole din vectorul de articole. Am ales sa folosesc GSON pentru parsare, intrucat am experienta cu el (l-am folosit la aplicatii in Kotlin) si este si foarte usor de configurat.
Fiecare thread isi creeaza in functia removeDuplicates() propria lui lista de articole. Aceasta lista de articole este apoi procesata in functia processArticles(). In aceasta functie se intampla mai multe procesari, printre care filtrarea articolelor duplicate, diverse procesari pentru statistici, organizarea pe limba, pe categorie.
Organizarea articolelor pe categorii – functia writeCategories() :
Pentru a rezolva acest task, am folosit un set de categorii valide. Daca un articol contine macar o categorie invalida, acesta nu va fi luat in considerare. Folosesc un CHM pentru a mapa category -> uuids , unde valoarea este keyset-ul unui CHM, pentru ca in acest set introduc mai multe thread-uri concurent uuid-uri. Imediat dupa processArticles() se asteapta la bariera, pentru ca toate thread-urile sa isi fi procesat datele despre categorii si limbi, si apoi se apeleaza functiile writeCategories() si writeLanguages(). Pentru a scrie fisierele de categorii in paralel folosesc
metoda cu start si end aplicata pe vectorul de categorii citit la inceputul programului. Se normalizeaza numele fisierului si apoi este scrisa toata lista de uuid-uri in fisierul corespunzator.
Fiecare thread lucreaza pe bucata lui de categorii.
Organizarea articolelor pe limbi – functia writeLanguages() :
In aceeasi maniera ca mai sus, paralelizand vectorul de limbi citit in main, iau set-ul de articole corespunzator unei limbi, il transform in lista si il sortez pentru a il scrie in fisierul aferent.
Generearea fisierului global cu articole – all_articles.txt:
Acest task presupune sortarea tuturor articolelor procesate dupa 2 criterii. Pentru a obtine un timp cat mai bun si a maximiza si paralelismul, am decis sa folosesc merge sort iterativ ca la laborator. Pentru a face asta, a trebuit sa calculez cate articole corespund fiecarui thread, ca apoi sa calculez offset-urile pentru fiecare thread in vectorul mare care urma a fi sortat cu merge sort. As fi putut evident sa fac vectorul care contine toate articolele folosind un singur thread si sa il sortez folosind Collections.sort() sau alta metoda standard de sortare, dar am vrut sa paralelizez si acest task. Fiecare thread si-a pus la pozitia indicata in vectorul global valorile, si-au sortat individual bucatile folosind Arrays.sort(), iar apoi dupa ce toate subsecventele au fost sortate, am folosit clasa implementata de mine: ParallelSort , o clasa generica intrucat fix acelasi tip de sortare am folosit si pentru sortarea pentru keywords_count. Pentur ca algoritmul nu trateaza cazul in care numarul de elemente nu este o putere a lui 2, o sa existe situatii in care, aplicand merge pe 2 bucati din vector, una sa existe, iar cealalta sa nu existe (merge intre (i, min (i + width, totalSize) si (mid, min (i + nextWidth, totalSize)). Pentru a corecta acest caz, copiez in vectorul destinatie tot ce a ramas in vectorul sursa.
Cuvinte de interes – keywords_count.txt:
Pentru a paraleliza acest task, a trebuit sa ii atribui fiecarui thread din cele P thread-uri, un set de cuvinte. M-am folosit de hashcode(word) % P, pentru a atribui cat mai uniform articolele ce contin anumite cuvinte thread-urilor. Am folosit fix aceeasi metoda de sortare ca mai sus.
Rapoarte statistice – reports.txt:
In principiu, in acest task, doar m-am folosit de datele deja agregate pe parcursul programului. Pentru best_author, m-am folosit de tehnica cu hashcode(word) % P pentru a distribui sarcina cat mai bine. In rest, am calculat cele mai bune valori ori din sortarile anterioare, ori fiecare thread si-a calculat pe parcurs cea mai buna valoare in functiile de procesare, iar la final thread-ul main a parcurs cele mai bune valori ale fiecarui thread, gasind cea mai buna valoare.
In design-ul meu nu am folosit deloc blocuri synchronized, ci m-am folosit cat mai mult de ConcurrentHashMap-uri pentru a maximiza paralelismul, folosind tehnicile mentionate mai sus pentru a distribui load-ul cat mai uniform. Singurele procesari secventiale, pe care le face thread- ul cu id-ul 0, sunt cele pentru statistici si cele pentru calcularea offset-urilor fiecarui thread in vectorii care trebuie sortati la final, dar acestea se fac in timp mic, intrucat este vorba de parcurgerea a P elemente.
** Analiza de performanță și scalabilitate ** : Setup de testare:
CPU: AMD Ryzen AI 9 HX 370 w/ Radeon 890M
CPU cores: 12
RAM: 16GB RAM
Sistem de operare: Am realizat testarea in WSL (Ubuntu), in /mnt, iar apoi in Ubuntu (/home/)
Versiunea de Java folosita: Java 25
Dimensiunea datasetului de test: test_4 din arhiva de testare (11031 articole).
Rezultate :
Am rulat acest benchmark folosindu-ma de scriptul de la laborator, modificat putin, pe dataset-ul test_4 din arhiva de testare. De asemenea, am realizat 2 testari, una in mediul WSL (/mnt/), alta in mediul Ubuntu (/home/).
| Nr. Threads | Timp 1 (s) | Timp 2 (s) | Timp 3 (s) | Timp Mediu (s) |
|---|---|---|---|---|
| 1 | 46.009 | 44.646 | 43.695 | 44.783 |
| 2 | 22.311 | 22.199 | 22.059 | 22.190 |
| 3 | 16.302 | 16.306 | 16.414 | 16.341 |
| 4 | 13.638 | 13.925 | 13.754 | 13.772 |
| 5 | 12.675 | 13.055 | 12.809 | 12.846 |
| 6 | 11.887 | 12.140 | 11.933 | 11.987 |
| 7 | 11.631 | 11.714 | 11.417 | 11.587 |
| 8 | 11.502 | 11.297 | 11.239 | 11.346 |
| 9 | 11.104 | 11.262 | 10.949 | 11.105 |
| 10 | 10.962 | 11.207 | 10.681 | 10.950 |
| 11 | 10.763 | 10.839 | 10.722 | 10.775 |
| 12 | 10.706 | 11.097 | 10.700 | 10.834 |
Graficele au fost facute in python.
Se poate observa ca timpii de executie scad pana la rularea cu 12 thread-uri. Motivul pentru care rularea cu 12 thread-uri este mai inceata decat cea cu 11 thread-uri este pentru ca sistemul meu are 12 core-uri fizice, iar ruland cu 12 thread-uri, unul dintre acestea ar fi facut context switch cu un alt thread care rula constant batandu-se pe procesor, intrucat mai aveam si alte procese pornite. Orice alt proces activ forteaza practic un context switch si acesta consuma multe cicluri de ceas. Castigul marginal este anulat de acest cost (tradeoff).
O parte din overhead se datoreaza si faptului ca implementarea mea a fost rulata intr-un mediu WSL, citirea si scrierea fisierelor sunt traduse de la kernel-ul de Linux la subsistemul de fisiere Windows, apoi scrise pe disc.
De asemenea, cu cat avem mai multe thread-uri accesand aceleasi structuri de date (ConcurrentHashMap-uri, vectori), apare tot mai frecvent cache contention si false sharing. Cache contention apare atunci cand mai multe thread-uri acceseaza aceeasi linie de cache (64 de bytes pe sistemul meu). Fiecare procesor isi tine o copie a liniei de cache. Cand unul a modificat-o pe a sa, trebuie sa faca broadcast pe magistrala de sistem. Practic apar foarte multe invalidari ale cache- ului.
Se poate observa ca pentru rularea cu 2 thread-uri, programul are o eficienta de 100.9%. Acest lucru este datorat mai multor factori, cum ar fi erorile de masurare, sarcina de pe procesor la 2 momente diferite de timp (alte procese care ruleaza in paralel). De asemenea, rularea cu un thread include un overhead foarte similar cu cel al rularii pentru 2 thread-uri.
Folosind legea lui Amdahl, S = 4.16 ruland cu 11 thread-uri => f = 16.44% procent de operatii care nu se pot executa in paralel. Aceste operatii sunt cele de la inceput pe care am decis sa le faca thread-ul main (citirile de fisiere, alocarile de structuri de date), lucrul facut de thread-ul cu id 0 pre mergesort, timpul petrecut de thread-uri la bariera si scrierea de la final in fisierele all_articles.txt, keywords_count.txt, care se face secvential. Consider ca numarul optim de thread- uri pentru sistemul meu este de 11. Totusi, de la 8 thread-uri in sus, castigurile nu sunt semnificative.
Pe Ubuntu, in afara subsistemului WSL, am obtinut urmatoarele date:
| Nr. Threads | Timp 1 (s) | Timp 2 (s) | Timp 3 (s) | Timp Mediu (s) |
|---|---|---|---|---|
| 1 | 13.231 | 12.645 | 12.709 | 12.862 |
| 2 | 7.082 | 7.084 | 7.223 | 7.130 |
| 3 | 5.181 | 5.432 | 5.603 | 5.405 |
| 4 | 4.806 | 5.005 | 5.329 | 5.047 |
| 5 | 4.640 | 4.766 | 4.703 | 4.703 |
| 6 | 4.462 | 4.337 | 4.813 | 4.537 |
| 7 | 5.095 | 5.383 | 4.375 | 4.951 |
| 8 | 4.285 | 4.376 | 4.271 | 4.311 |
| 9 | 4.215 | 4.400 | 4.699 | 4.438 |
| 10 | 4.292 | 4.191 | 4.523 | 4.335 |
| 11 | 4.216 | 4.259 | 4.426 | 4.300 |
| 12 | 4.730 | 4.455 | 4.348 | 4.511 |
Scalarea timpului de exeutie in mediul Windows/WSL a fost remarcabila si a continuat eficient pana la pragul fizic de 11 thread-uri. Motivul principal, cred eu, este ca programul a rulat, în acest mediu virtualizat, dominat de overhead-ul de IO și de traducerea apelurilor de sistem. Operatiile lente de traducere a apelurilor de sistem Linux->Windows si gestionarea stratificata a sistemului de fisiere aducea o fractiune dT (delta T) suficient de mare cat sa faca simtita prezenta unui numar mai mare de thread-uri. Acest overhead a fost paralelizat, pana cand s-a atins saturatia core-urilor fizice.
In alta ordine de idei, pe Ubuntu s-a atins rapid platoul la 5 thread-uri. Un motiv ar fi pentru ca acest overhead dintre Linux->Windows a disparut, acesta avand o proportie destul de mare in calcule. Adaugarea de thread-uri dupa acest prag nu a imbunatatit timpii de rulare, deoarece operatiile efective ale programului nu erau suficient de intensive.