多线程实战之批处理一个文件夹下的文件总结

项目背景

现有4个G的船舶轨迹数据,按照船号存储在一个文件夹下,我需要对这些数据进行处理一下,按照单线程的方式,处理起来有点浪费时间,为了提高效率,提高电脑CPU的利用率,打算将数据使用多线程的方式进行处理一下。

项目代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package InitShip;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.Buffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class FileThread extends Thread{
Tool tool = new Tool();
private final static CountDownLatch countDownL = new CountDownLatch(10);
private int fileIndex;
private List<String> fileList;
private String filepath="";
private String movepath="";

public String getFilepath() {
return filepath;
}
public void setFilepath(String filepath) {
this.filepath = filepath;
}
public String getMovepath() {
return movepath;
}
public void setMovepath(String movepath) {
this.movepath = movepath;
}
public int getFileIndex() {
return fileIndex;
}
public void setFileIndex(int fileIndex) {
this.fileIndex = fileIndex;
}
public List<String> getFileList() {
return fileList;
}
public void setFileList(List<String> fileList) {
this.fileList = fileList;
}
@Override
public void run() {
for(int i=0;i<fileList.size();i++) {
if(i%10 == fileIndex) {
//下面三行代码写逻辑代码的地方,可以将想要更改的文件逻辑添加在这个地方
List<List<String>> allList =tool.readFILEN(filepath+fileList.get(i));
String value = tool.WriteValue(allList);
//readFile.renameTo(new File(movepath+readFile.getName()));
tool.writerFile(movepath+fileList.get(i),value);
}
countDownL.countDown();
}
}
public static void main(String [] args) throws InterruptedException {
String filepath = "C:\\Users\\ouc\\Desktop\\data\\202101perupd_test\\";
String movepath = "C:\\Users\\ouc\\Desktop\\data\\202101perupd\\";
File file = new File(filepath);
String [] fileList = file.list();
List<String> fList = new ArrayList<String>();
for(int i=0;i<fileList.length;i++) {
fList.add(fileList[i]);
}
for(int i=0;i<10;i++) {
FileThread fileThread = new FileThread();
fileThread.setFileIndex(i);
fileThread.setFileList(fList);
fileThread.setFilepath(filepath);
fileThread.setMovepath(movepath);
fileThread.start();
}
countDownL.await();
}
}

相关知识记录

文件读写操作

CountDownLatch

在java.util.concurrent.CountDownLatch;包下的一个工具类,countDownLatch这个类使得一个线程或者多个线程等待其他线程各自执行完毕之后在执行。

两种使用场景

  • 让多个线程等待
  • 让单个线程等待

    构造器

    1
    2
    //参数count为计数值
    public CountDownLatch(int count){}

主要方法

1
2
3
4
5
6
//调用await()方法的线程会被挂起,它会等待直到count的值为0的时候才继续执行
public void await()
//和await()类似,只不过等待一定的时间后count的值还没有变为0的话就继续执行
public boolean await(long timeout,TimeUnit unit)
//执行将count的值减一
public void countDown(){};

让多个线程等待,模拟并发

并发线程一起执行,例如在指定的时刻进行秒杀,线程准备就绪后,进行等待,直到秒杀时刻的到来,然后一起执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class FileThread extends Thread{

private final static CountDownLatch countDownL = new CountDownLatch(1);
@Override
public void run() {
try {
countDownL.await();
String parter ="["+Thread.currentThread().getName()+"]";
System.out.println(parter+"开始执行");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void main(String [] args) throws InterruptedException {
for(int i=0;i<5;i++) {
new FileThread().start();
}
Thread.sleep(2000);
countDownL.countDown();
}
}

让单个线程等待,多个线程任务完成后,进行汇总合并

很多时候,并发任务存在前后依赖关系,例如:多个数据操作完成后,需要数据check,这是在多个线程任务完成后,进行汇总合并的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private  final static CountDownLatch countDownL = new CountDownLatch(5);
@Override
public void run() {
System.out.println("finish"+Thread.currentThread().getName());
countDownL.countDown();
}
public static void main(String [] args) throws InterruptedException {
for(int i=0;i<5;i++) {
new FileThread().start();
}
//Thread.sleep(2000);
countDownL.await();
System.out.println("主线程:在所有的任务完成后,执行");
}

工作原理

CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。
调用await()方法的线程会被阻塞,直到计数器减到0的时候,才能继续往下执行。
调用await()进行阻塞等待的线程,他们被阻塞,这样所有的线程都能在同一个起跑线上
countDown()方法则是将计数器减1;当减为0的时候,就继续往下执行。

项目代码解读

首先创建了一个CountDownLatch的工具,初始值为10,设置一个文件的index,以及文件路径的列表,该类继承了Thread,因此需要重写run()方法,这里我们要使用的是第二种场景,让单线程,等待多线程的过程,虽然我们后续没有其他任务要继续执行。在run 方法里面,线程开始执行我们编写好的逻辑代码,执行完成一个之后,countDown就会减一,countDownLatch的值设置为全局的,因此,我们创建的10个线程,每次执行都会使得countDown减一,run()方法里面的if(i%10 == fileIndex) 表示了10个线程,需要负责的文件的数量。这样就实现了10个线程同时进行文件操作的过程。