use thiserror::Error;.../// BedErrorPlus enumerates all possible errors/// returned by this library./// Based on https://nick.groenen.me/posts/rust-error-handling/#the-library-error-type#[derive(Error, Debug)]pub enum BedErrorPlus { #[error(transparent)] IOError(#[from] std::io::Error), #[error(transparent)] BedError(#[from] BedError), #[error(transparent)] ThreadPoolError(#[from] ThreadPoolBuildError),}
use thiserror::Error;[...]// https://docs.rs/thiserror/1.0.23/thiserror/#[derive(Error, Debug, Clone)]pub enum BedError { #[error("Ill-formed BED file. BED file header is incorrect or length is wrong.'{0}'")] IllFormed(String),[...]}
其余的错误处理与示例错误 1 中的相同。 最后,对于 Rust 和 Python,标准错误和自定义错误的结果都属于带有信息性错误消息的特定错误类型。 规则 6:多线程与 Rayon 和 ndarray::parallel,返回任何错误 Rust Rayon crate 提供了简单且轻量级的数据并行多线程。ndarray::parallel 模块将 Rayon 应用于数组。通常的模式是跨一个或多个 2D 数组的列(或行)并行化。面对的一个挑战是从并行线程返回任何错误消息。我将重点介绍两种通过错误处理并行化数组操作的方法。以下两个示例都出现在 Bed-Reader 的 src/lib.rs 文件中。 方法 1:par_bridge().try_for_each Rayon 的 par_bridge 将顺序迭代器变成了并行迭代器。如果遇到错误,使用 try_for_each 方法可以尽快停止所有处理。 这个例子中,我们遍历了压缩(zip)在一起的两个 things:
DNA 位置的二进制数据;
输出数组的列。
然后,按顺序读取二进制数据,但并行处理每一列的数据。我们停止了任何错误。
[... not shown, read bytes for DNA location's data ...]// Zip in the column of the output array.zip(out_val.axis_iter_mut(nd::Axis(1)))// In parallel, decompress the iid info and put it in its column.par_bridge() // This seems faster that parallel zip.try_for_each(|(bytes_vector_result, mut col)| { match bytes_vector_result { Err(e) => Err(e), Ok(bytes_vector) => { for out_iid_i in 0..out_iid_count { let in_iid_i = iid_index[out_iid_i]; let i_div_4 = in_iid_i / 4; let i_mod_4 = in_iid_i % 4; let genotype_byte: u8 = (bytes_vector[i_div_4] >> (i_mod_4 * 2)) & 0x03; col[out_iid_i] = from_two_bits_to_value[genotype_byte as usize]; } Ok(()) } }})?;
方法 2:par_azip! ndarray 包的 par_azip!宏允许并行地通过一个或多个压缩在一起的数组或数组片段。在我看来,这非常具有可读性。但是,它不直接支持错误处理。因此,我们可以通过将任何错误保存到结果列表来添加错误处理。 下面是一个效用函数的例子。完整的效用函数从三个计数和总和(count and sum)数组计算统计量(均值和方差),并且并行工作。如果在数据中发现错误,则将该错误记录在结果列表中。在完成所有处理之后,检查结果列表是否有错误。
[...]let mut result_list: Vec> = vec![Ok(()); sid_count];nd::par_azip!((mut stats_row in stats.axis_iter_mut(nd::Axis(0)), &n_observed in &n_observed_array, &sum_s in &sum_s_array, &sum2_s in &sum2_s_array, result_ptr in &mut result_list){ [...some code not shown...]});// Check the result list for errorsresult_list.par_iter().try_for_each(|x| (*x).clone())?;[...]
def get_num_threads(num_threads=None): if num_threads is not None: return num_threads if "PST_NUM_THREADS" in os.environ: return int(os.environ["PST_NUM_THREADS"]) if "NUM_THREADS" in os.environ: return int(os.environ["NUM_THREADS"]) if "MKL_NUM_THREADS" in os.environ: return int(os.environ["MKL_NUM_THREADS"]) return multiprocessing.cpu_count()
接着在 Rust 端,我们可以定义 create_pool。这个辅助函数从 num_threads 构造一个 Rayon ThreadPool 对象。