Skip to content

Commit b94793d

Browse files
committed
wip: fixing thread delay issues
1 parent c2ddfc7 commit b94793d

4 files changed

Lines changed: 46 additions & 134 deletions

File tree

rlink.h

Lines changed: 0 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -370,72 +370,13 @@ struct BundleData {
370370
}
371371
};
372372

373-
/*
374-
struct BundleData {
375-
BundleStatus status;
376-
int numreads;
377-
//GBamReader bamreader;
378-
int64_t bamStart; //start of bundle in BAM file
379-
char firstPass; //0=2nd pass, 1=1st pass, 2=single pass requested
380-
int idx; //index in the main bundles array
381-
int start;
382-
int end;
383-
bool covSaturated;
384-
GStr refseq;
385-
GList<CReadAln> readlist;
386-
GVec<float> bpcov;
387-
GList<CJunction> junction;
388-
GPVec<GffObj> keepguides;
389-
GList<CPrediction> pred;
390-
BundleData():status(BUNDLE_STATUS_CLEAR), numreads(0), bamStart(-1),
391-
firstPass(singlePass ? 2 : 1),
392-
idx(0), start(0), end(0), covSaturated(false), refseq(), readlist(false,true),
393-
bpcov(1024), junction(true, true, true), keepguides(false), pred(false) { }
394-
395-
void getReady(int currentstart, int currentend, GStr& ref) {
396-
start=currentstart;
397-
end=currentend;
398-
refseq=ref;
399-
status=BUNDLE_STATUS_READY;
400-
}
401-
402-
void Clear() {
403-
keepguides.Clear();
404-
pred.Clear();
405-
status=BUNDLE_STATUS_CLEAR;
406-
numreads=0;
407-
bamStart=-1;
408-
firstPass = singlePass ? 2 : 1;
409-
start=0;
410-
end=0;
411-
covSaturated=false;
412-
refseq="";
413-
readlist.Clear();
414-
bpcov.Clear();
415-
bpcov.setCapacity(1024);
416-
junction.Clear();
417-
}
418-
419-
~BundleData() {
420-
Clear();
421-
}
422-
};
423-
*/
424373
int processRead(int currentstart, int currentend, BundleData& bdata,
425374
GHash<int>& hashread, GBamRecord& brec, char strand, int nh, int hi);
426375

427376
void countRead(BundleData& bdata, GBamRecord& brec, int hi);
428377

429-
//int process_read(int currentstart, int currentend, GList<CReadAln>& readlist, GHash<int>& hashread,
430-
// GList<CJunction>& junction, GBamRecord& brec, char strand, int nh, int hi, GVec<float>& bpcov);
431-
432378
int printResults(BundleData* bundleData, int ngenes, int geneno, GStr& refname);
433379

434-
//int print_transcripts(GList<CPrediction>& pred, int ngenes, int geneno, GStr& refname);
435-
436-
//int infer_transcripts(int refstart, GList<CReadAln>& readlist,
437-
// GList<CJunction>& junction, GPVec<GffObj>& guides, GVec<float>& bpcov, GList<CPrediction>& pred, bool fast);
438-
439380
int infer_transcripts(BundleData* bundle, bool fast);
440381

441382
// --- utility functions

stringtie.cpp

Lines changed: 42 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -137,21 +137,26 @@ int refseqCount=0;
137137

138138
#ifndef NOTHREADS
139139

140-
GFastMutex dataMutex; //to manage availability of data records ready to be loaded by main thread
140+
GFastMutex dataMutex; //manage availability of data records ready to be loaded by main thread
141141
GVec<int> dataClear; //indexes of data bundles cleared for loading by main thread (clear data pool)
142142

143-
GFastMutex waitMutex; // for main program to make sure there are threads ready/waiting
143+
GFastMutex waitMutex; // controls threadsWaiting (idle threads counter)
144144
int threadsWaiting=0; // how many worker threads are waiting
145145

146+
GMutex queueMutex; //controls bundleQueue and bundleWork access
147+
int bundleWork=1; // bit 0 set if bundles are still being prepared (BAM file not exhausted yet)
148+
// bit 1 set if there are Bundles ready in the queue
149+
GConditionVar haveBundles; //will notify all threads when bundles are pushed in the ready queue
150+
//or no more bundles are coming
151+
152+
153+
146154
GFastMutex printMutex; //for writing the output to file
155+
147156
GFastMutex logMutex; //only when verbose - to avoid mangling the log output
148-
GMutex queueMutex; //whenever bundleQueue is updated
157+
149158
GFastMutex bamReadingMutex;
150-
GConditionVar haveBundles; //will notify all threads when bundles are pushed in the ready queue
151-
//or no more bundles are coming
152159

153-
int bundleWork=1; // bit 0 set if bundles are still being prepared (BAM file not exhausted yet)
154-
// bit 1 set if there are Bundles ready in the queue
155160
#endif
156161

