Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/distributing_iterator/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module DistributingIterator
VERSION = "0.0.1"
VERSION = "0.0.2"
end
8 changes: 4 additions & 4 deletions src/distribute_csv.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::{Context, Result};
use std::collections::VecDeque;
use anyhow::{Result, Context};

use crate::distributing_iterator;
use csv::ByteRecord;
Expand All @@ -11,14 +11,14 @@ pub fn distribute(data: &str, field: &str, spread: u64) -> Result<String> {
let headers = csv.headers()?.clone();
let field_index = headers
.iter()
.position(|header| header == field).context(format!("field `{field}` not found in CSV headers"))?;
.position(|header| header == field)
.context(format!("field `{field}` not found in CSV headers"))?;
let data = csv
.into_byte_records()
.map(|record| record.map_err(anyhow::Error::from))
.collect::<Result<VecDeque<_>>>()?;
let id_func = move |item: &ByteRecord| item[field_index].to_vec();
let iterator =
distributing_iterator::DistributingIterator::new(data, spread as usize, id_func);
let iterator = distributing_iterator::DistributingIterator::new(data, spread as usize, id_func);
let data: Vec<_> = iterator.collect();
let mut wtr = csv::Writer::from_writer(vec![]);
wtr.write_record(&headers).context("writing headers")?;
Expand Down
133 changes: 133 additions & 0 deletions src/distribute_indexes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use fnv::FnvHashMap;
use indexmap::IndexMap;
use std::collections::VecDeque;

pub fn distribute<'a, T: 'a, ID>(
data: &'a [T],
mut spread: usize,
id_func: impl Fn(&'a T) -> ID + Send + 'static,
) -> Vec<usize>
where
ID: Eq + std::hash::Hash,
{
let mut result = Vec::with_capacity(data.len());
let mut queue_per_id: FnvHashMap<ID, VecDeque<usize>> = Default::default();
let mut last_pos: IndexMap<ID, usize> = Default::default();
let mut output_pos = 0;
let mut data_pos = 0;
let mut iterator_reached_end = false;

loop {
let item = loop {
let mut result = None;
let mut adjust_spread = false;
let sorted_spreadable_ids = last_pos
.iter()
.filter(|(_id, &last_pos)| output_pos - last_pos >= spread)
.map(|(id, _last_pos)| id);
for id in sorted_spreadable_ids {
match queue_per_id.get_mut(id) {
Some(queue) => {
if let Some(item) = queue.pop_front() {
if iterator_reached_end && queue.is_empty() {
queue_per_id.remove(id);
adjust_spread = true
}
result = Some(item);
break;
}
}
None => continue,
}
}
if result.is_some() {
if adjust_spread {
spread = calculate_spread(&queue_per_id);
}
break result;
}

if iterator_reached_end {
if queue_per_id.values().flatten().any(|_| true) {
panic!(
"Nothing can be returned even though the queue is not empty. This is a bug"
);
} else {
break None;
}
}

let current_data_pos = data_pos;
data_pos += 1;

match data.get(current_data_pos) {
Some(item) => {
let id = (id_func)(item);
if !last_pos.contains_key(&id) {
break Some(current_data_pos);
} else {
queue_per_id
.entry(id)
.or_insert_with(|| VecDeque::with_capacity(100))
.push_back(current_data_pos);
}
}
None => {
spread = calculate_spread(&queue_per_id);
iterator_reached_end = true;
}
}
};
if let Some(output_idx) = item {
let id = (id_func)(&data[output_idx]);
last_pos.shift_remove(&id);
last_pos.insert(id, output_pos);
result.push(output_idx);
output_pos += 1;
} else {
break;
}
}
result
}

/// Distribute items that are themselves IDs
pub fn distribute_ids<T>(data: &[T], spread: usize) -> Vec<usize> where T: Eq + std::hash::Hash, {
distribute(data, spread, |item| item)
}

fn calculate_spread<T, ID>(queue_per_id: &FnvHashMap<ID, VecDeque<T>>) -> usize {
queue_per_id
.iter()
.filter(|(_id, queue)| !queue.is_empty())
.count()
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_distribute() {
let data = vec![
"1", "1", "1", "2", "3", "3", "2", "2", "3", "3", "2", "3", "2", "3", "3",
];
let result = distribute(&data, 3, |item| item.parse::<usize>().unwrap());
assert_eq!(
result,
vec![0, 3, 4, 1, 6, 5, 2, 7, 8, 10, 9, 12, 11, 13, 14]
);
}

#[test]
fn test_distribute2() {
let data = vec!["Picture", "Post", "Video", "Video", "Picture", "Post", "Picture", "Picture", "Video"];
let result = distribute_ids(&data, 3);
let result_with_labels = result
.iter()
.map(|idx| data[*idx])
.collect::<Vec<_>>();
assert_eq!(result_with_labels, vec!["Picture", "Post", "Video", "Picture", "Post", "Video", "Picture", "Video", "Picture"]);
assert_eq!(result, vec![0, 1, 2, 4, 5, 3, 6, 8, 7]);
}
}
6 changes: 3 additions & 3 deletions src/distributing_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ where
}
}
if result.is_some() {
if adjust_spread {
self.spread = Self::calculate_spread(&queue_per_id);
}
break result;
}

Expand Down Expand Up @@ -133,9 +136,6 @@ where
}
}
};
if adjust_spread {
self.spread = Self::calculate_spread(&queue_per_id);
}
self.queue_per_id = Some(queue_per_id);
result
}
Expand Down
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
mod distribute_csv;
mod distribute_indexes;
mod distributing_iterator;

#[cfg(feature = "magnus")]
mod ruby_ext;

pub use distribute_csv::distribute as distribute_csv;
pub use distribute_indexes::distribute;
pub use distribute_indexes::distribute_ids;
pub use distributing_iterator::DistributingIterator;
15 changes: 9 additions & 6 deletions src/ruby_ext.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use magnus::{error::Result, function, exception, Error};
use magnus::{error::Result, exception, function, Error, Value};

use crate::distribute_csv;
use crate::{distribute_ids, distribute_csv};

fn distribute_csv_ruby(data: String, field: String, spread: u64) -> Result<String> {
match distribute_csv(&data, &field, spread) {
Ok(result) => Ok(result),
Err(e) => {
Err(Error::new(exception::standard_error(), format!("{:?}", e)))
}
Err(e) => Err(Error::new(exception::standard_error(), format!("{:?}", e))),
}
}

fn distribute_indexes_ruby(data: Vec<String>, spread: usize) -> Result<Vec<usize>> {
Ok(distribute_ids(&data, spread))
}

#[magnus::init]
fn init(ruby: &magnus::Ruby) -> Result<()> {
let module = ruby.define_module("DistributingIterator")?;
module.define_module_function("distribute", function!(distribute_csv_ruby, 3))?;
module.define_module_function("distribute_csv", function!(distribute_csv_ruby, 3))?;
module.define_module_function("distribute_indexes", function!(distribute_indexes_ruby, 2))?;
Ok(())
}