数据并行

并行改变数组中元素

rayon-badge cat-concurrency-badge

下面的实例使用了 rayon crate,这是一个 Rust 程序设计语言的数据并行库。rayon 为任何并行可迭代的数据类型提供 par_iter_mut 方法。这是一个类迭代器的链,可以对链内的数据并行计算。

use rayon::prelude::*;

fn main() {
    let mut arr = [0, 7, 9, 11];
    arr.par_iter_mut().for_each(|p| *p -= 1);
    println!("{:?}", arr);
}

并行测试集合中任意或所有的元素是否匹配给定断言

rayon-badge cat-concurrency-badge

这个实例示范如何使用 rayon::anyrayon::all 方法,这两个方法是分别与 std::anystd::all 相对应的并行方法。rayon::any 并行检查迭代器的任意元素是否与断言匹配,并在找到一个匹配的元素时就返回。rayon::all 并行检查迭代器的所有元素是否与断言匹配,并在找到不匹配的元素时立即返回。

use rayon::prelude::*;

fn main() {
    let mut vec = vec![2, 4, 6, 8];

    assert!(!vec.par_iter().any(|n| (*n % 2) != 0));
    assert!(vec.par_iter().all(|n| (*n % 2) == 0));
    assert!(!vec.par_iter().any(|n| *n > 8 ));
    assert!(vec.par_iter().all(|n| *n <= 8 ));

    vec.push(9);

    assert!(vec.par_iter().any(|n| (*n % 2) != 0));
    assert!(!vec.par_iter().all(|n| (*n % 2) == 0));
    assert!(vec.par_iter().any(|n| *n > 8 ));
    assert!(!vec.par_iter().all(|n| *n <= 8 )); 
}

使用给定断言并行搜索项

rayon-badge cat-concurrency-badge

下面的实例使用 rayon::find_anypar_iter 并行搜索 vector 集合,以查找满足指定闭包中的断言的元素。

如果有多个元素满足 rayon::find_any 闭包参数中定义的断言,rayon 将返回搜索发现的第一个元素,但不一定是 vector 集合的第一个元素。

请注意,实例中闭包的参数是对引用的引用(&&x)。有关更多详细信息,请参阅关于 std::find 的讨论。

use rayon::prelude::*;

fn main() {
    let v = vec![6, 2, 1, 9, 3, 8, 11];

    let f1 = v.par_iter().find_any(|&&x| x == 9);
    let f2 = v.par_iter().find_any(|&&x| x % 2 == 0 && x > 6);
    let f3 = v.par_iter().find_any(|&&x| x > 8);

    assert_eq!(f1, Some(&9));
    assert_eq!(f2, Some(&8));
    assert!(f3 > Some(&8));
}

对 vector 并行排序

rayon-badge rand-badge cat-concurrency-badge

本实例对字符串 vector 并行排序。

首先,分配空字符串 vector;然后,通过 par_iter_mut().for_each 并行对 vector 填充随机值。尽管存在多种选择,可以对可枚举数据类型进行排序,但 par_sort_unstable 通常比稳定排序(相同的值排序后相对顺序不变)算法快。


use rand::{Rng, thread_rng};
use rand::distributions::Alphanumeric;
use rayon::prelude::*;

fn main() {
  let mut vec = vec![String::new(); 100_000];
  vec.par_iter_mut().for_each(|p| {
    let mut rng = thread_rng();
    *p = (0..5).map(|_| rng.sample(&Alphanumeric)).collect()
  });
  vec.par_sort_unstable();
}

Map-reduce 并行计算

rayon-badge cat-concurrency-badge

此实例使用 rayon::filterrayon::map,以及 rayon::reduce 计算 Person 对象中年龄超过 30 岁的那些人的平均年龄。

rayon::filter 过滤集合中满足给定断言的元素。rayon::map 对每个元素执行一次计算,创建一个新的迭代;然后,基于前一次的 reduce 计算结果和当前元素一起,rayon::reduce 执行新的计算。也可以查看 rayon::sum,它与本实例中的 reduce 计算具有相同的结果。

use rayon::prelude::*;

struct Person {
    age: u32,
}

fn main() {
    let v: Vec<Person> = vec![
        Person { age: 23 },
        Person { age: 19 },
        Person { age: 42 },
        Person { age: 17 },
        Person { age: 17 },
        Person { age: 31 },
        Person { age: 30 },
    ];

    let num_over_30 = v.par_iter().filter(|&x| x.age > 30).count() as f32;
    let sum_over_30 = v.par_iter()
        .map(|x| x.age)
        .filter(|&x| x > 30)
        .reduce(|| 0, |x, y| x + y);

    let alt_sum_30: u32 = v.par_iter()
        .map(|x| x.age)
        .filter(|&x| x > 30)
        .sum();

    let avg_over_30 = sum_over_30 as f32 / num_over_30;
    let alt_avg_over_30 = alt_sum_30 as f32/ num_over_30;

    assert!((avg_over_30 - alt_avg_over_30).abs() < std::f32::EPSILON);
    println!("The average age of people older than 30 is {}", avg_over_30);
}

并行生成 jpg 缩略图

rayon-badge glob-badge image-badge cat-concurrency-badge cat-filesystem-badge

本实例为当前目录中的所有 .jpg 图像文件生成缩略图,然后将生成的缩略图保存在一个名为 thumbnails 的新文件夹中。

glob::glob_with 在当前目录中查找 jpeg 图像文件,rayon 通过 par_iter 方法调用 DynamicImage::resize,并行地调整图像大小。

use error_chain::error_chain;

use std::path::Path;
use std::fs::create_dir_all;

use error_chain::ChainedError;
use glob::{glob_with, MatchOptions};
use image::{FilterType, ImageError};
use rayon::prelude::*;

error_chain! {
    foreign_links {
        Image(ImageError);
        Io(std::io::Error);
        Glob(glob::PatternError);
    }
}

fn main() -> Result<()> {
    let options: MatchOptions = Default::default();
    let files: Vec<_> = glob_with("*.jpg", options)?
        .filter_map(|x| x.ok())
        .collect();

    if files.len() == 0 {
        error_chain::bail!("No .jpg files found in current directory");
    }

    let thumb_dir = "thumbnails";
    create_dir_all(thumb_dir)?;

    println!("Saving {} thumbnails into '{}'...", files.len(), thumb_dir);

    let image_failures: Vec<_> = files
        .par_iter()
        .map(|path| {
            make_thumbnail(path, thumb_dir, 300)
                .map_err(|e| e.chain_err(|| path.display().to_string()))
        })
        .filter_map(|x| x.err())
        .collect();

    image_failures.iter().for_each(|x| println!("{}", x.display_chain()));

    println!("{} thumbnails saved successfully", files.len() - image_failures.len());
    Ok(())
}

fn make_thumbnail<PA, PB>(original: PA, thumb_dir: PB, longest_edge: u32) -> Result<()>
where
    PA: AsRef<Path>,
    PB: AsRef<Path>,
{
    let img = image::open(original.as_ref())?;
    let file_path = thumb_dir.as_ref().join(original);

    Ok(img.resize(longest_edge, longest_edge, FilterType::Nearest)
        .save(file_path)?)
}