C# 多线程 10-并行编程模式 04-使用 PLINQ 实现 Map/Reduce 模式
🏷️ 《C# 多线程》
Map/Reduce 模式
Map/Reduce 是另一个重要的并行编程模式。它既适用于小型程序,也适用于需要在多台服务器上进行大量计算的场景。
该模式的含义是你有两个特殊的功能要应用于你的数据。
Map 函数
接收一组键/值列表的初始数据,并产生另一组键/值序列,将初始数据转换为适合的格式以便进行下一步处理。
Reduce 函数
使用 Map 函数的结果,并将其转换为我们真正需要的尽可能小的数据集。
示例代码
csharp
/// <summary>
/// Console sample: downloads several books and, for each one, reports the ten most
/// frequent words that are not stop words, using a PLINQ-based Map/Reduce pipeline.
/// </summary>
class Program
{
    /// <summary>
    /// Characters that separate words when a line is tokenized.
    /// </summary>
    static readonly char[] delimiters = { ' ', ',', ';', '\"', '.' };

    /// <summary>
    /// Single HTTP client shared by every download, instead of creating and disposing
    /// one client per request.
    /// </summary>
    static readonly HttpClient httpClient = CreateHttpClient();

    /// <summary>
    /// Implements the Map/Reduce pattern with PLINQ: loads the stop words, processes
    /// every book in parallel, then prints all reports and waits for Enter.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        // Book title -> download URL.
        var booksList = new Dictionary<string, string>()
        {
            ["Moby Dick; Or, The Whale by Herman Melville"] = "http://www.gutenberg.org/cache/epub/2701/pg2701.txt",
            ["The Adventures of Tom Sawyer by Mark Twain"] = "http://www.gutenberg.org/cache/epub/74/pg74.txt",
            ["Treasure Islan by Robert Louis Stevenson"] = "http://www.gutenberg.org/cache/epub/120/pg120.txt",
            ["The Picture of Dorian Gray by Oscar Wilde"] = "http://www.gutenberg.org/cache/epub/174/pg174.txt",
        };

        // Words to exclude from the ranking (blocks until the download finishes).
        HashSet<string> stopwords = DownloadStopWordsAsync().GetAwaiter().GetResult();

        var output = new StringBuilder();
        // StringBuilder is not thread-safe, and the loop body below runs on several
        // threads at once, so every append to the shared builder goes through this lock.
        var outputLock = new object();

        // Process the books in parallel; the order of the reports is therefore not fixed.
        Parallel.ForEach(booksList, book =>
        {
            // Download the book text (blocks this worker until it arrives).
            var bookContent = DownloadBookAsync(book.Value).GetAwaiter().GetResult();

            // Build the statistics report for this book.
            string result = ProcessBookAsync(bookContent, book.Key, stopwords).GetAwaiter().GetResult();

            // Append the report and its trailing blank line as one unit so that
            // reports of different books cannot interleave.
            lock (outputLock)
            {
                output.Append(result);
                output.AppendLine();
            }
        });

