diff --git a/core-relations/src/offsets/mod.rs b/core-relations/src/offsets/mod.rs index 329e2232f..ee4cec7f8 100644 --- a/core-relations/src/offsets/mod.rs +++ b/core-relations/src/offsets/mod.rs @@ -1,4 +1,4 @@ -use std::{cmp, fmt, mem}; +use std::{cmp, cmp::Ordering, fmt, mem}; use crate::numeric_id::{NumericId, define_id}; @@ -18,9 +18,10 @@ mod tests; /// of it. pub(crate) trait Offsets { // A half-open range enclosing the offsets in this sequence. - fn bounds(&self) -> Option<(RowId, RowId)>; + fn bounds(&self) -> (RowId, RowId); fn is_empty(&self) -> bool { - self.bounds().is_none_or(|(lo, hi)| lo == hi) + let (lo, hi) = self.bounds(); + lo == hi } fn offsets(&self, f: impl FnMut(RowId)); } @@ -32,8 +33,8 @@ pub struct OffsetRange { } impl Offsets for OffsetRange { - fn bounds(&self) -> Option<(RowId, RowId)> { - Some((self.start, self.end)) + fn bounds(&self) -> (RowId, RowId) { + (self.start, self.end) } fn offsets(&self, f: impl FnMut(RowId)) { @@ -113,7 +114,7 @@ impl Clear for SortedOffsetVector { } impl Offsets for SortedOffsetVector { - fn bounds(&self) -> Option<(RowId, RowId)> { + fn bounds(&self) -> (RowId, RowId) { self.slice().bounds() } @@ -184,14 +185,18 @@ impl SortedOffsetSlice { Err(i) } } + + pub(crate) fn contains(&self, row: RowId) -> bool { + self.scan_for_offset(0, row).is_ok() + } } impl Offsets for SortedOffsetSlice { - fn bounds(&self) -> Option<(RowId, RowId)> { - Some(( - *self.0.first()?, - RowId::from_usize(self.0.last()?.index() + 1), - )) + fn bounds(&self) -> (RowId, RowId) { + match (self.0.first(), self.0.last()) { + (Some(&lo), Some(&hi)) => (lo, RowId::from_usize(hi.index() + 1)), + _ => (RowId::new(0), RowId::new(0)), + } } fn offsets(&self, f: impl FnMut(RowId)) { @@ -200,11 +205,8 @@ impl Offsets for SortedOffsetSlice { } impl Offsets for &'_ SortedOffsetSlice { - fn bounds(&self) -> Option<(RowId, RowId)> { - Some(( - *self.0.first()?, - RowId::from_usize(self.0.last()?.index() + 1), - )) + fn bounds(&self) -> (RowId, 
RowId) { +        (*self).bounds()     }      fn offsets(&self, f: impl FnMut(RowId)) { @@ -212,23 +214,192 @@ impl Offsets for &'_ SortedOffsetSlice {     } }  +/// A bitvector representation for a subset, using 256-bit blocks. +/// +/// Each block covers 256 consecutive row IDs starting at a block-aligned offset. +/// The block_starts array holds the start position of each block (always a multiple of 256), +/// and blocks holds the corresponding 256-bit bitvectors as four u64 words. +#[derive(Default, Clone, PartialEq, Eq, Debug, Hash)] +pub struct BitVecSubset { +    block_starts: Vec<u32>, +    blocks: Vec<[u64; 4]>, +    size: usize, +} + +impl Offsets for BitVecSubset { +    fn bounds(&self) -> (RowId, RowId) { +        let lo = 'search: { +            for (start, block) in self.block_starts.iter().zip(self.blocks.iter()) { +                for (wi, &word) in block.iter().enumerate() { +                    if word != 0 { +                        let bit = word.trailing_zeros() as usize; +                        break 'search RowId::from_usize(*start as usize + wi * 64 + bit); +                    } +                } +            } +            return (RowId::new(0), RowId::new(0)); +        }; +        let hi = { +            let mut result = RowId::new(0); +            'outer: for (start, block) in self.block_starts.iter().zip(self.blocks.iter()).rev() { +                for (wi, &word) in block.iter().enumerate().rev() { +                    if word != 0 { +                        let bit = 63 - word.leading_zeros() as usize; +                        result = RowId::from_usize(*start as usize + wi * 64 + bit + 1); +                        break 'outer; +                    } +                } +            } +            result +        }; +        (lo, hi) +    } + +    fn offsets(&self, mut f: impl FnMut(RowId)) { +        for (start, block) in self.block_starts.iter().zip(self.blocks.iter()) { +            for (wi, &word) in block.iter().enumerate() { +                let mut w = word; +                while w != 0 { +                    let bit = w.trailing_zeros() as usize; +                    f(RowId::from_usize(*start as usize + wi * 64 + bit)); +                    w &= w - 1; +                } +            } +        } +    } +} + +impl BitVecSubset { +    fn block_start_for(row: RowId) -> u32 { +        (row.index() as u32 / 256) * 256 +    } + +    /// Set a bit for `row`, which must be >= all previously set bits (sorted insertion). 
+ fn push_sorted(&mut self, row: RowId) { + let start = Self::block_start_for(row); + if self.block_starts.last().copied() != Some(start) { + self.block_starts.push(start); + self.blocks.push([0u64; 4]); + } + let i = self.blocks.len() - 1; + let bit_idx = row.index() - start as usize; + self.blocks[i][bit_idx / 64] |= 1u64 << (bit_idx % 64); + self.size += 1; + } + + pub(crate) fn contains(&self, row: RowId) -> bool { + let start = Self::block_start_for(row); + match self.block_starts.binary_search(&start) { + Ok(i) => { + let bit_idx = row.index() - start as usize; + (self.blocks[i][bit_idx / 64] >> (bit_idx % 64)) & 1 == 1 + } + Err(_) => false, + } + } + + pub(crate) fn size(&self) -> usize { + self.size + } + + fn retain(&mut self, mut f: impl FnMut(RowId) -> bool) { + for (start, block) in self.block_starts.iter().zip(self.blocks.iter_mut()) { + for (wi, word) in block.iter_mut().enumerate() { + let mut w = *word; + while w != 0 { + let bit = w.trailing_zeros() as usize; + let row = RowId::from_usize(*start as usize + wi * 64 + bit); + if !f(row) { + *word &= !(1u64 << bit); + } + w &= w - 1; + } + } + } + let mut keep = 0; + let mut new_size = 0; + for i in 0..self.blocks.len() { + let block_size: usize = self.blocks[i].iter().map(|w| w.count_ones() as usize).sum(); + if block_size > 0 { + self.block_starts.swap(keep, i); + self.blocks.swap(keep, i); + new_size += block_size; + keep += 1; + } + } + self.block_starts.truncate(keep); + self.blocks.truncate(keep); + self.size = new_size; + } + + /// AND this bitvec with `other`, keeping only bits present in both. 
+ fn intersect_with(&mut self, other: &BitVecSubset) { + let mut keep = 0; + let mut si = 0; + let mut oi = 0; + let mut new_size = 0; + while si < self.block_starts.len() && oi < other.block_starts.len() { + match self.block_starts[si].cmp(&other.block_starts[oi]) { + Ordering::Equal => { + let mut new_block = [0u64; 4]; + #[allow(clippy::needless_range_loop)] + for k in 0..4 { + new_block[k] = self.blocks[si][k] & other.blocks[oi][k]; + } + let block_size: usize = new_block.iter().map(|w| w.count_ones() as usize).sum(); + if block_size > 0 { + self.block_starts[keep] = self.block_starts[si]; + self.blocks[keep] = new_block; + new_size += block_size; + keep += 1; + } + si += 1; + oi += 1; + } + Ordering::Less => { + si += 1; + } + Ordering::Greater => { + oi += 1; + } + } + } + self.block_starts.truncate(keep); + self.blocks.truncate(keep); + self.size = new_size; + } + + pub(crate) fn from_sorted_slice(slice: &SortedOffsetSlice) -> BitVecSubset { + let mut bv = BitVecSubset::default(); + for row in slice.iter() { + bv.push_sorted(row); + } + bv + } +} + +const BITVEC_DENSITY_THRESHOLD: f64 = 0.05; + #[derive(Copy, Clone)] pub enum SubsetRef<'a> { Dense(OffsetRange), Sparse(&'a SortedOffsetSlice), + Bitvec(&'a BitVecSubset), } impl Offsets for SubsetRef<'_> { - fn bounds(&self) -> Option<(RowId, RowId)> { + fn bounds(&self) -> (RowId, RowId) { match self { SubsetRef::Dense(r) => r.bounds(), SubsetRef::Sparse(s) => s.bounds(), + SubsetRef::Bitvec(bv) => bv.bounds(), } } fn offsets(&self, f: impl FnMut(RowId)) { match self { SubsetRef::Dense(r) => r.offsets(f), SubsetRef::Sparse(s) => s.offsets(f), + SubsetRef::Bitvec(bv) => bv.offsets(f), } } } @@ -238,6 +409,7 @@ impl SubsetRef<'_> { match self { SubsetRef::Dense(range) => range.size(), SubsetRef::Sparse(vec) => vec.0.len(), + SubsetRef::Bitvec(bv) => bv.size(), } } @@ -249,6 +421,7 @@ impl SubsetRef<'_> { vec.extend_nonoverlapping(s); Subset::Sparse(vec) } + SubsetRef::Bitvec(bv) => Subset::Bitvec(bv.clone()), } 
} @@ -257,6 +430,7 @@ impl SubsetRef<'_> {         match self {             SubsetRef::Dense(_) => panic!("getting slice from dense subset"),             SubsetRef::Sparse(slc) => slc.inner(), +            SubsetRef::Bitvec(_) => panic!("getting slice from bitvec subset"),         }     }     pub(crate) fn iter_bounded( @@ -286,28 +460,60 @@ impl SubsetRef<'_> {                 vec.0[start..end].iter().copied().for_each(f);                 next             } +            SubsetRef::Bitvec(bv) => { +                let total = bv.size(); +                let end = cmp::min(total, end); +                let has_more = end < total; +                let mut count = 0; +                'outer: for (block_start, block) in bv.block_starts.iter().zip(bv.blocks.iter()) { +                    // Skip entire blocks that fall before `start`. +                    let block_count = block.iter().map(|w| w.count_ones() as usize).sum::<usize>(); +                    if count + block_count <= start { +                        count += block_count; +                        continue; +                    } +                    for (wi, &word) in block.iter().enumerate() { +                        let mut w = word; +                        while w != 0 { +                            let bit = w.trailing_zeros() as usize; +                            if count >= start { +                                if count >= end { +                                    break 'outer; +                                } +                                f(RowId::from_usize(*block_start as usize + wi * 64 + bit)); +                            } +                            count += 1; +                            w &= w - 1; +                        } +                    } +                } +                if has_more { Some(end) } else { None } +            }         }     } }  -/// Either or an offset range or a sorted offset vector. +/// Either an offset range, a sorted offset vector, or a bitvector. 
#[derive(Debug, Hash, PartialEq, Eq)] pub enum Subset {     Dense(OffsetRange),     Sparse(Pooled<SortedOffsetVector>), +    Bitvec(BitVecSubset), }  impl Offsets for Subset { -    fn bounds(&self) -> Option<(RowId, RowId)> { +    fn bounds(&self) -> (RowId, RowId) {         match self {             Subset::Dense(r) => r.bounds(),             Subset::Sparse(s) => s.slice().bounds(), +            Subset::Bitvec(bv) => bv.bounds(),         }     }      fn offsets(&self, f: impl FnMut(RowId)) {         match self {             Subset::Dense(r) => r.offsets(f),             Subset::Sparse(s) => s.slice().offsets(f), +            Subset::Bitvec(bv) => bv.offsets(f),         }     } } @@ -317,6 +523,7 @@ impl Clone for Subset {         match self {             Subset::Dense(r) => Subset::Dense(*r),             Subset::Sparse(s) => Subset::Sparse(Pooled::cloned(s)), +            Subset::Bitvec(bv) => Subset::Bitvec(bv.clone()),         }     } } @@ -329,9 +536,21 @@ impl Subset {         match self {             Subset::Dense(range) => range.size(),             Subset::Sparse(vec) => vec.0.len(), +            Subset::Bitvec(bv) => bv.size(),         }     }  +    /// The density of the subset: ratio of elements to the span of its bounds. +    /// Returns 0.0 for empty subsets. +    pub fn density(&self) -> f64 { +        let (lo, hi) = self.bounds(); +        let range = hi.index().saturating_sub(lo.index()); +        if range == 0 { +            return 0.0; +        } +        self.size() as f64 / range as f64 +    } +     pub(crate) fn is_dense(&self) -> bool {         matches!(self, Subset::Dense(_))     } @@ -340,6 +559,7 @@ impl Subset {         match self {             Subset::Dense(r) => SubsetRef::Dense(*r),             Subset::Sparse(s) => SubsetRef::Sparse(s.slice()), +            Subset::Bitvec(bv) => SubsetRef::Bitvec(bv),         }     }  @@ -355,10 +575,17 @@ impl Subset {                 *self = res;             }             Subset::Sparse(offs) => offs.retain(filter), +            Subset::Bitvec(bv) => bv.retain(filter),         }     }  +    /// Remove any elements of the current subset not present in `other`. 
pub(crate) fn intersect(&mut self, other: SubsetRef, pool: &Pool<SortedOffsetVector>) { +        if self.is_empty() || other.is_empty() { +            *self = Subset::Dense(OffsetRange::new(RowId::new(0), RowId::new(0))); +            return; +        } +         match (self, other) {             (Subset::Dense(cur), SubsetRef::Dense(other)) => {                 let resl = cmp::max(cur.start, other.start); @@ -370,18 +597,24 @@ impl Subset {                 }             }             (x @ Subset::Dense(_), SubsetRef::Sparse(sparse)) => { -                let (low, hi) = x.bounds().unwrap(); -                if sparse.bounds().is_some() { -                    let mut res = pool.get(); -                    let l = sparse.binary_search_by_id(low); -                    let r = sparse.binary_search_by_id(hi); -                    let subslice = sparse.subslice(l, r); -                    res.extend_nonoverlapping(subslice); -                    *x = Subset::Sparse(res); -                } else { -                    // empty range -                    *x = Subset::Dense(OffsetRange::new(RowId::new(0), RowId::new(0))); -                } +                let (low, hi) = x.bounds(); +                let mut res = pool.get(); +                let l = sparse.binary_search_by_id(low); +                let r = sparse.binary_search_by_id(hi); +                let subslice = sparse.subslice(l, r); +                res.extend_nonoverlapping(subslice); +                *x = Subset::Sparse(res); +            } +            (x @ Subset::Dense(_), SubsetRef::Bitvec(other)) => { +                let (low, hi) = x.bounds(); +                let mut result = BitVecSubset::default(); +                // TODO See below, use bounds to avoid enumerating rows outside the range. 
+ other.offsets(|row| { + if row >= low && row < hi { + result.push_sorted(row); + } + }); + *x = Subset::Bitvec(result); } (Subset::Sparse(sparse), SubsetRef::Dense(dense)) => { let r = sparse.slice().binary_search_by_id(dense.end); @@ -389,17 +622,50 @@ impl Subset { sparse.retain(|row| row >= dense.start); } (Subset::Sparse(cur), SubsetRef::Sparse(other)) => { - let mut other_off = 0; - cur.retain(|rowid| match other.scan_for_offset(other_off, rowid) { - Ok(found) => { - other_off = found + 1; - true - } - Err(next_off) => { - other_off = next_off; - false - } - }) + if cur.0.len() < other.inner().len() { + let mut other_off = 0; + cur.retain(|rowid| match other.scan_for_offset(other_off, rowid) { + Ok(found) => { + other_off = found + 1; + true + } + Err(next_off) => { + other_off = next_off; + false + } + }) + } else { + let mut cur_off = 0; + let mut cur_idx = 0; + other.inner().iter().copied().for_each(|rowid| { + match cur.slice().scan_for_offset(cur_off, rowid) { + Ok(found) => { + cur_off = found + 1; + cur.0[cur_idx] = rowid; + cur_idx += 1; + } + Err(next_off) => { + cur_off = next_off; + } + } + }); + cur.0.truncate(cur_idx); + } + } + (Subset::Sparse(sparse), SubsetRef::Bitvec(bv)) => { + sparse.retain(|row| bv.contains(row)); + } + (Subset::Bitvec(bv), SubsetRef::Dense(dense)) => { + // TODO Make retain takes a bound so that it only enumerates rows within the range. + bv.retain(|row| row >= dense.start && row < dense.end); + } + (Subset::Bitvec(bv), SubsetRef::Sparse(sparse)) => { + // TODO save the result as a Sparse instead of BitVec because the result + // must be more sparse than Sparse. + bv.retain(|row| sparse.contains(row)); + } + (Subset::Bitvec(bv_self), SubsetRef::Bitvec(other)) => { + bv_self.intersect_with(other); } } } @@ -428,6 +694,16 @@ impl Subset { } Subset::Sparse(s) => { s.push(row); + // TODO: Don't switch to bitvec here, because the bounds can be an underestimate. 
+            let (lo, hi) = s.slice().bounds(); +            let range_len = hi.index().saturating_sub(lo.index()); +            if range_len > 0 && s.0.len() as f64 / range_len as f64 > BITVEC_DENSITY_THRESHOLD { +                let bv = BitVecSubset::from_sorted_slice(s.slice()); +                *self = Subset::Bitvec(bv); +            } +        } +        Subset::Bitvec(bv) => { +            bv.push_sorted(row);             }         }     } diff --git a/core-relations/src/offsets/tests.rs b/core-relations/src/offsets/tests.rs index 214a4b44b..763007547 100644 --- a/core-relations/src/offsets/tests.rs +++ b/core-relations/src/offsets/tests.rs @@ -11,9 +11,7 @@ fn o(u: usize) -> RowId {  fn collect<T: Clone>(range: &impl Offsets, elts: &[T]) -> Vec<T> {     let mut res = Vec::new();     range.offsets(|off| res.push(elts[off.index()].clone())); -    if !res.is_empty() { -        range.bounds().expect("nonempty range should have bounds"); -    } +    range.bounds();     res } diff --git a/core-relations/src/table/mod.rs b/core-relations/src/table/mod.rs index a857fbd7f..82e4e1b88 100644 --- a/core-relations/src/table/mod.rs +++ b/core-relations/src/table/mod.rs @@ -353,10 +353,10 @@ impl Table for SortedWritesTable {     where         Self: Sized,     { -        let Some((_low, hi)) = subset.bounds() else { -            // Empty subset +        if subset.is_empty() {             return; -        }; +        } +        let (_, hi) = subset.bounds();         assert!(             hi.index() <= self.data.data.len(),             "{} vs. {}",