Node.js异步任务并行执行流程控制

本文由清尘发表于2020-07-20 09:01属于Node.js分类

为了让异步任务并行执行,仍然是要把任务放到数组中,但任务的存放顺序无关紧要。每个任务都应该调用处理器函数增加已完成任务的计数值。当所有任务都完成后,处理器函数应该执行后续的逻辑。

下面是一个简单的程序作为并行化流程控制的例子,它会读取几个文本文件的内容,并输出单词在整个文件中出现的次数。我们会用异步的readFile函数读取文本文件的内容,所以几个文件的读取可以并行执行。

这个程序的输出看起来应该像下面这样(尽管实际上可能要长很多):

and:3
dedication:1
family:1
far:1
is:1
life:2
me:1

简单的程序中实现并行化流程控制

const fs = require('fs');
const tasks = [];
const wordCounts = {};
const filesDir = './text';
let completedTasks = 0;

function checkIfComplete(){
  completedTasks++;
  //当所有任务全部完成后,列出文件中用到的每个单词以及用了多少次
  if(completedTasks == tasks.length){
    for(let index in wordCounts){
      console.log(`${index}:${wordCounts[index]}`);
    }
  }
}

function addWordCount(word){
  wordCounts[word] = (wordCounts[word]) ? wordCounts[word] + 1:1;
}

function countWordsInText(text){
  const words = text.toString().toLowerCase().split(/\W+/).sort();
  //对文本中出现的单词计数
  words.filter(word => word).forEach(word => addWordCount(word));
}

fs.readdir(filesDir,(err,files) => {
  //得出text目录中的文件列表
  if(err) throw err;
  //定义处理每个文件的任务。每个任务中都会调用一个异步读取文件的函数并对文件中使用的单词计数
  files.forEach(file => {
    const task = (file => {
      return () => {
        fs.readFile(file,(err,text) => {
          if(err) throw err;
          countWordsInText(text);
          checkIfComplete();
        })
      }
    })(`${filesDir}/${file}`);
    //把所有任务都添加到函数调用数组中
    tasks.push(task);
  })
  //开始并行执行所有任务
  tasks.forEach(task => task());
})

在试用这个程序之前,先在前面创建的text目录中创建一些文本文件

利用社区里的工具

社区中的很多附加模块都提供了方便好用的流程控制工具。其中比较流行的有Async、Step和Seq这三个。尽管这些都很值得一看,但下面这个例子用的还是Async。

下面这个例子是用Async实现任务序列化的一段脚本,它同时用并行化流程控制下载两个文件,然后把它们归档。

此例在微软的Windows中无法使用 因为Windows中没有tar和curl这两个命令,所以下面这个例子在Windows中无法使用。

在简单的程序中使用社区附加模块中的流程控制工具

const async = require('async');
const exec = require('child_process').exec;

//下载指定版本的Node源码
function downloadNodeVersion(version,destination,callback){
  const url = `http://nodejs.org/dist/v${version}/node-v${version}.tar.gz`;
  const filepath = `${destination}/${version}.tgz`;
  exec(`curl ${url} > ${filepath}`,callback);
}

//按顺序执行串行化任务
async.series([
  callback => {
    //并行下载
    async.parallel([
      callback => {
        console.log('Downloading Node v4.4.7...');
        downloadNodeVersion('4.4.7','/tmp',callback);
      },
      callback => {
        console.log('Downloading Node v6.3.0...');
        downloadNodeVersion('6.3.0','/tmp',callback);
      }
    ],callback);
  },
  callback => {
    //创建归档文件
    console.log('Creating archive of downloaded files...');
    exec('tar cvf node_distros.tar /tmp/4.4.7.tgz /tmp/6.3.0.tgz',err => {
      if(err) throw err;
      console.log('All done!');
      callback();
    })
  }
])

相关文章:node使用Async进行流程控制实现异步任务按顺序执行串行化控制