        Console.Write(output.ToString());
        Console.ReadLine();
    }

    /// <summary>
    /// Counts word occurrences in a book with the Map/Reduce pipeline and formats a
    /// report containing the ten most used non-stop-words and the number of distinct words.
    /// </summary>
    /// <param name="bookContent">Full text of the book.</param>
    /// <param name="title">Book title shown in the report header.</param>
    /// <param name="stopwords">Words excluded from the top-ten ranking.</param>
    /// <returns>An already-completed task holding the formatted report.</returns>
    /// <remarks>
    /// The work is CPU-bound and runs synchronously on the calling thread; the
    /// task-returning signature is kept for the existing caller.
    /// </remarks>
    static Task<string> ProcessBookAsync(string bookContent, string title, HashSet<string> stopwords)
    {
        using (var reader = new StringReader(bookContent))
        {
            var query = reader.EnumLines()      // lazily read the text line by line
                .AsParallel()                   // switch to PLINQ
                // Tokenize each line; empty tokens produced by adjacent delimiters are
                // dropped so they are not counted as a word.
                .SelectMany(line => line.Split(delimiters, StringSplitOptions.RemoveEmptyEntries))
                .MapReduce(
                    // Map: normalize case independently of the current culture.
                    word => new[] { word.ToLowerInvariant() },
                    // Group by the normalized word itself.
                    key => key,
                    // Reduce: one (word, occurrences) pair per group.
                    g => new[] { new { Word = g.Key, Count = g.Count() } }
                )
                .ToList();

            // Drop stop words and rank by frequency. The secondary ordinal sort makes
            // ties deterministic, because the group order coming out of PLINQ is not.
            var words = query
                .Where(element => !stopwords.Contains(element.Word))
                .OrderByDescending(element => element.Count)
                .ThenBy(element => element.Word, StringComparer.Ordinal);

            var sb = new StringBuilder();
            sb.AppendLine($"'{title}' book stats");
            sb.AppendLine($"Top ten words used in this book:");

            // Top 10 words.
            foreach (var w in words.Take(10))
            {
                sb.AppendLine($"Word: '{w.Word}', times used: '{w.Count}'");
            }

            // Distinct words before stop-word filtering.
            sb.AppendLine($"Unique Words used: {query.Count}");
            return Task.FromResult(sb.ToString());
        }
    }

    /// <summary>
    /// Downloads the text of a book.
    /// </summary>
    /// <param name="bookUrl">URL of the plain-text book.</param>
    /// <returns>The response body as a string.</returns>
    static async Task<string> DownloadBookAsync(string bookUrl)
    {
        return await httpClient.GetStringAsync(bookUrl);
    }

    /// <summary>
    /// Downloads the stop-word list and returns its English ("en") entries.
    /// </summary>
    /// <returns>
    /// The English stop words, or an empty set when the list cannot be downloaded or
    /// parsed; in that case nothing is filtered out and the reason is written to
    /// standard error.
    /// </returns>
    static async Task<HashSet<string>> DownloadStopWordsAsync()
    {
        string url = "https://raw.githubusercontent.com/6/stopwords/master/stopwords-all.json";
        try
        {
            var content = await httpClient.GetStringAsync(url);
            var words = JsonConvert.DeserializeObject<Dictionary<string, string[]>>(content);

            string[] english;
            if (words != null && words.TryGetValue("en", out english) && english != null)
            {
                return new HashSet<string>(english);
            }

            Console.Error.WriteLine($"Stop word list at {url} has no \"en\" entry; no words will be filtered.");
        }
        catch (Exception ex)
        {
            // Best effort: the statistics still work without stop words, so fall back
            // to an empty set, but report the failure instead of hiding it.
            Console.Error.WriteLine($"Failed to load stop words from {url}: {ex.Message}");
        }

        return new HashSet<string>();
    }

    /// <summary>
    /// Creates the shared HTTP client.
    /// </summary>
    /// <returns>A client whose handler decompresses gzip/deflate responses when supported.</returns>
    static HttpClient CreateHttpClient()
    {
        var handler = new HttpClientHandler();

        // NOTE(review): transparent decompression is enabled as a precaution. A compressed
        // body read as plain text would yield binary-looking "words"; whether the book
        // server actually sends compressed responses has not been verified.
        if (handler.SupportsAutomaticDecompression)
        {
            handler.AutomaticDecompression =
                System.Net.DecompressionMethods.GZip | System.Net.DecompressionMethods.Deflate;
        }

        return new HttpClient(handler);
    }
}
/// <summary>
/// Extension methods used by the Map/Reduce sample.
/// </summary>
static class Extensions
{
    /// <summary>
    /// Runs a Map/Reduce pipeline on a parallel query: every source element is mapped
    /// to zero or more intermediate values, the intermediate values are grouped by key,
    /// and every group is reduced to zero or more results.
    /// </summary>
    /// <typeparam name="TSource">Type of the source elements.</typeparam>
    /// <typeparam name="TMapped">Type of the intermediate values produced by <paramref name="map"/>.</typeparam>
    /// <typeparam name="TKey">Type of the grouping key.</typeparam>
    /// <typeparam name="TResult">Type of the results produced by <paramref name="reduce"/>.</typeparam>
    /// <param name="source">Parallel query to process.</param>
    /// <param name="map">Projects one source element to a sequence of intermediate values.</param>
    /// <param name="keySelector">Extracts the grouping key from an intermediate value.</param>
    /// <param name="reduce">Turns one group of intermediate values into a sequence of results.</param>
    /// <returns>A parallel query that yields the results of all groups.</returns>
    public static ParallelQuery<TResult> MapReduce<TSource, TMapped, TKey, TResult>(
        this ParallelQuery<TSource> source,
        Func<TSource, IEnumerable<TMapped>> map,
        Func<TMapped, TKey> keySelector,
        Func<IGrouping<TKey, TMapped>, IEnumerable<TResult>> reduce)
    {
        // Map stage.
        ParallelQuery<TMapped> mapped = source.SelectMany(map);

        // Shuffle stage: collect the intermediate values that share a key.
        ParallelQuery<IGrouping<TKey, TMapped>> grouped = mapped.GroupBy(keySelector);

        // Reduce stage.
        return grouped.SelectMany(reduce);
    }

