diff --git a/Makefile b/Makefile index ddbfff899..7658c0514 100644 --- a/Makefile +++ b/Makefile @@ -220,6 +220,7 @@ LIBHTS_OBJS = \ regidx.o \ region.o \ sam.o \ + sam_cache.o \ sam_mods.o \ simd.o \ synced_bcf_reader.o \ @@ -283,6 +284,7 @@ header_h = header.h cram/string_alloc.h cram/pooled_alloc.h $(htslib_khash_h) $( hfile_internal_h = hfile_internal.h $(htslib_hts_defs_h) $(htslib_hfile_h) $(textutils_internal_h) hts_internal_h = hts_internal.h $(htslib_hts_h) $(textutils_internal_h) hts_time_funcs_h = hts_time_funcs.h +sam_cache_h = sam_cache.h $(htslib_khash_h) $(htslib_sam_h) sam_internal_h = sam_internal.h $(htslib_sam_h) textutils_internal_h = textutils_internal.h $(htslib_kstring_h) thread_pool_internal_h = thread_pool_internal.h $(htslib_thread_pool_h) @@ -503,11 +505,11 @@ hfile.o hfile.pico: hfile.c config.h $(htslib_hfile_h) $(hfile_internal_h) $(hts hfile_gcs.o hfile_gcs.pico: hfile_gcs.c config.h $(htslib_hts_h) $(htslib_kstring_h) $(hfile_internal_h) hfile_libcurl.o hfile_libcurl.pico: hfile_libcurl.c config.h $(hfile_internal_h) $(htslib_hts_h) $(htslib_hts_alloc_h) $(htslib_kstring_h) $(htslib_khash_h) hfile_s3.o hfile_s3.pico: hfile_s3.c config.h $(hfile_internal_h) $(htslib_hts_h) $(htslib_hts_alloc_h) $(htslib_kstring_h) $(hts_time_funcs_h) -hts.o hts.pico: hts.c config.h os/lzma_stub.h $(htslib_hts_h) $(htslib_bgzf_h) $(cram_h) $(htslib_hfile_h) $(htslib_hts_endian_h) version.h config_vars.h $(hts_internal_h) $(hfile_internal_h) $(sam_internal_h) $(htslib_hts_alloc_h) $(htslib_hts_expr_h) $(htslib_hts_os_h) $(htslib_khash_h) $(htslib_kseq_h) $(htslib_ksort_h) $(htslib_tbx_h) $(htscodecs_htscodecs_h) +hts.o hts.pico: hts.c config.h os/lzma_stub.h $(htslib_hts_h) $(htslib_bgzf_h) $(cram_h) $(htslib_hfile_h) $(htslib_hts_endian_h) version.h config_vars.h $(hts_internal_h) $(hfile_internal_h) $(sam_internal_h) $(htslib_hts_alloc_h) $(htslib_hts_expr_h) $(htslib_hts_os_h) $(htslib_khash_h) $(htslib_kseq_h) $(htslib_ksort_h) $(htslib_tbx_h) $(htscodecs_htscodecs_h) $(sam_cache_h) hts_expr.o hts_expr.pico: hts_expr.c config.h $(htslib_hts_expr_h) $(htslib_hts_alloc_h) $(htslib_hts_log_h) $(textutils_internal_h) hts_os.o hts_os.pico: hts_os.c config.h $(htslib_hts_defs_h) os/rand.c vcf.o vcf.pico: vcf.c config.h $(fuzz_settings_h) $(htslib_vcf_h) $(htslib_bgzf_h) $(htslib_tbx_h) $(htslib_hfile_h) $(hts_internal_h) $(htslib_hts_alloc_h) $(htslib_hts_endian_h) $(htslib_khash_str2int_h) $(htslib_kstring_h) $(htslib_sam_h) $(htslib_khash_h) $(htslib_kseq_h) $(bgzf_internal_h) -sam.o sam.pico: sam.c config.h $(fuzz_settings_h) $(htslib_hts_defs_h) $(htslib_sam_h) $(htslib_bgzf_h) $(cram_h) $(hts_internal_h) $(sam_internal_h) $(htslib_hfile_h) $(htslib_hts_alloc_h) $(htslib_hts_endian_h) $(htslib_hts_expr_h) $(header_h) $(htslib_khash_h) $(htslib_kseq_h) $(htslib_kstring_h) +sam.o sam.pico: sam.c config.h $(fuzz_settings_h) $(htslib_hts_defs_h) $(htslib_sam_h) $(htslib_bgzf_h) $(cram_h) $(hts_internal_h) $(sam_internal_h) $(htslib_hfile_h) $(htslib_hts_alloc_h) $(htslib_hts_endian_h) $(htslib_hts_expr_h) $(header_h) $(htslib_khash_h) $(htslib_kseq_h) $(htslib_kstring_h) $(sam_cache_h) sam_mods.o sam_mods.pico: sam_mods.c config.h $(htslib_sam_h) $(textutils_internal_h) simd.o simd.pico: simd.c config.h $(htslib_sam_h) $(sam_internal_h) tbx.o tbx.pico: tbx.c config.h $(htslib_tbx_h) $(htslib_bgzf_h) $(htslib_hts_alloc_h) $(htslib_hts_endian_h) $(hts_internal_h) $(htslib_khash_h) diff --git a/hts.c b/hts.c index bb16cdff4..a0040baab 100644 --- a/hts.c +++ b/hts.c @@ -62,6 +62,7 @@ DEALINGS IN THE SOFTWARE. */ #include "htslib/hts_alloc.h" #include "htslib/hts_expr.h" #include "htslib/hts_os.h" // drand48 +#include "sam_cache.h" #include "htslib/khash.h" #include "htslib/kseq.h" @@ -1216,7 +1217,9 @@ int hts_opt_add(hts_opt **opts, const char *c_arg) { else if (strcmp(o->arg, "fastq_umi_regex") == 0 || strcmp(o->arg, "FASTQ_UMI_REGEX") == 0) o->opt = FASTQ_OPT_UMI_REGEX, o->val.s = val; - + else if (strcmp(o->arg, "hts_maxdepth") == 0 || + strcmp(o->arg, "HTS_MAXDEPTH") == 0) + o->opt = HTS_OPT_MAXDEPTH, o->val.i = atoi(val); //todo should this be sammaxdepth as it is not applicable to bcf else { hts_log_error("Unknown option '%s'", o->arg); free(o->arg); @@ -1699,6 +1702,7 @@ int hts_close(htsFile *fp) } save = errno; + destroycache(fp); sam_hdr_destroy(fp->bam_header); hts_idx_destroy(fp->idx); hts_filter_free(fp->filter); @@ -1903,6 +1907,22 @@ int hts_set_opt(htsFile *fp, enum hts_fmt_option opt, ...) { } // else CRAM manages this in its own way break; } + case HTS_OPT_MAXDEPTH: { + va_start(args, opt); + int dpth = va_arg(args, int); + va_end(args); + // int wndsz = dpth >> 8; + // if (wndsz <= 0) + // wndsz = 350; + // dpth = dpth & 0x00FF; //upto 65535 + //todo check whether sorted by pos and setup only if so + if (dpth > 0) + if(setupcache(fp, 3500, dpth)) { + hts_log_warning("Failed to setup hts cache"); + } + //hts_log_warning("Wnd %d dpth %d", wndsz, dpth); + return 0; + } default: break; @@ -4270,75 +4290,148 @@ hts_itr_t *hts_itr_regions(const hts_idx_t *idx, hts_reglist_t *reglist, int cou return itr; } -int hts_itr_next(BGZF *fp, hts_itr_t *iter, void *r, void *data) +int hts_itr_next(BGZF *fp, hts_itr_t *iter, void *s, void *data) { - int ret, tid; + int ret, tid, sts = 0; hts_pos_t beg, end; + void *c = getsamcache(iter, data); + void *e = NULL; + void *r = s; + if (iter == NULL || iter->finished) return -1; + if (c && getfromreadcache_iter(c, s, &iter->curr_tid, &iter->curr_beg, &iter->curr_end)) { + return 0; + } if (iter->read_rest) { if (iter->curr_off) { // seek to the start if (bgzf_seek(fp, iter->curr_off, SEEK_SET) < 0) { hts_log_error("Failed to seek to offset %"PRIu64"%s%s", - iter->curr_off, - errno ? ": " : "", strerror(errno)); + iter->curr_off, + errno ? ": " : "", strerror(errno)); return -2; } iter->curr_off = 0; // only seek once } - ret = iter->readrec(fp, data, r, &tid, &beg, &end); - if (ret < 0) iter->finished = 1; - iter->curr_tid = tid; - iter->curr_beg = beg; - iter->curr_end = end; - return ret; } // A NULL iter->off should always be accompanied by iter->finished. - assert(iter->off != NULL); + assert(iter->off != NULL || iter->read_rest); for (;;) { - if (iter->curr_off == 0 || iter->curr_off >= iter->off[iter->i].v) { // then jump to the next chunk - if (iter->i == iter->n_off - 1) { ret = -1; break; } // no more chunks - if (iter->i < 0 || iter->off[iter->i].v != iter->off[iter->i+1].u) { // not adjacent chunks; then seek - if (bgzf_seek(fp, iter->off[iter->i+1].u, SEEK_SET) < 0) { - hts_log_error("Failed to seek to offset %"PRIu64"%s%s", - iter->off[iter->i+1].u, - errno ? ": " : "", strerror(errno)); - return -2; - } - iter->curr_off = bgzf_tell(fp); - } - ++iter->i; + if (c) { + if (!(e = getcache_iter(data))) + return -3; + r = getreadbuffer_iter(e); + sts = 0; } - if ((ret = iter->readrec(fp, data, r, &tid, &beg, &end)) >= 0) { - iter->curr_off = bgzf_tell(fp); - if (tid != iter->tid || beg >= iter->end) { // no need to proceed - ret = -1; break; - } else if (end > iter->beg && iter->end > beg) { + if (iter->read_rest) { + ret = iter->readrec(fp, data, r, &tid, &beg, &end); + if (ret < 0) { + if (c) { + notifyend_iter(c, e); + } + break; + } else if (!c) { iter->curr_tid = tid; iter->curr_beg = beg; iter->curr_end = end; return ret; + } else {//TODO use end to set e->len + if (addtoreadcache_iter(c, e, &sts)) { + return -3; + } + if (sts >= 2) //ready/wnd full/end + break; + continue; } - } else break; // end of file or error + } else { + if (iter->curr_off == 0 || iter->curr_off >= iter->off[iter->i].v) { // then jump to the next chunk + if (iter->i == iter->n_off - 1) { + ret = -1; + if (c) { + notifyend_iter(c, e); + } + break; + } // no more chunks + if (iter->i < 0 || iter->off[iter->i].v != iter->off[iter->i+1].u) { // not adjacent chunks; then seek + if (bgzf_seek(fp, iter->off[iter->i+1].u, SEEK_SET) < 0) { + hts_log_error("Failed to seek to offset %"PRIu64"%s%s", + iter->off[iter->i+1].u, + errno ? ": " : "", strerror(errno)); + return -2; + } + iter->curr_off = bgzf_tell(fp); + } + ++iter->i; + } + if ((ret = iter->readrec(fp, data, r, &tid, &beg, &end)) >= 0) { + iter->curr_off = bgzf_tell(fp); + if (tid != iter->tid || beg >= iter->end) { // no need to proceed + ret = -1; + if (c) { + notifyend_iter(c, e); + } + break; + } else if (end > iter->beg && iter->end > beg) { + if (c) { + if (addtoreadcache_iter(c, e, &sts)) { + return -3; + } + if (sts >= 2) //ready/wnd full/end + break; + continue; + } + iter->curr_tid = tid; + iter->curr_beg = beg; + iter->curr_end = end; + return ret; + } else { //non interested data? + if (c) + retcache(c,e); + } + } else { + if (c && ret == -1) { //eof + notifyend_iter(c, e); + } + break; // end of file or error + } + } + } + if (c && ret >= -1) { + processcache_iter(c); + if (!getfromreadcache_iter(c, s, &iter->tid, &iter->curr_beg, &iter->curr_end)) { + //if (ret == -1) { + iter->finished = 1; + resetcache_itr(c); + //} + return -1; + } else if (ret < 0) + ret = 0; + return ret; + } else { + iter->finished = 1; } - iter->finished = 1; return ret; } -int hts_itr_multi_next(htsFile *fd, hts_itr_t *iter, void *r) +int hts_itr_multi_next(htsFile *fd, hts_itr_t *iter, void *s) { void *fp; - int ret, tid, i, cr, ci; + int ret, tid, i, cr, ci, sts = 0, used = 0; hts_pos_t beg, end; hts_reglist_t *found_reg; + void *c = (void*)fd->c; + void *e = NULL; + void *r = s; - if (iter == NULL || iter->finished) return -1; + if (iter == NULL || iter->finished) return -1;//todo chk + if (c && getfromreadcache_iter(c, s, &iter->curr_tid, &iter->curr_beg, &iter->curr_end)) { + return 0; + } if (iter->is_cram) { fp = fd->fp.cram; } else { fp = fd->fp.bgzf; } - if (iter->read_rest) { if (iter->curr_off) { // seek to the start if (iter->seek(fp, iter->curr_off, SEEK_SET) < 0) { @@ -4348,267 +4441,351 @@ int hts_itr_multi_next(htsFile *fd, hts_itr_t *iter, void *r) iter->curr_off = 0; // only seek once } - ret = iter->readrec(fp, fd, r, &tid, &beg, &end); - if (ret < 0) - iter->finished = 1; + for(;;) { + if (c) { + if (!(e = getcache_iter(fd))) + return -3; + r = getreadbuffer_iter(e); + sts = 0; + } + ret = iter->readrec(fp, fd, r, &tid, &beg, &end); + if (ret < 0) { + if (c) { + notifyend_iter(c, e); + } + break; + } else if (!c) { + iter->curr_tid = tid; + iter->curr_beg = beg; + iter->curr_end = end; + return ret; + } else {//TODO use end to set e->len + if (addtoreadcache_iter(c, e, &sts)) { + return -3; + } + if (sts >= 2) //ready/wnd full/end + break; + continue; + } + } + } else { + // A NULL iter->off should always be accompanied by iter->finished. + assert(iter->off != NULL || iter->nocoor != 0); + + int next_range = 0; + for (;;) { + // Note that due to the way bam indexing works, iter->off may contain + // file chunks that are not actually needed as they contain data + // beyond the end of the requested region. These are filtered out + // by comparing the tid and index into hts_reglist_t::intervals + // (packed for reasons of convenience into iter->off[iter->i].max) + // associated with the file region with iter->curr_tid and + // iter->curr_intv. + + if (c) { + if (!(e = getcache_iter(fd))) + return -3; + r = getreadbuffer_iter(e); + used = 0; + sts = 0; + } + if (next_range + || iter->curr_off == 0 + || iter->i >= iter->n_off + || iter->curr_off >= iter->off[iter->i].v + || (iter->off[iter->i].max >> 32 == iter->curr_tid + && (iter->off[iter->i].max & 0xffffffff) < iter->curr_intv)) { + + // Jump to the next chunk. It may be necessary to skip more + // than one as the iter->off list can include overlapping entries. + do { + iter->i++; + } while (iter->i < iter->n_off + && (iter->curr_off >= iter->off[iter->i].v + || (iter->off[iter->i].max >> 32 == iter->curr_tid + && (iter->off[iter->i].max & 0xffffffff) < iter->curr_intv))); + + if (iter->is_cram && iter->i < iter->n_off) { + // Ensure iter->curr_reg is correct. + // + // We need this for CRAM as we shortcut some of the later + // logic by getting an end-of-range and continuing to the + // next offset. + // + // We cannot do this for BAM (and fortunately do not need to + // either) because in BAM world a query to genomic positions + // GX and GY leading to a seek offsets PX and PY may have + // GX > GY and PX < PY. (This is due to the R-tree and falling + // between intervals, bumping up to a higher bin.) + // CRAM strictly follows PX >= PY if GX >= GY, so this logic + // works. + int want_tid = iter->off[iter->i].max >> 32; + if (!(iter->curr_reg < iter->n_reg && + iter->reg_list[iter->curr_reg].tid == want_tid)) { + int j; + for (j = 0; j < iter->n_reg; j++) + if (iter->reg_list[j].tid == want_tid) + break; + if (j == iter->n_reg) + return -1; + iter->curr_reg = j; + iter->curr_tid = iter->reg_list[iter->curr_reg].tid; + }; + iter->curr_intv = iter->off[iter->i].max & 0xffffffff; + } - iter->curr_tid = tid; - iter->curr_beg = beg; - iter->curr_end = end; + if (iter->i >= iter->n_off) { // no more chunks, except NOCOORs + if (iter->nocoor) { + next_range = 0; + if (iter->seek(fp, iter->nocoor_off, SEEK_SET) < 0) { + hts_log_error("Seek at offset %" PRIu64 " failed.", iter->nocoor_off); + return -2; + } + if (iter->is_cram) { + cram_range r = { HTS_IDX_NOCOOR }; + cram_set_option(fp, CRAM_OPT_RANGE_NOSEEK, &r); + } - return ret; - } - // A NULL iter->off should always be accompanied by iter->finished. - assert(iter->off != NULL || iter->nocoor != 0); + // The first slice covering the unmapped reads might + // contain a few mapped reads, so scroll + // forward until finding the first unmapped read. + do { + ret = iter->readrec(fp, fd, r, &tid, &beg, &end); + } while (tid >= 0 && ret >=0); - int next_range = 0; - for (;;) { - // Note that due to the way bam indexing works, iter->off may contain - // file chunks that are not actually needed as they contain data - // beyond the end of the requested region. These are filtered out - // by comparing the tid and index into hts_reglist_t::intervals - // (packed for reasons of convenience into iter->off[iter->i].max) - // associated with the file region with iter->curr_tid and - // iter->curr_intv. - - if (next_range - || iter->curr_off == 0 - || iter->i >= iter->n_off - || iter->curr_off >= iter->off[iter->i].v - || (iter->off[iter->i].max >> 32 == iter->curr_tid - && (iter->off[iter->i].max & 0xffffffff) < iter->curr_intv)) { - - // Jump to the next chunk. It may be necessary to skip more - // than one as the iter->off list can include overlapping entries. - do { - iter->i++; - } while (iter->i < iter->n_off - && (iter->curr_off >= iter->off[iter->i].v - || (iter->off[iter->i].max >> 32 == iter->curr_tid - && (iter->off[iter->i].max & 0xffffffff) < iter->curr_intv))); - - if (iter->is_cram && iter->i < iter->n_off) { - // Ensure iter->curr_reg is correct. - // - // We need this for CRAM as we shortcut some of the later - // logic by getting an end-of-range and continuing to the - // next offset. - // - // We cannot do this for BAM (and fortunately do not need to - // either) because in BAM world a query to genomic positions - // GX and GY leading to a seek offsets PX and PY may have - // GX > GY and PX < PY. (This is due to the R-tree and falling - // between intervals, bumping up to a higher bin.) - // CRAM strictly follows PX >= PY if GX >= GY, so this logic - // works. - int want_tid = iter->off[iter->i].max >> 32; - if (!(iter->curr_reg < iter->n_reg && - iter->reg_list[iter->curr_reg].tid == want_tid)) { - int j; - for (j = 0; j < iter->n_reg; j++) - if (iter->reg_list[j].tid == want_tid) + if (ret < 0) { + if (c) { + notifyend_iter(c, e); + } break; - if (j == iter->n_reg) - return -1; - iter->curr_reg = j; - iter->curr_tid = iter->reg_list[iter->curr_reg].tid; - }; - iter->curr_intv = iter->off[iter->i].max & 0xffffffff; - } + } + else { + iter->read_rest = 1; + iter->curr_off = 0; // don't seek any more + if (!c) { + iter->curr_tid = tid; + iter->curr_beg = beg; + iter->curr_end = end; + return ret; + } else {//TODO use end to set e->len + if (addtoreadcache_iter(c, e, &sts)) { + return -3; + } + if (sts >= 2) //ready/wnd full/end + break; + continue; + } + } - if (iter->i >= iter->n_off) { // no more chunks, except NOCOORs - if (iter->nocoor) { - next_range = 0; - if (iter->seek(fp, iter->nocoor_off, SEEK_SET) < 0) { - hts_log_error("Seek at offset %" PRIu64 " failed.", iter->nocoor_off); - return -2; - } - if (iter->is_cram) { - cram_range r = { HTS_IDX_NOCOOR }; - cram_set_option(fp, CRAM_OPT_RANGE_NOSEEK, &r); + } else { + ret = -1; + if (c) { + notifyend_iter(c, e); + } + break; } + } else if (iter->i < iter->n_off) { + // New chunk may overlap the last one, so ensure we + // only seek forwards. + if (iter->curr_off < iter->off[iter->i].u || next_range) { + iter->curr_off = iter->off[iter->i].u; + + // CRAM has the capability of setting an end location. + // This means multi-threaded decodes can stop once they + // reach that point, rather than pointlessly decoding + // more slices than we'll be using. + // + // We have to be careful here. Whenever we set the cram + // range we need a corresponding seek in order to ensure + // we can safely decode at that offset. We use next_range + // var to ensure this is always true; this is set on + // end-of-range condition. It's never modified for BAM. + if (iter->is_cram) { + // Next offset.[uv] tuple, but it's already been + // included in our cram range, so don't seek and don't + // reset range so we can efficiently multi-thread. + if (next_range || iter->curr_off >= iter->end) { + if (iter->seek(fp, iter->curr_off, SEEK_SET) < 0) { + hts_log_error("Seek at offset %" PRIu64 + " failed.", iter->curr_off); + return -2; + } - // The first slice covering the unmapped reads might - // contain a few mapped reads, so scroll - // forward until finding the first unmapped read. - do { - ret = iter->readrec(fp, fd, r, &tid, &beg, &end); - } while (tid >= 0 && ret >=0); - - if (ret < 0) - iter->finished = 1; - else - iter->read_rest = 1; - - iter->curr_off = 0; // don't seek any more - iter->curr_tid = tid; - iter->curr_beg = beg; - iter->curr_end = end; - - return ret; - } else { - ret = -1; break; - } - } else if (iter->i < iter->n_off) { - // New chunk may overlap the last one, so ensure we - // only seek forwards. - if (iter->curr_off < iter->off[iter->i].u || next_range) { - iter->curr_off = iter->off[iter->i].u; - - // CRAM has the capability of setting an end location. - // This means multi-threaded decodes can stop once they - // reach that point, rather than pointlessly decoding - // more slices than we'll be using. - // - // We have to be careful here. Whenever we set the cram - // range we need a corresponding seek in order to ensure - // we can safely decode at that offset. We use next_range - // var to ensure this is always true; this is set on - // end-of-range condition. It's never modified for BAM. - if (iter->is_cram) { - // Next offset.[uv] tuple, but it's already been - // included in our cram range, so don't seek and don't - // reset range so we can efficiently multi-thread. - if (next_range || iter->curr_off >= iter->end) { + // Find the genomic range matching this interval. + int j; + hts_reglist_t *rl = &iter->reg_list[iter->curr_reg]; + cram_range r = { + rl->tid, + rl->intervals[iter->curr_intv].beg, + rl->intervals[iter->curr_intv].end + }; + + // Expand it up to cover neighbouring intervals. + // Note we can only have a single chromosome in a + // range, so if we detect our blocks span chromosomes + // or we have a multi-ref mode slice, we just use + // HTS_IDX_START refid instead. This doesn't actually + // seek (due to CRAM_OPT_RANGE_NOSEEK) and is simply + // and indicator of decoding with no end limit. + // + // That isn't as efficient as it could be, but it's + // no poorer than before and it works. + int tid = r.refid; + int64_t end = r.end; + int64_t v = iter->off[iter->i].v; + j = iter->i+1; + while (j < iter->n_off) { + if (iter->off[j].u > v) + break; + + uint64_t max = iter->off[j].max; + if ((max>>32) != tid) { + tid = HTS_IDX_START; // => no range limit + } else { + if (end < rl->intervals[max & 0xffffffff].end) + end = rl->intervals[max & 0xffffffff].end; + } + if (v < iter->off[j].v) + v = iter->off[j].v; + j++; + } + r.refid = tid; + r.end = end; + + // Remember maximum 'v' here so we don't do + // unnecessary subsequent seeks for the next + // regions. We can't change curr_off, but + // beg/end are used only by single region iterator so + // we cache it there to avoid changing the struct. + iter->end = v; + + cram_set_option(fp, CRAM_OPT_RANGE_NOSEEK, &r); + next_range = 0; + } + } else { // Not CRAM if (iter->seek(fp, iter->curr_off, SEEK_SET) < 0) { - hts_log_error("Seek at offset %" PRIu64 - " failed.", iter->curr_off); + hts_log_error("Seek at offset %" PRIu64 " failed.", + iter->curr_off); return -2; } - - // Find the genomic range matching this interval. - int j; - hts_reglist_t *rl = &iter->reg_list[iter->curr_reg]; - cram_range r = { - rl->tid, - rl->intervals[iter->curr_intv].beg, - rl->intervals[iter->curr_intv].end - }; - - // Expand it up to cover neighbouring intervals. - // Note we can only have a single chromosome in a - // range, so if we detect our blocks span chromosomes - // or we have a multi-ref mode slice, we just use - // HTS_IDX_START refid instead. This doesn't actually - // seek (due to CRAM_OPT_RANGE_NOSEEK) and is simply - // and indicator of decoding with no end limit. - // - // That isn't as efficient as it could be, but it's - // no poorer than before and it works. - int tid = r.refid; - int64_t end = r.end; - int64_t v = iter->off[iter->i].v; - j = iter->i+1; - while (j < iter->n_off) { - if (iter->off[j].u > v) - break; - - uint64_t max = iter->off[j].max; - if ((max>>32) != tid) { - tid = HTS_IDX_START; // => no range limit - } else { - if (end < rl->intervals[max & 0xffffffff].end) - end = rl->intervals[max & 0xffffffff].end; - } - if (v < iter->off[j].v) - v = iter->off[j].v; - j++; - } - r.refid = tid; - r.end = end; - - // Remember maximum 'v' here so we don't do - // unnecessary subsequent seeks for the next - // regions. We can't change curr_off, but - // beg/end are used only by single region iterator so - // we cache it there to avoid changing the struct. - iter->end = v; - - cram_set_option(fp, CRAM_OPT_RANGE_NOSEEK, &r); - next_range = 0; - } - } else { // Not CRAM - if (iter->seek(fp, iter->curr_off, SEEK_SET) < 0) { - hts_log_error("Seek at offset %" PRIu64 " failed.", - iter->curr_off); - return -2; } } } } - } - ret = iter->readrec(fp, fd, r, &tid, &beg, &end); - if (ret < 0) { - if (iter->is_cram && cram_eof(fp)) { - // Skip to end of range - // - // We should never be adjusting curr_off manually unless - // we also can guarantee we'll be doing a seek after to - // a new location. Otherwise we'll be reading wrong offset - // for the next container. - // - // We ensure this by adjusting our CRAM_OPT_RANGE - // accordingly above, but to double check we also - // set the skipped_block flag to enforce a seek also. - iter->curr_off = iter->off[iter->i].v; - next_range = 1; - - // Next region - if (++iter->curr_intv >= iter->reg_list[iter->curr_reg].count){ - if (++iter->curr_reg >= iter->n_reg) - break; - iter->curr_intv = 0; - iter->curr_tid = iter->reg_list[iter->curr_reg].tid; + ret = iter->readrec(fp, fd, r, &tid, &beg, &end); + if (ret < 0) { + if (iter->is_cram && cram_eof(fp)) { + // Skip to end of range + // + // We should never be adjusting curr_off manually unless + // we also can guarantee we'll be doing a seek after to + // a new location. Otherwise we'll be reading wrong offset + // for the next container. + // + // We ensure this by adjusting our CRAM_OPT_RANGE + // accordingly above, but to double check we also + // set the skipped_block flag to enforce a seek also. + iter->curr_off = iter->off[iter->i].v; + next_range = 1; + + // Next region + if (++iter->curr_intv >= iter->reg_list[iter->curr_reg].count){ + if (++iter->curr_reg >= iter->n_reg) { + if (c) { + notifyend_iter(c, e); + } + break; + } + iter->curr_intv = 0; + iter->curr_tid = iter->reg_list[iter->curr_reg].tid; + } + if (c) { + notifyend_iter(c, e); + } + continue; + } else { + if (c) { + notifyend_iter(c, e); + } + break; } - continue; - } else { - break; } - } - iter->curr_off = iter->tell(fp); + iter->curr_off = iter->tell(fp); - if (tid != iter->curr_tid) { - hts_reglist_t key; - key.tid = tid; + if (tid != iter->curr_tid) { + hts_reglist_t key; + key.tid = tid; - found_reg = (hts_reglist_t *)bsearch(&key, iter->reg_list, - iter->n_reg, - sizeof(hts_reglist_t), - compare_regions); - if (!found_reg) - continue; + found_reg = (hts_reglist_t *)bsearch(&key, iter->reg_list, + iter->n_reg, + sizeof(hts_reglist_t), + compare_regions); + if (!found_reg) { + retcache(c,e); + continue; + } - iter->curr_reg = (found_reg - iter->reg_list); - iter->curr_tid = tid; - iter->curr_intv = 0; - } + iter->curr_reg = (found_reg - iter->reg_list); + iter->curr_tid = tid; + iter->curr_intv = 0; + } - cr = iter->curr_reg; - ci = iter->curr_intv; + cr = iter->curr_reg; + ci = iter->curr_intv; + + for (i = ci; i < iter->reg_list[cr].count; i++) { + if (end > iter->reg_list[cr].intervals[i].beg && + iter->reg_list[cr].intervals[i].end > beg) { + if (c) { + used = 1; + sts = 0; + if (addtoreadcache_iter(c, e, &sts)) { + return -3; + } + break; + } else { + iter->curr_beg = beg; + iter->curr_end = end; + iter->curr_intv = i; + return ret; + } + } - for (i = ci; i < iter->reg_list[cr].count; i++) { - if (end > iter->reg_list[cr].intervals[i].beg && - iter->reg_list[cr].intervals[i].end > beg) { - iter->curr_beg = beg; - iter->curr_end = end; - iter->curr_intv = i; + // Check if the read starts beyond intervals[i].end + // If so, the interval is finished so move on to the next. + if (beg > iter->reg_list[cr].intervals[i].end) + iter->curr_intv = i + 1; - return ret; + // No need to keep searching if the read ends before intervals[i].beg + if (end < iter->reg_list[cr].intervals[i].beg) { + break; + } + } + if (c) { + if (used) { + if(sts >= 2) //ready/wnd full/end + break; + } else { //unused, return to cache + retcache(c,e); + } } - - // Check if the read starts beyond intervals[i].end - // If so, the interval is finished so move on to the next. - if (beg > iter->reg_list[cr].intervals[i].end) - iter->curr_intv = i + 1; - - // No need to keep searching if the read ends before intervals[i].beg - if (end < iter->reg_list[cr].intervals[i].beg) - break; } } - iter->finished = 1; + if (c && ret >= -1) { + processcache_iter(c); + if (!getfromreadcache_iter(c, s, &iter->tid, &iter->curr_beg, &iter->curr_end)) { + if (ret == -1) { + iter->finished = 1; + } + return -1; + } else + ret = 0; + return ret; + } else { + iter->finished = 1; + } return ret; } diff --git a/htslib/hts.h b/htslib/hts.h index 861bffa25..48e54618e 100644 --- a/htslib/hts.h +++ b/htslib/hts.h @@ -260,6 +260,7 @@ typedef struct htsFile { const char *fnidx; struct sam_hdr_t *bam_header; struct hts_filter_t *filter; + void *c; //for cache } htsFile; // A combined thread pool and queue allocation size. @@ -331,6 +332,7 @@ enum hts_fmt_option { HTS_OPT_BLOCK_SIZE, HTS_OPT_FILTER, HTS_OPT_PROFILE, + HTS_OPT_MAXDEPTH, // Fastq @@ -875,7 +877,7 @@ typedef int64_t hts_tell_func(void *fp); */ typedef struct hts_itr_t { - uint32_t read_rest:1, finished:1, is_cram:1, nocoor:1, multi:1, dummy:27; + uint32_t read_rest:1, finished:1, is_cram:1, nocoor:1, multi:1, usecache:1, dummy:26; int tid, n_off, i, n_reg; hts_pos_t beg, end; hts_reglist_t *reg_list; @@ -1315,6 +1317,9 @@ typedef hts_itr_t *hts_itr_query_func(const hts_idx_t *idx, int tid, hts_pos_t b HTSLIB_EXPORT hts_itr_t *hts_itr_querys(const hts_idx_t *idx, const char *reg, hts_name2id_f getid, void *hdr, hts_itr_query_func *itr_query, hts_readrec_func *readrec); +/// use caching when available +static inline hts_itr_t* hts_itr_usecache(hts_itr_t *itr) { if (itr) itr->usecache = 1; return itr; } + /// Return the next record from an iterator /** @param fp Input file handle @param iter Iterator diff --git a/sam.c b/sam.c index 12acba2ca..0973cb5d8 100644 --- a/sam.c +++ b/sam.c @@ -57,6 +57,7 @@ DEALINGS IN THE SOFTWARE. */ #include "htslib/hts_endian.h" #include "htslib/hts_expr.h" #include "header.h" +#include "sam_cache.h" #include "htslib/khash.h" KHASH_DECLARE(s2i, kh_cstr_t, int64_t) @@ -1150,6 +1151,9 @@ static int sam_readrec(BGZF *ignored, void *fpv, void *bv, int *tid, hts_pos_t * htsFile *fp = (htsFile *)fpv; bam1_t *b = bv; fp->line.l = 0; + if (fp->c) { //mark as thr' iterator + ((rc_t*)fp->c)->itr = 1; + } int ret = sam_read1(fp, fp->bam_header, b); if (ret >= 0) { *tid = b->core.tid; @@ -1165,6 +1169,9 @@ static int sam_readrec_rest(BGZF *ignored, void *fpv, void *bv, int *tid, hts_po htsFile *fp = (htsFile *)fpv; bam1_t *b = bv; fp->line.l = 0; + if (fp->c) { //mark as thr' iterator + ((rc_t*)fp->c)->itr = 1; + } int ret = sam_read1(fp, fp->bam_header, b); return ret; } @@ -1745,11 +1752,11 @@ hts_itr_t *sam_itr_queryi(const hts_idx_t *idx, int tid, hts_pos_t beg, hts_pos_ { const hts_cram_idx_t *cidx = (const hts_cram_idx_t *) idx; if (idx == NULL) - return hts_itr_query(NULL, tid, beg, end, sam_readrec_rest); + return hts_itr_usecache(hts_itr_query(NULL, tid, beg, end, sam_readrec_rest)); else if (cidx->fmt == HTS_FMT_CRAI) - return cram_itr_query(idx, tid, beg, end, sam_readrec); + return hts_itr_usecache(cram_itr_query(idx, tid, beg, end, sam_readrec)); else - return hts_itr_query(idx, tid, beg, end, sam_readrec); + return hts_itr_usecache(hts_itr_query(idx, tid, beg, end, sam_readrec)); } static int cram_name2id(void *fdv, const char *ref) @@ -1761,9 +1768,9 @@ static int cram_name2id(void *fdv, const char *ref) hts_itr_t *sam_itr_querys(const hts_idx_t *idx, sam_hdr_t *hdr, const char *region) { const hts_cram_idx_t *cidx = (const hts_cram_idx_t *) idx; - return hts_itr_querys(idx, region, bam_name2id_wrapper, hdr, + return hts_itr_usecache(hts_itr_querys(idx, region, bam_name2id_wrapper, hdr, cidx->fmt == HTS_FMT_CRAI ? cram_itr_query : hts_itr_query, - sam_readrec); + sam_readrec)); } hts_itr_t *sam_itr_regarray(const hts_idx_t *idx, sam_hdr_t *hdr, char **regarray, unsigned int regcount) @@ -1789,6 +1796,7 @@ hts_itr_t *sam_itr_regarray(const hts_idx_t *idx, sam_hdr_t *hdr, char **regarra itr = hts_itr_regions(idx, r_list, r_count, bam_name2id_wrapper, hdr, hts_itr_multi_bam, sam_readrec, bam_pseek, bam_ptell); } + hts_itr_usecache(itr); if (!itr) hts_reglist_free(r_list, r_count); @@ -1804,11 +1812,11 @@ hts_itr_t *sam_itr_regions(const hts_idx_t *idx, sam_hdr_t *hdr, hts_reglist_t * return NULL; if (cidx->fmt == HTS_FMT_CRAI) - return hts_itr_regions(idx, reglist, regcount, cram_name2id, cidx->cram, - hts_itr_multi_cram, cram_readrec, cram_pseek, cram_ptell); + return hts_itr_usecache(hts_itr_regions(idx, reglist, regcount, cram_name2id, cidx->cram, + hts_itr_multi_cram, cram_readrec, cram_pseek, cram_ptell)); else - return hts_itr_regions(idx, reglist, regcount, bam_name2id_wrapper, hdr, - hts_itr_multi_bam, sam_readrec, bam_pseek, bam_ptell); + return hts_itr_usecache(hts_itr_regions(idx, reglist, regcount, bam_name2id_wrapper, hdr, + hts_itr_multi_bam, sam_readrec, bam_pseek, bam_ptell)); } /********************** @@ -4266,11 +4274,38 @@ static inline int sam_read1_sam(htsFile *fp, sam_hdr_t *h, bam1_t *b) { // Returns 0 on success, // -1 on EOF, // <-1 on error -int sam_read1(htsFile *fp, sam_hdr_t *h, bam1_t *b) +int sam_read1(htsFile *fp, sam_hdr_t *h, bam1_t *r) { int ret, pass_filter; + rc_t *c = NULL; + ce_t *e = NULL; + bam1_t *b = r; + + if(fp->c) { //cache in use? + if (!((rc_t*)fp->c)->itr) { //not thr' iterators, OK to use here + c = (rc_t*)fp->c; + } + } + if (c) { + if ((ret = getfromreadcache(c, r, NULL)) > 0) { + return 0; + } else if (ret < 0) + return -1; + } + if(c) { + LG("sr1: t %"PRIu64" s %"PRIu64" i %"PRIu64" n %"PRIu64"\n", c->rcnt, c->selcnt,c->inscnt, c->nselcnt); + // assert(!c->inscnt && !c->selcnt); + // assert(!c->head_sel && !c->tail_sel); + // assert(!c->head_ins && !c->tail_ins); + } do { + if (c) { + if (!(e = getcache(fp))) + return -4; + b = e->r; + // assert(c->cache.m == c->cache.f+1+c->rcnt+c->inscnt+c->selcnt+c->nselcnt); + } switch (fp->format.format) { case bam: ret = sam_read1_bam(fp, h, b); @@ -4308,7 +4343,56 @@ int sam_read1(htsFile *fp, sam_hdr_t *h, bam1_t *b) pass_filter = (ret >= 0 && fp->filter) ? sam_passes_filter(h, b, fp->filter) : 1; + + //if pass, add to cache + //move processing to thread + if (c) { + if (pass_filter) { + /* window should start from 1st pos read in + pos may change or could be on same pos and + as tid changes, pos may start again from older pos or even smaller than that! + */ + pass_filter = 0; + if (ret >= 0) { + if (addtoreadcache(c, e, NULL)) { + return -4; + } + } else { + if (ret == -1) { + c->trgr = 4;//end //todo change to avoid internal access + c->tid = -3; + retcache(c,e); + //fprintf(stderr,"ret -1; ready for processing\n"); + } + } + if (c->trgr >= 2) { //end or window full/ready + // { + // assert(!c->inscnt && !c->selcnt); + // assert(!c->head_sel && !c->tail_sel); + // assert(!c->head_ins && !c->tail_ins); + // } + + processcache(c); + + pass_filter = getfromreadcache(c, r, NULL); + if (-1 == ret && !pass_filter) { + pass_filter = 1; + // ce_t *tmp = c->head_nsel; + // while (tmp) { + // LG("LG bal %"PRIu64" %s,,,nsel-cleaning\n", tmp->ord, tmp->log.s); + // tmp = tmp->next; + // } + } + else + ret = 0; + } + } else + retcache(c, e); + } } while (pass_filter == 0); + if (c) { + LG("sr2: t %"PRIu64" s %"PRIu64" i %"PRIu64" n %"PRIu64"\n", c->rcnt, c->selcnt,c->inscnt, c->nselcnt); + } return pass_filter < 0 ? -2 : ret; } diff --git a/sam_cache.c b/sam_cache.c new file mode 100644 index 000000000..35c4fd665 --- /dev/null +++ b/sam_cache.c @@ -0,0 +1,900 @@ +/* sam_cache.c -- Functions to create a cache of reads for depth handling + + Copyright (C) 2026 Genome Research Ltd. + + Author: Vasudeva Sarma + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. */ + +#include + +#include "sam_cache.h" + +#ifdef CACHE_DBG_LOG +FILE *clogfp = NULL; +#endif //CACHE_DBG_LOG + +//setup/destroy +int setupcache(htsFile *fp, int wndsz, int maxdpth) +{ + int i, j; + rc_t *c = (rc_t*)fp->c; + ce_t *elem = NULL, *tail = NULL; + const int inc = 1024; + if (!c) { //create cache + if (!(c = calloc(1, sizeof(rc_t)))) + goto fail; + fp->c = c; + } + + if (!(c->cache.p = malloc(sizeof(ce_t*)))) + goto fail; + if((elem = calloc(inc, sizeof(ce_t)))) { + c->cache.p[c->cache.n++] = elem; + c->cache.head = tail = elem; + if (!(elem->r = bam_init1())) + goto fail; + ks_initialize(&elem->log); + for (i = 1; i < inc; ++i) { + if (!((elem + i )->r = bam_init1())) + goto fail; + ks_initialize(&(elem+i)->log); + tail->next = elem + i; + tail = tail->next; + } + c->cache.m += inc; + c->cache.f += inc; + c->cache.tail = tail; + } else + goto fail; + + c->wndsz = wndsz; + c->maxdpth = maxdpth; + c->w_st = c->w_en = -1; + c->tid = -2; //start + c->selpair = kh_init(kh_pair); + assert(c->cache.m == c->cache.f+c->rcnt+c->inscnt+c->selcnt+c->nselcnt); + return 0; + +fail: + if (c) { + for (i = 0; i < c->cache.n; ++i) { + elem = c->cache.p[i]; + for (j = 0; j < inc; ++j) { + bam_destroy1(elem[j].r); + ks_free(&(elem[j].log)); + } + free(elem); + } + free(c->cache.p); + free(c); + fp->c = NULL; + } + return 1; +} + +void destroycache(htsFile *fp) +{ + int i, j; + const int inc = 1024; + ce_t *elem = NULL; + rc_t *c = (rc_t*) fp->c; + khint_t iter; + if(!c) + return; + for (iter = kh_begin(c->selpair); iter != kh_end(c->selpair); ++iter) { + if (kh_exist(c->selpair, iter)) { + kh_del(kh_pair, c->selpair, iter); + } + } + for (i = 0; i < c->cache.n; ++i) { + elem = c->cache.p[i]; + for (j = 0; j < inc; ++j) { + bam_destroy1(elem[j].r); + ks_free(&elem[j].log); + } + free(elem); + } + free(c->cache.p); + free(c->dpth); + free(c->inc); + kh_destroy(kh_pair, c->selpair); + free(c); + fp->c = NULL; +} + +//implementation / internals +//todo htsopt3 to try +//-1 on failure and 1 when required and 0 on skip +static int updatedepth(rc_t *c, ce_t *e, int chk) +{ + int *dpth = NULL; + uint32_t *cgr = bam_get_cigar(e->r), i, j; + int clen, off, req = 0; + hts_pos_t st, en, len = 0; + if (!c->dpth) { //setup depth buffer + c->dp_sz = c->wndsz; + if (!(c->dpth = calloc(c->dp_sz, sizeof(int)))) + goto fail; + c->dp_en = c->w_st + c->dp_sz; off = 0; + } + dpth = c->dpth; + st = c->w_st; en = c->dp_en; + if (st > e->r->core.pos) + goto fail; //not sorted? + if (e->len > c->inc_sz) { //grow increment buffer as required + c->inc = realloc(c->inc, sizeof(int) * e->len); + c->inc_sz = e->len; + for(i = 0; i < c->inc_sz; ++i) + *(c->inc + i) = 1; + } + off = e->r->core.pos - st; + { + len = off + e->len; + if (en < (c->w_st+len)) { //extra space possible, every cigars may not contribute! + len = c->w_st + len - en; + if (!(dpth = realloc(c->dpth, (len+c->dp_sz) * sizeof(int)))) { + goto fail; + } + memset(dpth + c->dp_sz, 0, len * sizeof(int)); + c->dp_sz += len; + c->dpth = dpth; + c->dp_en = en = c->w_st + c->dp_sz; + } + len = 0; + } + if (chk) { //check depth + for (i = 0; i < e->r->core.n_cigar; ++i) { + if (!(bam_cigar_type(bam_cigar_op(cgr[i])) & 2)) { //not consuming ref + continue; + } + //deletion is counted! + clen = bam_cigar_oplen(cgr[i]); + for (j = 0; j < clen; ++j) { + req |= dpth[off + j] + 1 <= c->maxdpth; + } + off += clen; + if (req) { //required, no need to check further + break; + } + } + } else { //update depth + req = 1; + for (i = 0; i < e->r->core.n_cigar; ++i) { + if (!(bam_cigar_type(bam_cigar_op(cgr[i])) & 2)) { //not consuming ref + continue; + } + clen = bam_cigar_oplen(cgr[i]); + for (j = 0; j < clen; ++j) { + dpth[off + len + j] += c->inc[j]; + } + len += clen; + } + } + if (req) + return 1; + return 0; + +fail: + return -1; +} + +//1 if required 0 if not -ve on failure +static inline int readrequired(rc_t *c, ce_t *e) +{ + return updatedepth(c, e, 1); +} + +//return cache element to cache +void retcache(rc_t *c, ce_t* elem) +{ + if ((elem->r->core.flag & BAM_FPAIRED) && !(elem->r->core.flag & BAM_FUNMAP)) { //paired and mate mapped, remove from expected pair + khiter_t it = kh_get(kh_pair, c->selpair, bam_get_qname(elem->r)); + if (it != kh_end(c->selpair) && kh_exist(c->selpair, it)) { + //clear from expected pair hash + kh_del(kh_pair, c->selpair, it); + } + } + +#ifdef CACHE_DBG_LOG + if (elem->ord){//todo dbg + LG("LG ret %"PRIu64" %s\n", elem->ord, elem->log.l?elem->log.s:""); + } +#endif //CACHE_DBG_LOG + + elem->prev = NULL; + elem->ord = 0; + elem->len = 0; + elem->next = c->cache.head; + c->cache.head->prev = elem; + c->cache.head = elem; + ++c->cache.f; + assert(c->cache.m == c->cache.f+c->rcnt+c->inscnt+c->selcnt+c->nselcnt); + ks_clear(&elem->log); + +} + +/// @brief get cache elemnt / storage from preallocated cache +/// @param fp htsFile pointer to setup / retrieve cache +/// @return ce_t* on success or NULL on failure +ce_t* getcache(htsFile *fp) +{ + rc_t *c = (rc_t*)fp->c; + ce_t *elem = NULL, *tail = NULL, *ret = NULL, **p = NULL; + int i = 0; + const int inc = 1024; + if (!c) { //create cache + if (!(c = calloc(1, sizeof(rc_t)))) + return NULL; + fp->c = c; + } + if (c->cache.f <= 1) { //grow cache + assert((c->cache.f == 1 && c->cache.head == c->cache.tail) || (c->cache.f != 1 && c->cache.head != c->cache.tail)); + if (!(p = realloc(c->cache.p, (c->cache.n + 1) * sizeof(ce_t*)))) + return NULL; + c->cache.p = p; + if((elem = calloc(inc, sizeof(ce_t)))) { + c->cache.p[c->cache.n++] = elem; + if (!c->cache.m){ + c->cache.head = tail = elem; + if (!(elem->r = bam_init1())) + return NULL; + i = 1; + ks_initialize(&elem->log); + } else { + tail = c->cache.tail; + i = 0; + } + for (; i < inc; ++i) { + if (!((elem + i )->r = bam_init1())) + return NULL; + tail->next = elem + i; + tail = tail->next; + ks_initialize(&elem->log); + } + c->cache.m += inc; + c->cache.f += inc; + c->cache.tail = tail; + assert(!c->cache.tail->next); + assert(c->cache.tail == elem+(i?i-1:0)); + } else + return NULL; + } + assert(c->cache.m == c->cache.f+c->rcnt+c->inscnt+c->selcnt+c->nselcnt); + ret = c->cache.head; + c->cache.head = c->cache.head->next; + c->cache.head->prev = NULL; + ret->prev = NULL; + ret->next = NULL; + --c->cache.f; + assert(c->cache.m == c->cache.f+1+c->rcnt+c->inscnt+c->selcnt+c->nselcnt); + return ret; +} + +/// @brief add a read to cache +/// @param c pointer to read cache +/// @param e cache element containing the read to be cached +/// @param sts to return status of cache post caching +/// @return -1 on failure 0 on success +int addtoreadcache(rc_t *c, ce_t *e, int *sts) +{ + int unmap = 0; + if (!(e->r->core.flag & BAM_FUNMAP)) { + if (c->w_st == -1) { + c->w_st = c->head ? c->head->r->core.pos : e->r->core.pos; //starting + c->w_en = c->w_st + c->wndsz; + c->dp_en = c->w_st + c->dp_sz; + } + } else { + unmap = 1; + } + assert(c->cache.m == c->cache.f+1+c->rcnt+c->inscnt+c->selcnt+c->nselcnt); + e->ord = ++(c->ord); + e->len = bam_cigar2rlen(e->r->core.n_cigar, bam_get_cigar(e->r)); + LG("+ %s %"PRIu64"\t\t%"PRIhts_pos" %"PRIhts_pos" %"PRIu64" %"PRIhts_pos"\n", bam_get_qname(e->r), e->ord, c->w_st, e->r->core.pos, e->len, c->w_en); + if (!c->head) { + c->head = c->tail = e; + } else { + ce_t *p = c->tail; + ce_t *tmpn = NULL; + if (!unmap) { + if (p->r->core.tid == e->r->core.tid && p->r->core.pos > e->r->core.pos) { + hts_log_error("Unsorted data"); + return -1; //not sorted! + } + while (p && p->r->core.tid == e->r->core.tid && p->r->core.pos == e->r->core.pos && p->len <= e->len) { + //consider flags to make sure dup/fail/sec/supp etc. won't mask others + if (p->r->core.flag>>8 >= e->r->core.flag>>8) { + LG("\t%"PRIu64" %d - %"PRIu64" %d, continuing\n", p->ord, p->r->core.flag, e->ord, e->r->core.flag); + p = p->prev; + } + else { + LG("\t%"PRIu64" %d %"PRIu64" %d, break\n", p->ord, p->r->core.flag, e->ord, e->r->core.flag); + break; + } + } + } + if (p) { + tmpn = p->next; + p->next = e; + e->prev = p; + e->next = tmpn; + if (tmpn) + tmpn->prev = e; + if (p == c->tail) + c->tail = e; + } else { //either last or 1st + if (c->head == c->tail && !c->tail) { //none in list + c->tail = c->head = e; + e->prev = e->next = NULL; + } else { //insert 1st + tmpn = c->head; + c->head = e; + e->prev = NULL; + e->next = tmpn; + if(tmpn) + tmpn->prev = e; + } + } + } + ++c->rcnt; + //todo do we need a limit on max no of items that are cached? like the whole file is for same pos, probably cant be loaded! + if (c->tid == e->r->core.tid) { + if (c->w_en < e->r->core.pos) { //post window, process and advance + LG("wnd full\n"); + c->trgr = 2; //wnd full, go for processing + } + else + c->trgr = 1; //caching + } else if (c->tid != -2) { + LG("tid change\n"); + c->trgr = 3; //ready for processing + } + else + c->trgr = 1; //caching + + c->tid = e->r->core.tid; + if (sts) *sts = c->trgr; + assert(c->cache.m == c->cache.f+c->rcnt+c->inscnt+c->selcnt+c->nselcnt); + LGlog(&e->log, "%s,%"PRIu64",added,%d,%"PRIhts_pos",%"PRIhts_pos",%"PRIu64",%"PRIhts_pos",", bam_get_qname(e->r), e->ord,e->r->core.flag,e->r->core.pos+1, e->r->core.mpos+1,e->len, e->len+e->r->core.pos+1); + return 0; +} + +/// @brief get read from procesed cache +/// @param c pointer to read cache +/// @param b pointer to bam data, to which read data is copied +/// @param end end of read, for iterators +/// @return -1 on failure, 0 when nothing to retrieve and 1 with read retrieved +int getfromreadcache(rc_t *c, bam1_t *b, hts_pos_t *end) +{ + if (!c || c->trgr < 3) { //either ready or end + return 0; + } + assert(c->cache.m == c->cache.f+c->rcnt+c->inscnt+c->selcnt+c->nselcnt); + //todo at some point, removal from selpair need to be done based on pos as well + uint64_t sel = UINT64_MAX, ins = UINT64_MAX; + ce_t *e = c->head_sel, *f = c->head_ins, *p = NULL; + + //get from selected or inserted list, based on ordinal + if (e) + sel = e->ord; + if (f) + ins = f->ord; + if(sel < ins) + p = e; + else + p = f; + + if (p && (c->trgr == 3 || c->trgr == 4)) { //send only upto start of wnd to maintain the order, except when it is end + if (!bam_copy1(b, p->r)) + return -1; + if (p == e) { + c->selcnt--; + c->head_sel = p->next; + if (!c->head_sel) { + c->tail_sel = NULL; + } + } + else { + c->head_ins = p->next; + c->inscnt--; + if (!c->head_ins) { + c->tail_ins = NULL; + } + } + if (!c->head_sel && !c->head_ins && c->trgr != 4) + c->trgr = 0; //not ready + else { + if (c->head_sel) hts_prefetch(c->head_sel); + if (c->head_ins) hts_prefetch(c->head_ins); + } + LG("- %s %"PRIu64"\n", bam_get_qname(b), p->ord); + LGlog(&p->log, "%s", ",retrieved"); + if (end) *end = p->len + p->r->core.pos; + retcache(c, p); + assert(c->cache.m == c->cache.f+c->rcnt+c->inscnt+c->selcnt+c->nselcnt); + + return 1; + } + return 0; +} +/// @brief find a read matching to given one from non-selected list +/// @param c pointer to read cache +/// @param e pointer to read for which pair need to be found +/// @param ep pointer to previous one of the pair, to maintain list +/// @return NULL when not found and cache element pointer when found +static inline ce_t* find_nsel(rc_t *c, ce_t *e, ce_t **ep) +{ + ce_t *s = c->head_nsel; + *ep = NULL; + while (s) { + if (s->next) + hts_prefetch(s->next->r); + if (s->ord > e->ord) + break; //not found + if (s->r->core.pos == e->r->core.mpos && + s->r->core.mpos == e->r->core.pos && + s->r->core.tid == e->r->core.mtid && + s->r->core.mtid == e->r->core.mtid && !strcmp(bam_get_qname(s->r), bam_get_qname(e->r))) + return s; + *ep = s; + s = s->next; + } + return NULL; +} +/// @brief move the read/cache element from main list to selected/unselected/insert list +/// @param c pointer to cache +/// @param ep pointer to previous element to maintain the list +/// @param e element being moved +/// @param en next element +/// @param sel 1 to move read to selected list 0 to move to nselected list +/// @param ins 1 to move to insert list, relevant with sel = 1 +static inline void moveread(rc_t *c, ce_t *ep, ce_t *e, ce_t* en, int sel, int ins) +{ + int paired = (e->r->core.flag & BAM_FPAIRED) && + !(e->r->core.flag & BAM_FUNMAP) && !(e->r->core.flag & BAM_FMUNMAP) && + e->r->core.mtid != -1 && e->r->core.mpos != -1; + if (!ins) { //remove from cache + assert(c->head == e); //always be head as they are moved to either selected or nsel + if (en) + en->prev = e->prev; + c->head = en; + if (!c->head) + c->tail = c->head; + c->rcnt--; + } else { //remove from nsel + if (ep) { + ep->next = en; + if (en) + en->prev = ep; + } + else { + c->head_nsel = en; + if (en) + en->prev = NULL; + } + if (!en) + c->tail_nsel = ep ? ep : NULL; + + c->nselcnt--; + } + e->next = NULL; + e->prev = NULL; + + if (sel) { + LG("mv %"PRIu64" sel\n", e->ord); + ins ? c->inscnt++ : c->selcnt++; + //insert in required pos, starting from tail + ce_t *s = ins? c->tail_ins : c->tail_sel, *p = NULL; + if (s && s->ord < e->ord) { //shortcut + s->next = e; + e->next = NULL; + e->prev = s; + if(ins) + c->tail_ins = e; + else + c->tail_sel = e; + return; + } + while (s) { + if (s->ord < e->ord) { //add in ascending order + break; + } + s = s->prev; + } + if (!s) { //as head + if (ins) { + p = c->head_ins; + c->head_ins = e; + } + else { + p = c->head_sel; + c->head_sel = e; + } + e->prev = NULL; + e->next = p; + if(p) + p->prev = e; + else { //update tail + if (ins) + c->tail_ins = e; + else + c->tail_sel = e; + } + return; + } else { + p = s->next; + s->next = e; + e->prev = s; + e->next = p; + if (p) + p->prev = e; + return; + } + return; + } else if (paired) { //move to nsel if paired, otherwise discard and return cache + c->nselcnt++; + LG("mv %"PRIu64" nsel\n", e->ord); + //add to non-selected list, for pair lookup + ce_t *s = c->tail_nsel, *p = NULL; + assert (!s || s->r->core.tid == e->r->core.tid); + if (s && s->r->core.pos < e->r->core.pos) { //shortcut + s->next = e; + e->next = NULL; + e->prev = s; + c->tail_nsel = e; + return; + } + //find pos and fit, in order of increasing pos, that it is easy to remove + while (s) { + if (s->r->core.pos < e->r->core.pos) { + break; + } + s = s->prev; + } + if (!s) { //add as head + p = c->head_nsel; + c->head_nsel = e; + e->prev = NULL; + e->next = p; + if(p) + p->prev = e; + if (!p) + c->tail_nsel = e; + return; + } else { + p = s->next; + s->next = e; + e->prev = s; + e->next = p; + if (p) + p->prev = e; + return; + } + return; + } else { //non selected, non paired reads, release them + LGlog(&e->log, "%s", "npair,,,disc"); + retcache(c, e); + return; + } +} +/// @brief reset cache status, for next tid/iterator... +/// @param c pointer to read cache +static inline void resetdepth(rc_t* c) +{ + c->w_st = -1; + if (c->dp_sz <= 0 || !c->dpth) + return; + LG("reset\n"); + memset(c->dpth, 0, c->dp_sz * sizeof(int)); + ce_t *en = NULL; + //clear all from previous tid + while (c->head && c->head->r->core.tid != c->tid) { + en = c->head->next; + c->rcnt--; + retcache(c, c->head); + c->head = en; + } + if (!c->head) c->tail = NULL; + else c->head->prev = NULL; + + //clear whole non selected ones + while (c->head_nsel) { + en = c->head_nsel->next; + c->nselcnt--; + retcache(c, c->head_nsel); + c->head_nsel = en; + } + assert(!c->head_nsel && !c->nselcnt); + if(c->head_sel) assert(c->head_sel->r->core.tid == c->tail_sel->r->core.tid); + if(c->head_nsel) assert(c->head_nsel->r->core.tid == c->tail_nsel->r->core.tid); + if(c->head_ins) assert(c->head_ins->r->core.tid == c->tail_ins->r->core.tid); + + LG("reset: t %"PRIu64" s %"PRIu64" i %"PRIu64" n %"PRIu64"; nxt %d\n", c->rcnt, c->selcnt,c->inscnt, c->nselcnt, c->tid); + c->tail_nsel = NULL; +} + +/// @brief process the cache and find reads relevant based on depth +/// @param c pointer to read cache +/// @return -ve on error, 0 on success +int processcache(rc_t *c) +{ + assert(c->cache.m == c->cache.f+c->rcnt+c->inscnt+c->selcnt+c->nselcnt); + assert(!c->inscnt && !c->selcnt); + assert(!c->head_sel && !c->tail_sel); + assert(!c->head_ins && !c->tail_ins); + + /* there is a window which starts at pos of 1st read and adds reads upto + a read past the end pos to cache. a read past the end, a tid change or end + of file could be a trigger to start processing the cached reads. they are + expected as sorted by pos and kept in descending order of reference + consumption (given by bam_cigar2rlen) if have same pos. + + iterate through cached reads, check if it is expected as pair of an + earlier selected read otherwise use cigar to calc depth and decide whether + read is needed or not. move required reads to sel list and unwanted paired + ones to nsel list. check whether selected read's pair is awaited or already + processed based on pos values. add to selected pair hash for awaiting ones + and search in nsel list for already processed ones. if a pair is found, move + to ins list. sel and ins list are in order of reading / as in source. + + processing will stop at tid change or once whole cache is processed. window + will be adjusted to include the read found outside the end pos. start will + move by same amount. if there is gap b/w start and next read, start moves to + that pos and end gets extended. any read which falls outside the new start + in nsel list is discarded. + + the 1st one from sel / ins list is removed and passed until both are empty. + the read and processig continues with new window extremities. + */ + ce_t *e = c->head, *en = NULL; + pair_exp *p = NULL; + khiter_t it; + int sel = 0, ret = -1; + int chkpair, foundpair; + LG("pc: t %"PRIu64" s %"PRIu64" i %"PRIu64" n %"PRIu64"\n", c->rcnt, c->selcnt,c->inscnt, c->nselcnt); + while (e) { + sel = foundpair = 0; + chkpair = (e->r->core.flag & BAM_FPAIRED) && !(e->r->core.flag & BAM_FMUNMAP); + en = e->next; + if (c->tid == e->r->core.tid) { + if (c->trgr != 2) { //if not wnd full, it is either end or tid change + resetdepth(c); + ret = 0; + break; //last one / one that triggered the processing; on next iteration + } else { //wnd full, process and move wnd + if (e->r->core.pos >= c->w_en) { //done enough + LG("* wnd full,[%"PRIhts_pos" - %"PRIhts_pos"] processed upto %"PRIhts_pos"\n", c->w_st, c->w_en, e->r->core.pos); + break; + } + } + } + LG("* checking %s %"PRIu64"\n", bam_get_qname(e->r), e->ord); + if (e->r->core.flag & BAM_FUNMAP) {//unmapped, select anyway + //selectread(c, ep, e, en, 0); //add to selected list + moveread(c, NULL, e, en, 1, 0); //add to selected list + LG("* s unmap %s %"PRIu64"\n", bam_get_qname(e->r), e->ord); + LGlog(&e->log, "%s", "sel,umap,,"); + e = en; + continue; + } + //LGlog(&e->log,","); + if (chkpair) { //paired and mate mapped + //have to remove from map as ce_t are freed; also they can't be modified while in cache + if ((it = kh_get(kh_pair, c->selpair, bam_get_qname(e->r))) != kh_end(c->selpair)) { + if (kh_exist(c->selpair, it)) { //iterate and find pair to this + p = &kh_val(c->selpair, it); + if (p->mpos == e->r->core.pos && + p->mtid == e->r->core.tid && + p->pos == e->r->core.mpos && + p->tid == e->r->core.mtid) { //pair already selected + kh_del(kh_pair, c->selpair, it); //remove from expected pairs + foundpair = 1; + moveread(c, NULL, e, en, 1, 0); //select this + sel = 1; + } else { + kh_del(kh_pair, c->selpair, it); //remove from expected pairs + //not possible to have duplicate on qname, chk n confirm + it = kh_end(c->selpair); + } + } + } + } + if (!sel) { + //check depth + int r = 0; + if ((r = readrequired(c, e)) > 0) { //read required + moveread(c, NULL, e, en, 1, 0); //select this + sel = 1; + } else if (r < 0) { + goto fail; + } + } + if (sel) { + if (updatedepth(c, e, 0) == -1) + goto fail; + LG("* s %s %"PRIu64" wnd:%"PRIhts_pos"-%"PRIhts_pos"", bam_get_qname(e->r), e->ord, c->w_st, c->w_en); + LGlog(&e->log, "%s", "sel,"); + if (chkpair && !foundpair) { //1st one or pair not selected + if (e->r->core.pos <= e->r->core.mpos) { //add only if it is yet to be processed, sorted data! + int r = 0; + it = kh_put(kh_pair, c->selpair, bam_get_qname(e->r), &r); + if (r == -1) + goto fail; + pair_exp *p = &kh_val(c->selpair, it); + p->pos = e->r->core.pos; p->tid = e->r->core.tid; + p->mpos = e->r->core.mpos; p->mtid = e->r->core.mtid; + LG(" PAIR expected"); + LGlog(&e->log, "%s", "paired,,"); + } else { + //do it after finishing the loop, to avoid issues with ep/epp... + //have to insert them based on ord., if not found, discard. if eq. limit there if done here. + ce_t *o = NULL, *op = NULL; + if ((o = find_nsel(c, e, &op))) { + if (o->r->core.pos >= c->w_st) { //only if order can be maintained + moveread(c, op, o, o->next, 1, 1); + if (updatedepth(c, o, 0) == -1) + goto fail; + LG(" inserted PAIR\n* s %s %"PRIu64" (inspair)", bam_get_qname(o->r), o->ord); + LGlog(&o->log, "%s", "paired,inserted,"); + } + LGlog(&e->log, "%s", "paired,nsel,"); + } else { + LG(" no PAIR"); + LGlog(&e->log, "%s", "paired,notfound,"); + } + } + } else if (foundpair) { + LG(" found PAIR"); + LGlog(&e->log, "%s", "paired,found,"); + } else { + LG(" no PAIR"); + LGlog(&e->log, "%s", "notpaired,NA,"); + } + LG("\n"); + } + else { + LG("* d %s %"PRIu64"\n", bam_get_qname(e->r), e->ord); + LGlog(&e->log, "%s", "nsel,"); + moveread(c, NULL, e, en, 0, 0); //remove as non-selected + } + e = en; //chk with next one + } + if (c->trgr == 2) { //2 --> wnd full, processed, move wnd + en = NULL; + hts_pos_t adj = c->tail ? c->tail->r->core.pos - c->w_en : 0; //last one, out of window - current end + hts_pos_t new_st = c->w_st + adj; + hts_pos_t bkp_st = c->w_st; + int rem = 0; + while (c->head_nsel && c->head_nsel->r->core.pos < new_st) { //holding until wnd passes mate pos, but anything after this which has already passed out is held until this is cleared! + rem = 1; + en = c->head_nsel->next; + LG("* nsel discarded %s %"PRIu64"\n", bam_get_qname(c->head_nsel->r), c->head_nsel->ord); + LGlog(&c->head_nsel->log,"%s",",,,nsel-disc,"); + c->nselcnt--; + retcache(c, c->head_nsel); + if(!(c->head_nsel = en)) c->tail_nsel = NULL; + } + if (rem) { + LG("* wnd full, removed items from head_nsel\n")//fprintf(fp1, "* wnd full, removed items from head_nsel\n"); + } + else { + LG("* wnd full, 0 removed items from head_nsel, [%"PRIhts_pos"-%"PRIhts_pos"] %"PRIhts_pos"\n", c->w_st, c->w_en, c->head_nsel?c->head_nsel->r->core.pos : 0)//fprintf(fp1, "* wnd full, removed items from head_nsel\n") + } + assert(e == c->head); + c->w_st = c->head ? c->head->r->core.pos : new_st; //move wnd + c->w_en = c->w_st + c->wndsz; + adj = c->w_st - bkp_st; + if (adj >= c->dp_sz) { + memset(c->dpth, 0, c->dp_sz * sizeof(int)); + c->dp_en = c->w_st + c->dp_sz; + LG("0 dpth buffer\n"); + } else { + LG("adj %"PRIhts_pos", mv %"PRIhts_pos"-%"PRIhts_pos",", adj, c->w_st+adj, c->w_st+c->dp_sz); + LG("0 set %"PRIhts_pos" - %"PRIhts_pos"\n", c->w_st+c->dp_sz-adj,c->w_st+c->dp_sz); + memmove(c->dpth, c->dpth + adj, (c->dp_sz - adj) * sizeof(int)); + memset(c->dpth + c->dp_sz - adj, 0, adj * sizeof(int)); + c->dp_en += adj; + assert(c->dp_en == (c->w_st+c->dp_sz)); + } + LG("* wnd moved, %"PRIhts_pos" - %"PRIhts_pos", dpth %"PRIhts_pos" - %"PRIhts_pos"; s %"PRIu64" i %"PRIu64" ns %"PRIu64"\n", c->w_st, c->w_en, c->w_st, c->dp_en, c->selcnt, c->inscnt, c->nselcnt); + c->trgr = 3; //reset full status n get already processedn + } + assert(c->cache.m == c->cache.f+c->rcnt+c->inscnt+c->selcnt+c->nselcnt); + LG("pc2: t %"PRIu64" s %"PRIu64" i %"PRIu64" n %"PRIu64"\n", c->rcnt, c->selcnt,c->inscnt, c->nselcnt); + + if (c->head_sel) + hts_prefetch(c->head_sel); + if (c->head_ins) + hts_prefetch(c->head_ins); + return ret; +fail: + LG(" FAIL\n"); + return -1; +} + +//wrappers / for iterators +void* getsamcache(hts_itr_t *itr, void *data) +{ + htsFile *fp = NULL; + if (itr && itr->usecache) + fp = (htsFile*)data; + return fp ? fp->c : NULL; +} + +int getfromreadcache_iter(void *p, void *s, int *tid, hts_pos_t *beg, hts_pos_t* end) //? +{ + rc_t *c = (rc_t*)p; + bam1_t *b = (bam1_t*)s; + int ret = getfromreadcache(c, b, end); + if (ret > 0) { + *tid = b->core.tid; + *beg = b->core.pos; + } + + if (!ret) { + assert(!c->inscnt && !c->selcnt); + assert(!c->head_sel && !c->tail_sel); + assert(!c->head_ins && !c->tail_ins); + } + + return ret; +} + +void *getcache_iter(void *data) //? +{ + htsFile *fp = (htsFile*)data; + rc_t *c = (rc_t*)fp->c; + assert(c->cache.m == c->cache.f+c->rcnt+c->inscnt+c->selcnt+c->nselcnt); + void *p = getcache(fp); + assert(c->cache.m == c->cache.f+1+c->rcnt+c->inscnt+c->selcnt+c->nselcnt); + return p; +} + +void *getreadbuffer_iter(void *p) +{ + ce_t* e = (ce_t*)p; + return e->r; +} + +void notifyend_iter(void *p, void *e) +{ + rc_t *c = (rc_t*)p; + c->trgr = 4;//end + c->tid = -2;//reset that it doesn't match to any + retcache(c, (ce_t*)e); +} + +int addtoreadcache_iter(void *c, void *s, int *sts) +{ + return addtoreadcache(c, s, sts); +} + +int processcache_iter(void *c) +{ + return processcache((rc_t*)c); +} + +void resetcache_itr(rc_t *c) +{ + resetdepth(c); + c->trgr = 0; + c->tid = -2; + c->w_st = c->w_en = -1; +} diff --git a/sam_cache.h b/sam_cache.h new file mode 100644 index 000000000..2e42c980e --- /dev/null +++ b/sam_cache.h @@ -0,0 +1,106 @@ +/* sam_cache.h -- Functions to create a cache of reads for depth handling + + Copyright (C) 2026 Genome Research Ltd. + + Author: Vasudeva Sarma + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. */ +#ifndef HTSLIB_SAMCACHE_H +#define HTSLIB_SAMCACHE_H + +#include "htslib/sam.h" +#include "htslib/khash.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct ce_t {//cache element + uint64_t ord; //ordinal + bam1_t *r; + struct ce_t *next, *prev; + uint64_t len; + kstring_t log; +} ce_t; + +typedef struct cache_t {//cache + int f, m, n; //free, max, chunks(of 1024?) + ce_t **p; + struct ce_t *head, *tail; +} cache_t; + +typedef struct pair_exp { + int mtid, tid; + hts_pos_t mpos, pos; +} pair_exp; +KHASH_MAP_INIT_STR(kh_pair, pair_exp) + +typedef struct rc_t { + cache_t cache; //cache of mem space + ce_t *head, *tail; //alignments + ce_t *head_sel, *tail_sel; //selected alignments + ce_t *head_nsel, *tail_nsel; //non-selected alignments + ce_t *head_ins, *tail_ins; //inserted alignments + uint64_t selcnt, nselcnt, inscnt,rcnt; + uint64_t ord; //last ordinal + int trgr; //sts: 0 not ready 1 caching 2 wnd full 3 ready 4 end + int wndsz, maxdpth, itr; + hts_pos_t w_st, w_en, dp_en; + khash_t(kh_pair) *selpair; + int dp_sz, /*dp_st,*/ tid; + int *inc, inc_sz; + int *dpth; +} rc_t; + + +void destroycache(htsFile *fp); +int setupcache(htsFile *fp, int wndsz, int maxdpth); + +void retcache(rc_t *c, ce_t* elem); +ce_t* getcache(htsFile *fp); +int addtoreadcache(rc_t *c, ce_t *e, int *sts); +int getfromreadcache(rc_t *c, bam1_t *b, hts_pos_t *end); +int processcache(rc_t *c); + +//wrapper / for iterators +void* getsamcache(hts_itr_t *itr, void *data); +int getfromreadcache_iter(void *c, void *s, int *tid, hts_pos_t *beg, hts_pos_t* end); +void *getcache_iter(void *data); +void *getreadbuffer_iter(void *e); +void notifyend_iter(void *c, void *e); +int addtoreadcache_iter(void *c, void *s, int *sts); +int processcache_iter(void *c); +void resetcache_itr(rc_t *c); + +#ifdef CACHE_DBG_LOG +extern FILE *clogfp; +//this is closed by system on exit! +#define LG(...) {if (!clogfp) clogfp = fopen("/tmp/op","w"); if (clogfp) { fprintf(clogfp, __VA_ARGS__);}} +#define LGlog(s,...) ksprintf(s,__VA_ARGS__) +#else +#define LG(...) ; +#define LGlog(s,...) ; +#endif //CACHE_DBG_LOG + +#ifdef __cplusplus +} +#endif + +#endif //HTSLIB_SAMCACHE_H + diff --git a/test/cache.exp.1.T1T3T2.sam b/test/cache.exp.1.T1T3T2.sam new file mode 100644 index 000000000..9c8de88ed --- /dev/null +++ b/test/cache.exp.1.T1T3T2.sam @@ -0,0 +1,28 @@ +@HD VN:1.17 SO:coordinate +@SQ SN:T1 LN:40 +@SQ SN:T2 LN:40 +@SQ SN:T3 LN:40 +@CO @SQ SN* LN* AH AN AS DS M5 SP TP UR +@CO @RG ID* BC CN DS DT FO KS LB PG PI PL PM PU SM +@CO @PG ID* PN CL PP DS VN +@CO this is a dummy alignment file to demonstrate different abilities of hts apis +@CO QNAME FLAG RNAME POS MAPQ CIGAR RNEXT PNEXT TLEN SEQ QUAL [TAG:TYPE:VALUE]… +@CO 1234567890123456789012345678901234567890 +@CO AAAAACTGAAAACCCCTTTTGGGGACTGTTAACAGTTTTT T1 +@CO TTTTCCCCACTGAAAACCCCTTTTGGGGACTGTTAACAGT T2 +ITR1 99 T1 5 40 4M = 33 10 ACTG ()() +UNMP1 73 T1 21 40 3M * 0 5 GGG &&1 +A1 99 T1 25 35 6M = 31 8 ACTGTT ****** +B1 99 T1 25 35 6M = 31 8 GCTATT ****** +A1 147 T1 31 33 6M = 25 -8 ACTGTT ()()() +ITR1 147 T1 33 37 4M = 5 -10 ACTG $$$$ +A5 99 T3 12 50 3M = 23 5 GAA ()( +B5 99 T3 12 50 3M = 23 5 GAT ()( +ITR3 147 T3 23 49 2M = 35 -10 TT ** +B5 147 T3 23 47 2M1X = 12 -5 TTG ((( +A5 147 T3 23 47 2M1X = 12 -5 TAG ((( +ITR3 99 T3 35 51 2M = 23 10 AA && +B4 99 T2 12 50 3M = 23 5 GAT ()( +ITR2 147 T2 23 49 2M = 35 -10 TT ** +B4 147 T2 23 47 2M1X = 12 -5 TAG ((( +ITR2 99 T2 35 51 2M = 23 10 AA && diff --git a/test/cache.exp.1.sam b/test/cache.exp.1.sam new file mode 100644 index 000000000..ac84d810c --- /dev/null +++ b/test/cache.exp.1.sam @@ -0,0 +1,30 @@ +@HD VN:1.17 SO:coordinate +@SQ SN:T1 LN:40 +@SQ SN:T2 LN:40 +@SQ SN:T3 LN:40 +@CO @SQ SN* LN* AH AN AS DS M5 SP TP UR +@CO @RG ID* BC CN DS DT FO KS LB PG PI PL PM PU SM +@CO @PG ID* PN CL PP DS VN +@CO this is a dummy alignment file to demonstrate different abilities of hts apis +@CO QNAME FLAG RNAME POS MAPQ CIGAR RNEXT PNEXT TLEN SEQ QUAL [TAG:TYPE:VALUE]… +@CO 1234567890123456789012345678901234567890 +@CO AAAAACTGAAAACCCCTTTTGGGGACTGTTAACAGTTTTT T1 +@CO TTTTCCCCACTGAAAACCCCTTTTGGGGACTGTTAACAGT T2 +ITR1 99 T1 5 40 4M = 33 10 ACTG ()() +UNMP1 73 T1 21 40 3M * 0 5 GGG &&1 +A1 99 T1 25 35 6M = 31 8 ACTGTT ****** +B1 99 T1 25 35 6M = 31 8 GCTATT ****** +A1 147 T1 31 33 6M = 25 -8 ACTGTT ()()() +ITR1 147 T1 33 37 4M = 5 -10 ACTG $$$$ +B4 99 T2 12 50 3M = 23 5 GAT ()( +ITR2 147 T2 23 49 2M = 35 -10 TT ** +B4 147 T2 23 47 2M1X = 12 -5 TAG ((( +ITR2 99 T2 35 51 2M = 23 10 AA && +A5 99 T3 12 50 3M = 23 5 GAA ()( +B5 99 T3 12 50 3M = 23 5 GAT ()( +ITR3 147 T3 23 49 2M = 35 -10 TT ** +B5 147 T3 23 47 2M1X = 12 -5 TTG ((( +A5 147 T3 23 47 2M1X = 12 -5 TAG ((( +ITR3 99 T3 35 51 2M = 23 10 AA && +UNMP2 141 * 0 0 * * 0 7 AA && +UNMP3 77 * 0 0 * * 0 5 GGG &&2 diff --git a/test/cache.exp.2.sam b/test/cache.exp.2.sam new file mode 100644 index 000000000..7529d96e2 --- /dev/null +++ b/test/cache.exp.2.sam @@ -0,0 +1,32 @@ +@HD VN:1.17 SO:coordinate +@SQ SN:T1 LN:40 +@SQ SN:T2 LN:40 +@SQ SN:T3 LN:40 +@CO @SQ SN* LN* AH AN AS DS M5 SP TP UR +@CO @RG ID* BC CN DS DT FO KS LB PG PI PL PM PU SM +@CO @PG ID* PN CL PP DS VN +@CO this is a dummy alignment file to demonstrate different abilities of hts apis +@CO QNAME FLAG RNAME POS MAPQ CIGAR RNEXT PNEXT TLEN SEQ QUAL [TAG:TYPE:VALUE]… +@CO 1234567890123456789012345678901234567890 +@CO AAAAACTGAAAACCCCTTTTGGGGACTGTTAACAGTTTTT T1 +@CO TTTTCCCCACTGAAAACCCCTTTTGGGGACTGTTAACAGT T2 +ITR1 99 T1 5 40 4M = 33 10 ACTG ()() +UNMP1 73 T1 21 40 3M * 0 5 GGG &&1 +A1 99 T1 25 35 6M = 31 8 ACTGTT ****** +B1 99 T1 25 35 6M = 31 8 GCTATT ****** +A1 147 T1 31 33 6M = 25 -8 ACTGTT ()()() +ITR1 147 T1 33 37 4M = 5 -10 ACTG $$$$ +A4 99 T2 12 50 3M = 23 5 GAA ()( +B4 99 T2 12 50 3M = 23 5 GAT ()( +ITR2 147 T2 23 49 2M = 35 -10 TT ** +A4 147 T2 23 47 2M1X = 12 -5 TTG ((( +B4 147 T2 23 47 2M1X = 12 -5 TAG ((( +ITR2 99 T2 35 51 2M = 23 10 AA && +A5 99 T3 12 50 3M = 23 5 GAA ()( +B5 99 T3 12 50 3M = 23 5 GAT ()( +ITR3 147 T3 23 49 2M = 35 -10 TT ** +B5 147 T3 23 47 2M1X = 12 -5 TTG ((( +A5 147 T3 23 47 2M1X = 12 -5 TAG ((( +ITR3 99 T3 35 51 2M = 23 10 AA && +UNMP2 141 * 0 0 * * 0 7 AA && +UNMP3 77 * 0 0 * * 0 5 GGG &&2 diff --git a/test/cache.exp.T1.sam b/test/cache.exp.T1.sam new file mode 100644 index 000000000..96af87a0b --- /dev/null +++ b/test/cache.exp.T1.sam @@ -0,0 +1,20 @@ +@HD VN:1.17 SO:coordinate +@SQ SN:T1 LN:40 +@SQ SN:T2 LN:40 +@SQ SN:T3 LN:40 +@CO @SQ SN* LN* AH AN AS DS M5 SP TP UR +@CO @RG ID* BC CN DS DT FO KS LB PG PI PL PM PU SM +@CO @PG ID* PN CL PP DS VN +@CO this is a dummy alignment file to demonstrate different abilities of hts apis +@CO QNAME FLAG RNAME POS MAPQ CIGAR RNEXT PNEXT TLEN SEQ QUAL [TAG:TYPE:VALUE]… +@CO 1234567890123456789012345678901234567890 +@CO AAAAACTGAAAACCCCTTTTGGGGACTGTTAACAGTTTTT T1 +@CO TTTTCCCCACTGAAAACCCCTTTTGGGGACTGTTAACAGT T2 +ITR1 99 T1 5 40 4M = 33 10 ACTG ()() +UNMP1 73 T1 21 40 3M * 0 5 GGG &&1 +A1 99 T1 25 35 6M = 31 8 ACTGTT ****** +A5 355 T1 25 35 4M = 33 5 ACTG PPPP +B1 99 T1 25 35 6M = 31 8 GCTATT ****** +B5 355 T1 25 35 4M = 33 5 AGTG PPPP +A1 147 T1 31 33 6M = 25 -8 ACTGTT ()()() +ITR1 147 T1 33 37 4M = 5 -10 ACTG $$$$ diff --git a/test/cache.sam b/test/cache.sam new file mode 100644 index 000000000..183c81fa5 --- /dev/null +++ b/test/cache.sam @@ -0,0 +1,34 @@ +@HD VN:1.17 SO:coordinate +@SQ SN:T1 LN:40 +@SQ SN:T2 LN:40 +@SQ SN:T3 LN:40 +@CO @SQ SN* LN* AH AN AS DS M5 SP TP UR +@CO @RG ID* BC CN DS DT FO KS LB PG PI PL PM PU SM +@CO @PG ID* PN CL PP DS VN +@CO this is a dummy alignment file to demonstrate different abilities of hts apis +@CO QNAME FLAG RNAME POS MAPQ CIGAR RNEXT PNEXT TLEN SEQ QUAL [TAG:TYPE:VALUE]… +@CO 1234567890123456789012345678901234567890 +@CO AAAAACTGAAAACCCCTTTTGGGGACTGTTAACAGTTTTT T1 +@CO TTTTCCCCACTGAAAACCCCTTTTGGGGACTGTTAACAGT T2 +ITR1 99 T1 5 40 4M = 33 10 ACTG ()() +UNMP1 73 T1 21 40 3M * 0 5 GGG &&1 +A1 99 T1 25 35 6M = 31 8 ACTGTT ****** +A5 355 T1 25 35 4M = 33 5 ACTG PPPP +B1 99 T1 25 35 6M = 31 8 GCTATT ****** +B5 355 T1 25 35 4M = 33 5 AGTG PPPP +A1 147 T1 31 33 6M = 25 -8 ACTGTT ()()() +ITR1 147 T1 33 37 4M = 5 -10 ACTG $$$$ +A4 99 T2 12 50 3M = 23 5 GAA ()( +B4 99 T2 12 50 3M = 23 5 GAT ()( +ITR2 147 T2 23 49 2M = 35 -10 TT ** +A4 147 T2 23 47 2M1X = 12 -5 TTG ((( +B4 147 T2 23 47 2M1X = 12 -5 TAG ((( +ITR2 99 T2 35 51 2M = 23 10 AA && +A5 99 T3 12 50 3M = 23 5 GAA ()( +B5 99 T3 12 50 3M = 23 5 GAT ()( +ITR3 147 T3 23 49 2M = 35 -10 TT ** +B5 147 T3 23 47 2M1X = 12 -5 TTG ((( +A5 147 T3 23 47 2M1X = 12 -5 TAG ((( +ITR3 99 T3 35 51 2M = 23 10 AA && +UNMP2 141 * 0 0 * * 0 7 AA && +UNMP3 77 * 0 0 * * 0 5 GGG &&2 diff --git a/test/test.pl b/test/test.pl index eaa65ea30..e54fd9c1a 100755 --- a/test/test.pl +++ b/test/test.pl @@ -50,6 +50,8 @@ run_test('test_MD',$opts); +run_test('test_cache',$opts); + run_test('test_vcf_api',$opts,out=>'test-vcf-api.out',needed_by=>'test_vcf_sweep'); run_test('test_bcf2vcf',$opts); run_test('test_vcf_sweep',$opts,out=>'test-vcf-sweep.out'); @@ -1064,6 +1066,46 @@ sub test_MD } } +# Tests hts lib sam cache +sub test_cache +{ + my ($opts) = @_; + $test_view_failures = 0; + + print "\ntest_cache:\n"; + # high depth or all in + testv $opts, "./test_view -i hts_maxdepth=10 -p $$opts{tmp}/cache.tmp.10.sam $$opts{path}/cache.sam"; + testv $opts, "./compare_sam.pl $$opts{tmp}/cache.tmp.10.sam $$opts{path}/cache.sam"; + # filter as much as possible + testv $opts, "./test_view -i hts_maxdepth=1 -p $$opts{tmp}/cache.tmp.1.sam $$opts{path}/cache.sam"; + testv $opts, "./compare_sam.pl $$opts{tmp}/cache.tmp.1.sam $$opts{path}/cache.exp.1.sam"; + # different depth val + testv $opts, "./test_view -i hts_maxdepth=2 -p $$opts{tmp}/cache.tmp.2.sam $$opts{path}/cache.sam"; + testv $opts, "./compare_sam.pl $$opts{tmp}/cache.tmp.2.sam $$opts{path}/cache.exp.2.sam"; + + #using iterators + testv $opts, "./test_view -b -x $$opts{tmp}/cache.tmp.bam.bai -p $$opts{tmp}/cache.tmp.bam $$opts{path}/cache.sam"; + + testv $opts, "./test_view -i hts_maxdepth=10 -p $$opts{tmp}/cache.tmp.10.T1.sam $$opts{tmp}/cache.tmp.bam T1"; + testv $opts, "./compare_sam.pl $$opts{tmp}/cache.tmp.10.T1.sam $$opts{path}/cache.exp.T1.sam"; + # filter as much as possible + testv $opts, "./test_view -i hts_maxdepth=1 -p $$opts{tmp}/cache.tmp.1.T1T3T2.sam $$opts{tmp}/cache.tmp.bam T1 T3 T2"; + testv $opts, "./compare_sam.pl $$opts{tmp}/cache.tmp.1.T1T3T2.sam $$opts{path}/cache.exp.1.T1T3T2.sam"; + # different depth val + testv $opts, "./test_view -i hts_maxdepth=2 -p $$opts{tmp}/cache.tmp.2.sam $$opts{tmp}/cache.tmp.bam T1 T2 T3"; + testv $opts, "head -n 30 $$opts{path}/cache.exp.2.sam > $$opts{tmp}/cache.tmp.exp.sam"; + testv $opts, "./compare_sam.pl $$opts{tmp}/cache.tmp.2.sam $$opts{tmp}/cache.tmp.exp.sam"; + # multi region iterator + testv $opts, "./test_view -M -i hts_maxdepth=2 -p $$opts{tmp}/cache.tmp.2.T3T1T2.sam $$opts{tmp}/cache.tmp.bam T3 T1 T2"; + testv $opts, "./compare_sam.pl $$opts{tmp}/cache.tmp.2.T3T1T2.sam $$opts{tmp}/cache.tmp.exp.sam"; + + if ($test_view_failures == 0) { + passed($opts, "cache tests"); + } else { + failed($opts, "cache tests", "$test_view_failures subtests failed"); + } +} + sub test_index { my ($opts, $nthreads) = @_;