157162
bool NoMoreBundles=false;
@@ -285,9 +290,13 @@ const char* ERR_BAM_SORT="\nError: the input alignment file is not sorted!\n";
285290
if (ballgown)
286291
Ballgown_setupFiles(f_tdata, f_edata, f_idata, f_e2t, f_i2t);
287292
#ifndef NOTHREADS
288-
GThread* threads=new GThread[num_cpus];
289-
GPVec<BundleData> bundleQueue(false);
290-
BundleData* bundles=new BundleData[num_cpus+1]; //extra one being prepared while all others are processed
293+
GThread* threads=new GThread[num_cpus]; //bundle processing threads
294+
295+
GPVec<BundleData> bundleQueue(false); //queue of loaded bundles
296+
297+
BundleData* bundles=new BundleData[num_cpus+1];
298+
//bundles[1..num_cpus] are processed by threads, bundles[0] is being prepared
299+
291300
dataClear.setCapacity(num_cpus+1);
292301
for (int b=0;b<num_cpus;b++) {
293302
threads[b].kickStart(workerThread, (void*) &bundleQueue);
@@ -366,20 +375,28 @@ if (ballgown)
366375
}*/
367376
bundle->getReady(currentstart, currentend);
368377
#ifndef NOTHREADS
369-
//push this in the bundle queue, where it'll be picked up by the threads
378+
//push this in the bundle queue where it'll be picked up by the threads
370379
DBGPRINT2("##> Locking queueMutex to push loaded bundle into the queue (bundle.start=%d)\n", bundle->start);
380+
int qCount=0;
371381
queueMutex.lock();
372-
bundleQueue.Push(bundle);
373-
bundleWork |= 0x02; //set bit 1
374-
int qCount=bundleQueue.Count();
382+
//push the bundle in the processing queue
383+
bundleQueue.Push(bundle);
384+
bundleWork |= 0x02; //set bit 1
385+
qCount=bundleQueue.Count();
386+
DBGPRINT2("##> bundleQueue.Count()=%d)\n", qCount);
375387
queueMutex.unlock();
388+
/*
376389
do {
377390
waitForThreads();
378391
DBGPRINT("##> NOTIFY any thread...\n");
379392
haveBundles.notify_one();
380393
//this_thread::sleep_for(chrono::milliseconds(1));
381-
sleep(0);
394+
//sleep(0);
382395
} while (!queuePopped(bundleQueue, qCount));
396+
*/
397+
waitForThreads(); //wait for any threads to become available
398+
queuePopped(bundleQueue, qCount); //pop the next bundle from the queue and process it
399+
383400
#else //no threads
384401
Num_Fragments+=bundle->num_fragments;
385402
Frag_Len+=bundle->frag_len;
@@ -394,7 +411,7 @@ if (ballgown)
394411
dataClear.Push(bundle->idx);
395412
dataMutex.unlock();
396413
#endif
397-
}
414+
} //nothing to do with this bundle
398415

399416
if (chr_changed) {
400417
if (guided) {
@@ -450,25 +467,15 @@ if (ballgown)
450467
ng_ovlstart++;
451468
}
452469
if (ng_ovlstart>ng_start) ng_end=ng_ovlstart-1;
453-
/*
454-
while(ng_end+1<ng && (int)(*guides)[ng_end+1]->start<=pos) {
455-
ng_end++;
456-
if(currentend<(int)(*guides)[ng_end]->end) {
457-
currentend=(*guides)[ng_end]->end;
458-
}
459-
}
460-
*/
461470
} //guides present on the current chromosome
462471
bundle->refseq=lastref;
463472
bundle->start=currentstart;
464473
bundle->end=currentend;
465-
} //<---- new bundle
466-
//currentend=process_read(currentstart, currentend, bundle->readlist, hashread,
467-
// bundle->junction, *brec, strand, nh, hi, bundle->bpcov);
468-
//currentend=
474+
} //<---- new bundle just started
475+
469476
if (currentend<(int)brec->end) {
470-
//current read just pushed upper boundary of the bundle
471-
//this might never happen if a longer guide was added already to the bundle
477+
//current read extends the bundle
478+
//this might not happen if a longer guide had already been added to the bundle
472479
currentend=brec->end;
473480
if (guides) { //add any newly overlapping guides to bundle
474481
bool cend_changed;
@@ -969,8 +976,10 @@ bool waitForThreads() {
969976
waitMutex.lock();
970977
noneWaiting=(threadsWaiting<1);
971978
waitMutex.unlock();
972-
if (noneWaiting)
973-
this_thread::sleep_for(chrono::milliseconds(30));
979+
if (noneWaiting) {
980+
DBGPRINT("##>none waiting, sleep_for(2ms)\n");
981+
this_thread::sleep_for(chrono::milliseconds(2));
982+
}
974983
}
975984
DBGPRINT("##> there are workers ready now.\n");
976985
return(!noneWaiting);
@@ -1033,7 +1042,7 @@ int waitForData(BundleData* bundles) {
10331042
return bidx;
10341043
}
10351044
dataMutex.unlock();
1036-
this_thread::sleep_for(chrono::milliseconds(20));
1045+
//this_thread::sleep_for(chrono::milliseconds(20));
10371046
}
10381047
return -1; // should NEVER happen
10391048
}