    /// <summary>
    /// Lazily enumerates the remaining lines of a <see cref="StringReader"/>.
    /// </summary>
    /// <param name="reader">Reader to pull lines from.</param>
    /// <returns>Each line in order; the sequence ends when the reader has no more lines.</returns>
    public static IEnumerable<string> EnumLines(this StringReader reader)
    {
        string current;

        // ReadLine returns null once the end of the text has been reached.
        while ((current = reader.ReadLine()) != null)
        {
            yield return current;
        }
    }
}
打印结果
txt
'The Adventures of Tom Sawyer by Mark Twain' book stats
Top ten words used in this book:
Word: '?', times used: '61'
Word: '??', times used: '19'
Word: '???', times used: '5'
Word: 'p?', times used: '3'
Word: '}?', times used: '2'
Word: '=', times used: '2'
Word: '0?', times used: '2'
Word: '??{', times used: '2'
Word: 't?', times used: '2'
Word: '#', times used: '2'
Unique Words used: 4150
'Treasure Islan by Robert Louis Stevenson' book stats
Top ten words used in this book:
Word: 'man', times used: '227'
Word: 'captain', times used: '205'
Word: 'silver', times used: '194'
Word: 'doctor', times used: '151'
Word: 'time', times used: '130'
Word: 'good', times used: '123'
Word: 'hand', times used: '119'
Word: 'long', times used: '114'
Word: 'back', times used: '106'
Word: 'cried', times used: '103'
Unique Words used: 7452
'The Picture of Dorian Gray by Oscar Wilde' book stats
Top ten words used in this book:
Word: 'dorian', times used: '390'
Word: 'lord', times used: '247'
Word: 'henry', times used: '220'
Word: 'life', times used: '216'
Word: 'gray', times used: '176'
Word: 'man', times used: '168'
Word: 'harry', times used: '141'
Word: 'basil', times used: '135'
Word: 'things', times used: '124'
Word: 'thing', times used: '118'
Unique Words used: 8226
'Moby Dick; Or, The Whale by Herman Melville' book stats
Top ten words used in this book:
Word: '?', times used: '230'
Word: '??', times used: '71'
Word: '???', times used: '38'
Word: '????', times used: '10'
Word: '', times used: '8'
Word: 'e?', times used: '5'
Word: '?a', times used: '5'
Word: '<', times used: '5'
Word: '\', times used: '5'
Word: '{', times used: '5'
Unique Words used: 13003