@@ -137,21 +137,26 @@ int refseqCount=0;
137137
138138#ifndef NOTHREADS
139139
140- GFastMutex dataMutex; // to manage availability of data records ready to be loaded by main thread
140+ GFastMutex dataMutex; // manage availability of data records ready to be loaded by main thread
141141GVec<int > dataClear; // indexes of data bundles cleared for loading by main thread (clear data pool)
142142
143- GFastMutex waitMutex; // for main program to make sure there are threads ready/waiting
143+ GFastMutex waitMutex; // controls threadsWaiting (idle threads counter)
144144int threadsWaiting=0 ; // how many worker threads are waiting
145145
146+ GMutex queueMutex; // controls bundleQueue and bundleWork access
147+ int bundleWork=1 ; // bit 0 set if bundles are still being prepared (BAM file not exhausted yet)
148+ // bit 1 set if there are Bundles ready in the queue
149+ GConditionVar haveBundles; // will notify all threads when bundles are pushed in the ready queue
150+ // or no more bundles are coming
151+
152+
153+
146154GFastMutex printMutex; // for writing the output to file
155+
147156GFastMutex logMutex; // only when verbose - to avoid mangling the log output
148- GMutex queueMutex; // whenever bundleQueue is updated
157+
149158GFastMutex bamReadingMutex;
150- GConditionVar haveBundles; // will notify all threads when bundles are pushed in the ready queue
151- // or no more bundles are coming
152159
153- int bundleWork=1 ; // bit 0 set if bundles are still being prepared (BAM file not exhausted yet)
154- // bit 1 set if there are Bundles ready in the queue
155160#endif
156161
157162bool NoMoreBundles=false ;
@@ -285,9 +290,13 @@ const char* ERR_BAM_SORT="\nError: the input alignment file is not sorted!\n";
285290if (ballgown)
286291 Ballgown_setupFiles (f_tdata, f_edata, f_idata, f_e2t, f_i2t);
287292#ifndef NOTHREADS
288- GThread* threads=new GThread[num_cpus];
289- GPVec<BundleData> bundleQueue (false );
290- BundleData* bundles=new BundleData[num_cpus+1 ]; // extra one being prepared while all others are processed
293+ GThread* threads=new GThread[num_cpus]; // bundle processing threads
294+
295+ GPVec<BundleData> bundleQueue (false ); // queue of loaded bundles
296+
297+ BundleData* bundles=new BundleData[num_cpus+1 ];
298+ // bundles[1..num_cpus] are processed by threads, bundles[0] is being prepared
299+
291300 dataClear.setCapacity (num_cpus+1 );
292301 for (int b=0 ;b<num_cpus;b++) {
293302 threads[b].kickStart (workerThread, (void *) &bundleQueue);
@@ -366,20 +375,28 @@ if (ballgown)
366375 }*/
367376 bundle->getReady (currentstart, currentend);
368377#ifndef NOTHREADS
369- // push this in the bundle queue, where it'll be picked up by the threads
378+ // push this in the bundle queue where it'll be picked up by the threads
370379 DBGPRINT2 (" ##> Locking queueMutex to push loaded bundle into the queue (bundle.start=%d)\n " , bundle->start );
380+ int qCount=0 ;
371381 queueMutex.lock ();
372- bundleQueue.Push (bundle);
373- bundleWork |= 0x02 ; // set bit 1
374- int qCount=bundleQueue.Count ();
382+ // push the bundle in the processing queue
383+ bundleQueue.Push (bundle);
384+ bundleWork |= 0x02 ; // set bit 1
385+ qCount=bundleQueue.Count ();
386+ DBGPRINT2 (" ##> bundleQueue.Count()=%d)\n " , qCount);
375387 queueMutex.unlock ();
388+ /*
376389 do {
377390 waitForThreads();
378391 DBGPRINT("##> NOTIFY any thread...\n");
379392 haveBundles.notify_one();
380393 //this_thread::sleep_for(chrono::milliseconds(1));
381- sleep (0 );
394+ // sleep(0);
382395 } while (!queuePopped(bundleQueue, qCount));
396+ */
397+ waitForThreads (); // wait for any threads to become available
398+ queuePopped (bundleQueue, qCount); // pop the next bundle from the queue and process it
399+
383400#else // no threads
384401 Num_Fragments+=bundle->num_fragments ;
385402 Frag_Len+=bundle->frag_len ;
@@ -394,7 +411,7 @@ if (ballgown)
394411 dataClear.Push (bundle->idx );
395412 dataMutex.unlock ();
396413#endif
397- }
414+ } // nothing to do with this bundle
398415
399416 if (chr_changed) {
400417 if (guided) {
@@ -450,25 +467,15 @@ if (ballgown)
450467 ng_ovlstart++;
451468 }
452469 if (ng_ovlstart>ng_start) ng_end=ng_ovlstart-1 ;
453- /*
454- while(ng_end+1<ng && (int)(*guides)[ng_end+1]->start<=pos) {
455- ng_end++;
456- if(currentend<(int)(*guides)[ng_end]->end) {
457- currentend=(*guides)[ng_end]->end;
458- }
459- }
460- */
461470 } // guides present on the current chromosome
462471 bundle->refseq =lastref;
463472 bundle->start =currentstart;
464473 bundle->end =currentend;
465- } // <---- new bundle
466- // currentend=process_read(currentstart, currentend, bundle->readlist, hashread,
467- // bundle->junction, *brec, strand, nh, hi, bundle->bpcov);
468- // currentend=
474+ } // <---- new bundle just started
475+
469476 if (currentend<(int )brec->end ) {
470- // current read just pushed upper boundary of the bundle
471- // this might never happen if a longer guide was added already to the bundle
477+ // current read extends the bundle
478+ // this might not happen if a longer guide had already been added to the bundle
472479 currentend=brec->end ;
473480 if (guides) { // add any newly overlapping guides to bundle
474481 bool cend_changed;
@@ -969,8 +976,10 @@ bool waitForThreads() {
969976 waitMutex.lock ();
970977 noneWaiting=(threadsWaiting<1 );
971978 waitMutex.unlock ();
972- if (noneWaiting)
973- this_thread::sleep_for (chrono::milliseconds (30 ));
979+ if (noneWaiting) {
980+ DBGPRINT (" ##>none waiting, sleep_for(2ms)\n " );
981+ this_thread::sleep_for (chrono::milliseconds (2 ));
982+ }
974983 }
975984 DBGPRINT (" ##> there are workers ready now.\n " );
976985 return (!noneWaiting);
@@ -1033,7 +1042,7 @@ int waitForData(BundleData* bundles) {
10331042 return bidx;
10341043 }
10351044 dataMutex.unlock ();
1036- this_thread::sleep_for (chrono::milliseconds (20 ));
1045+ // this_thread::sleep_for(chrono::milliseconds(20));
10371046 }
10381047 return -1 ; // should NEVER happen
10391048}
0 commit comments