tablemaker.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ void BundleData::rc_store_t(GffObj* t) {
3131
rc_init(t);
3232
}
3333
rc_data->addTranscript(*t);
34-
//check this read alignment against ref exons and introns
3534
}
3635

3736
struct COvlSorter {

tablemaker.h

Lines changed: 4 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -215,36 +215,21 @@ struct RC_BundleData {
215215
int init_lmin;
216216
int lmin;
217217
int rmax;
218-
//set<RC_ScaffData> tdata; //all transcripts in this bundle
219-
//map<uint, set<uint> > e2t; //mapping exon ID to transcript IDs
220-
//map<uint, set<uint> > i2t; //mapping intron ID to transcript IDs
221-
//set<RC_Feature> exons; //all exons in this bundle, by their start coordinate
222-
//set<RC_Feature> introns; //all introns in this bundle, by their start coordinate
223-
//GList<RC_ScaffData> tdata;
224-
GPVec<RC_ScaffData> tdata;
225-
GList<RC_Feature> exons;
226-
GList<RC_Feature> introns;
218+
GPVec<RC_ScaffData> tdata; //all transcripts in this bundle
219+
GList<RC_Feature> exons; //all unique exons in this bundle, by their start coordinate
220+
GList<RC_Feature> introns; //all unique introns in this bundle, by their start coordinate
227221
//RC_FeatIt xcache; //cache the first exon overlapping xcache_pos to speed up exon-overlap queries (findExons())
228222
int xcache; //exons index of the first exon overlapping xcache_pos
229223
int xcache_pos; // left coordinate of last cached exon overlap query (findExons())
230-
// -- output files
231-
/*
232-
FILE* ftdata; //t_data
233-
FILE* fedata; //e_data
234-
FILE* fidata; //i_data
235-
FILE* fe2t; //e2t
236-
FILE* fi2t; //i2t
237-
*/
238224
vector<float> f_mcov; //coverage data, multi-map aware, per strand
239225
vector<int> f_cov;
240226
vector<float> r_mcov; //coverage data on the reverse strand
241227
vector<int> r_cov;
242228
//
243229
RC_BundleData(int t_l=0, int t_r=0):init_lmin(0), lmin(t_l), rmax(t_r),
244230
tdata(false), // e2t(), i2t(), exons(), introns(),
245-
exons(true, false, true), introns(true,false,true),
231+
exons(true, false, true), introns(true, false, true),
246232
xcache(0), xcache_pos(0)
247-
//, ftdata(NULL), fedata(NULL), fidata(NULL), fe2t(NULL), fi2t(NULL)
248233
{
249234
if (rmax>lmin) updateCovSpan();
250235
}
@@ -256,25 +241,6 @@ struct RC_BundleData {
256241
r_mcov.clear();
257242
}
258243

259-
/*
260-
void addBundleFeature(uint t_id, int l, int r, char strand, uint f_id, set<RC_Feature>& fset,
261-
map<uint, set<uint> >& f2t) {
262-
RC_Feature feat(l, r, strand, f_id);
263-
fset.insert(feat);
264-
//pair<RC_FeatIt, bool> in = fset.insert(feat);
265-
//if (!in.second) { //existing f_id
266-
// f_id=in.first->id;
267-
//}
268-
set<uint> tset;
269-
tset.insert(t_id);
270-
pair<RC_Map2SetIt, bool> mapin=f2t.insert(pair<uint, set<uint> >(f_id, tset));
271-
if (!mapin.second) {
272-
//existing f_id
273-
(*mapin.first).second.insert(t_id);
274-
}
275-
}
276-
*/
277-
278244
void addTranscript(GffObj& t) {
279245
//if (!ps.rc_id_data()) return;
280246
//RC_ScaffIds& sdata = *(ps.rc_id_data());
@@ -288,12 +254,9 @@ struct RC_BundleData {
288254
if (boundary_changed) updateCovSpan();
289255
//for (vector<RC_ScaffSeg>::iterator it=sdata.exons.begin();it!=sdata.exons.end();++it) {
290256
for (int i=0;i<sdata.t_exons.Count();i++) {
291-
//addBundleFeature(sdata.t_id, sdata.exons[i], sdata.strand, exons);
292257
exons.Add(sdata.t_exons[i]);
293258
}
294259
//store introns:
295-
//for (vector<RC_ScaffSeg>::iterator it=sdata.introns.begin();it!=sdata.introns.end();++it) {
296-
// addBundleFeature(sdata.t_id, it->l, it->r, sdata.strand, it->id, introns, i2t);
297260
for (int i=0;i<sdata.t_introns.Count();i++) {
298261
introns.Add(sdata.t_introns[i]);
299262
}

0 commit comments

Comments
 (0)