多线程真头疼,但也挺有趣# Java - 爪哇娇娃
r*r
1 楼
有点点长啊。
有一个 tool, 以前处理的数据比较少,所以速度,时间都不成问题。但现在需要处理
的数据增大,速度问题就突出了。费了好大一番力气,总算弄通了,一次处理的时间由
原来的 24 个小时,减小为 3 个小时。
第一次弄多线程,很费劲。但弄通了(只是这个小 task 弄通了,并非多线程弄通了)
,又觉得很有意思,小有成就感。分享一下,希望对 beginner 有帮助,也希望有经验
的多指教。其中有些地方,还没有完全明白,正在学习-ing。
程序其实也简单,就是在单机上运行的一个程序。过去单线程的,即使在多 core 的机
器上运行,也只能利用处理器的少部分能力。现在就是要改为多线程,多核处理。
数据存在 10 个文本文件中,每个大约 40M.程序从每个文件逐行读取,逐行处理,很
多信息被保存到不同的数据结构中。最后输出处理报告,中间还要将部分处理信息输出
到一个 XML 文件中。
经过一番 google search 和研究,所做的改动包括:
1) 将原来的逐行读取,逐行处理,改为先将每个文件的所有行读到一个新建的
DataStore 中,这个结构比较简单。就是保持原来的每行文本信息,以及行号,和
getter, setter 等。每个都加上 synchronized
class DataStore{
}
2) 建立一个 thread pool,并将这个 DataStore 对象传到这个 pool 中。
ExecutorService executor = Executors.newFixedThreadPool(count);
for (int i = 0; i < count; i++) {
Runnable dataThread = new DataThread(dataStore,
"Thread " + i);
executor.execute(dataThread);
}
3) DataThread 的 run(),基本上就运行原来的代码,原来是从文件读取每一行,现在
变为从 dataStore 中逐行取得文本进行处理。
4)原来逐行处理,逐行输出部分信息到一个 XML 文件中,原来的记录顺序需要保持。
现在由于多线程异步处理,直接输出无法保证顺序。就先把这部分信息存储到一个新建
的 map 中,保存记录 ID -> data 的映射。当一个文件处理完了,才最后按 ID 从
map 有序输出到 XML 中。
5)其他很多信息需要计算,排序和保存。过去用的都是无保护的 Map, List, Set,以
及原始的 int. 后来在需要的地方,对这些数据存储对象都加上锁。比如:
synchronized(dataMap){
dataMap.remove(id);
}
其他需要帮助处理结果的 private static 方法,也都加上了 synchronized.
6)很多 int 型的计算 (i.e. itemCounts++), 都从 int 变成了 AtomicInteger.
基本上就这样。下面是几点体会。
1)为了测试多线程下无线程保护时可能出现的问题。开始我故意对所有的那些 map,
list 和 set 都用原来的,无任何保护。一运行,果然,上面那个 dataMap 马上出错
。数据处理完后,抛出一个 RuntimeException, 提示数据状态 inconsistent.
2) 遇到一个困难。就是在 local 机器上测试时,虽然发现速度明显提高,50% 左右,
但是通过 OS 的 performance monitor 观察时,发现内存使用逐渐小幅增加,当数据
处理越 2/3 后,内存所剩已无几,而 cpu 也马上从多 core,变为过去的单 core.但
机器并没有停下来,而是按单 core 的速度,处理完剩下的 1/3, memory 此期间全负
荷运转。
多线程的程序,怎么自动变成了单线程?memory usage 为什么增加? memory 增加
,其实预见到了,由于上面提到的改动 4),原来不保存,现在逐渐保存到一个 map 中
,所以消耗内存。没想到影响这么大,可能是记录比较多的缘故。再联系到 CPU 的使
用,原来当 memory 快耗尽时,OS or JVM 能够探测到,就自动 switch 到 single
core, 以保证程序继续运行。这里,还挺 smart 哈。
后来,就为解决这个问题,费力了。数据再大一点,可能就会死机了。等一个文件处
理完,存储到 map 中的方案不行。还得在处理数据的同时,想办法输出,不保存,或
少保存信息到 map 中。可是,异步处理,如何保持输出顺序呢?
try 了很多次,又 search 了很多,最后找到办法了。在原来 10 个 thread 的基础
上,再加一个专用的 thread, 在其他 10 thread 处理 dataStore 并存储到
resultMap 的同时,负责 concurrently 从这个 map 中输出信息到 XML 文件中;每当
一条信息输出完,就立刻从 map 中删除记录。这样,这个 map 的 size 一直会维持在
非常小的水平。如何控制顺序呢?就取一个循环变量。
int i =0;
do(
{
DataRecord record = resultMap.get(i);
if( record != null ){
// 1. output
// 2. resultMap.remove(i)
// i++
}else{
Thread.sleep(1000);
}
}while(i 觉得这段小程序真是体现了 multithread 的精髓。多人同时干一件事多或多件事,
并相互协调,保证很好的完成任务。主要的 10 个线程处理耗资源和时间的主任务,另
一个 thread 负责简单的输出;当它空闲无任务可做时,就通过睡觉 (sleep),稍事休
息,又继续工作,直到结束。fun!
3) 最后,又经过了一点优化吧。上面提到需要存取多个 map, list 和 set 时,都用
synchronized block 加锁。这样一来,有很多地方都需要这样,看上去不好。后来就
做了改动:
Map -> ConcurrentHashMap
List -> ConcurrentSkipListMap
只有一个 ArrayList 还是用 synchronized 加锁,因为不确定改为
CopyOnWriteArrayList 是否合适,还在研究!
最终,速度又从 50% 进一步减少为原来的 1/8,内存问题也得到了解决。发现速度的
提高,基本上与 thread pool 中线程数量成比例的增加。当然,thread 的数量应该小
于所在机器 core 的数量?
使用 Executors.newFixedThreadPool(count) 这张方式,很容易测试单线程和多线的
区别,所需要改的就是改变那个 count 变量的值,很方便。
最后提一下,第一次在 debugger 里调试多线程,感觉难多了!
有一个 tool, 以前处理的数据比较少,所以速度,时间都不成问题。但现在需要处理
的数据增大,速度问题就突出了。费了好大一番力气,总算弄通了,一次处理的时间由
原来的 24 个小时,减小为 3 个小时。
第一次弄多线程,很费劲。但弄通了(只是这个小 task 弄通了,并非多线程弄通了)
,又觉得很有意思,小有成就感。分享一下,希望对 beginner 有帮助,也希望有经验
的多指教。其中有些地方,还没有完全明白,正在学习-ing。
程序其实也简单,就是在单机上运行的一个程序。过去单线程的,即使在多 core 的机
器上运行,也只能利用处理器的少部分能力。现在就是要改为多线程,多核处理。
数据存在 10 个文本文件中,每个大约 40M.程序从每个文件逐行读取,逐行处理,很
多信息被保存到不同的数据结构中。最后输出处理报告,中间还要将部分处理信息输出
到一个 XML 文件中。
经过一番 google search 和研究,所做的改动包括:
1) 将原来的逐行读取,逐行处理,改为先将每个文件的所有行读到一个新建的
DataStore 中,这个结构比较简单。就是保持原来的每行文本信息,以及行号,和
getter, setter 等。每个都加上 synchronized
class DataStore{
}
2) 建立一个 thread pool,并将这个 DataStore 对象传到这个 pool 中。
ExecutorService executor = Executors.newFixedThreadPool(count);
for (int i = 0; i < count; i++) {
Runnable dataThread = new DataThread(dataStore,
"Thread " + i);
executor.execute(dataThread);
}
3) DataThread 的 run(),基本上就运行原来的代码,原来是从文件读取每一行,现在
变为从 dataStore 中逐行取得文本进行处理。
4)原来逐行处理,逐行输出部分信息到一个 XML 文件中,原来的记录顺序需要保持。
现在由于多线程异步处理,直接输出无法保证顺序。就先把这部分信息存储到一个新建
的 map 中,保存记录 ID -> data 的映射。当一个文件处理完了,才最后按 ID 从
map 有序输出到 XML 中。
5)其他很多信息需要计算,排序和保存。过去用的都是无保护的 Map, List, Set,以
及原始的 int. 后来在需要的地方,对这些数据存储对象都加上锁。比如:
synchronized(dataMap){
dataMap.remove(id);
}
其他需要帮助处理结果的 private static 方法,也都加上了 synchronized.
6)很多 int 型的计算 (i.e. itemCounts++), 都从 int 变成了 AtomicInteger.
基本上就这样。下面是几点体会。
1)为了测试多线程下无线程保护时可能出现的问题。开始我故意对所有的那些 map,
list 和 set 都用原来的,无任何保护。一运行,果然,上面那个 dataMap 马上出错
。数据处理完后,抛出一个 RuntimeException, 提示数据状态 inconsistent.
2) 遇到一个困难。就是在 local 机器上测试时,虽然发现速度明显提高,50% 左右,
但是通过 OS 的 performance monitor 观察时,发现内存使用逐渐小幅增加,当数据
处理越 2/3 后,内存所剩已无几,而 cpu 也马上从多 core,变为过去的单 core.但
机器并没有停下来,而是按单 core 的速度,处理完剩下的 1/3, memory 此期间全负
荷运转。
多线程的程序,怎么自动变成了单线程?memory usage 为什么增加? memory 增加
,其实预见到了,由于上面提到的改动 4),原来不保存,现在逐渐保存到一个 map 中
,所以消耗内存。没想到影响这么大,可能是记录比较多的缘故。再联系到 CPU 的使
用,原来当 memory 快耗尽时,OS or JVM 能够探测到,就自动 switch 到 single
core, 以保证程序继续运行。这里,还挺 smart 哈。
后来,就为解决这个问题,费力了。数据再大一点,可能就会死机了。等一个文件处
理完,存储到 map 中的方案不行。还得在处理数据的同时,想办法输出,不保存,或
少保存信息到 map 中。可是,异步处理,如何保持输出顺序呢?
try 了很多次,又 search 了很多,最后找到办法了。在原来 10 个 thread 的基础
上,再加一个专用的 thread, 在其他 10 thread 处理 dataStore 并存储到
resultMap 的同时,负责 concurrently 从这个 map 中输出信息到 XML 文件中;每当
一条信息输出完,就立刻从 map 中删除记录。这样,这个 map 的 size 一直会维持在
非常小的水平。如何控制顺序呢?就取一个循环变量。
int i =0;
do(
{
DataRecord record = resultMap.get(i);
if( record != null ){
// 1. output
// 2. resultMap.remove(i)
// i++
}else{
Thread.sleep(1000);
}
}while(i
并相互协调,保证很好的完成任务。主要的 10 个线程处理耗资源和时间的主任务,另
一个 thread 负责简单的输出;当它空闲无任务可做时,就通过睡觉 (sleep),稍事休
息,又继续工作,直到结束。fun!
3) 最后,又经过了一点优化吧。上面提到需要存取多个 map, list 和 set 时,都用
synchronized block 加锁。这样一来,有很多地方都需要这样,看上去不好。后来就
做了改动:
Map -> ConcurrentHashMap
List -> ConcurrentSkipListMap
只有一个 ArrayList 还是用 synchronized 加锁,因为不确定改为
CopyOnWriteArrayList 是否合适,还在研究!
最终,速度又从 50% 进一步减少为原来的 1/8,内存问题也得到了解决。发现速度的
提高,基本上与 thread pool 中线程数量成比例的增加。当然,thread 的数量应该小
于所在机器 core 的数量?
使用 Executors.newFixedThreadPool(count) 这张方式,很容易测试单线程和多线的
区别,所需要改的就是改变那个 count 变量的值,很方便。
最后提一下,第一次在 debugger 里调试多线程,感觉难多了!