摘要: 原创出处 juejin.im/post/5da5b568f265da5b6c4bc13d 「ksfzhaohui」欢迎转载,保留摘要,谢谢!
前言
最近有个需求解析一个订单文件,并且说明文件可达到千万条数据,每条数据大概在20个字段左右,每个字段使用逗号分隔,需要尽量在半小时内入库。
思路
1.估算文件大小
因为告诉文件有千万条,同时每条记录大概在20个字段左右,所以可以大致估算一下整个订单文件的大小,方法也很简单使用FileWriter往文件中插入一千万条数据,查看文件大小,经测试大概在1.5G左右;
2.如何批量插入
由上可知文件比较大,一次性读取内存肯定不行,方法是每次从当前订单文件中截取一部分数据,然后进行批量插入,如何批次插入可以使用insert(...)values(...),(...)
的方式,经测试这种方式效率还是挺高的;
3.数据的完整性
截取数据的时候需要注意,需要保证数据的完整性,每条记录最后都是一个换行符,需要根据这个标识保证每次截取都是整条数,不要出现半条数据这种情况;
4.数据库是否支持批次数据
因为需要进行批次数据的插入,数据库是否支持大量数据写入,比如这边使用的mysql,可以通过设置max_allowed_packet来保证批次提交的数据量;
5.中途出错的情况
因为是大文件解析,如果中途出现错误,比如数据刚好插入到900w的时候,数据库连接失败,这种情况不可能重新来插一遍,所有需要记录每次插入数据的位置,并且需要保证和批次插入的数据在同一个事务中,这样恢复之后可以从记录的位置开始继续插入。
实现
1.准备数据表
这里需要准备两张表分别是:订单状态位置信息表,订单表;
CREATE TABLE `file_analysis` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT, `file_type` varchar (255 ) NOT NULL COMMENT '文件类型 01:类型1,02:类型2' , `file_name` varchar (255 ) NOT NULL COMMENT '文件名称' , `file_path` varchar (255 ) NOT NULL COMMENT '文件路径' , `status` varchar (255 ) NOT NULL COMMENT '文件状态 0初始化;1成功;2失败:3处理中' , `position` bigint (20 ) NOT NULL COMMENT '上一次处理完成的位置' , `crt_time` datetime NOT NULL COMMENT '创建时间' , `upd_time` datetime NOT NULL COMMENT '更新时间' , PRIMARY KEY (`id` ) ) ENGINE =InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET =utf8
CREATE TABLE `file_order` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT, `file_id` bigint (20 ) DEFAULT NULL , `field1` varchar (255 ) DEFAULT NULL , `field2` varchar (255 ) DEFAULT NULL , `field3` varchar (255 ) DEFAULT NULL , `field4` varchar (255 ) DEFAULT NULL , `field5` varchar (255 ) DEFAULT NULL , `field6` varchar (255 ) DEFAULT NULL , `field7` varchar (255 ) DEFAULT NULL , `field8` varchar (255 ) DEFAULT NULL , `field9` varchar (255 ) DEFAULT NULL , `field10` varchar (255 ) DEFAULT NULL , `field11` varchar (255 ) DEFAULT NULL , `field12` varchar (255 ) DEFAULT NULL , `field13` varchar (255 ) DEFAULT NULL , `field14` varchar (255 ) DEFAULT NULL , `field15` varchar (255 ) DEFAULT NULL , `field16` varchar (255 ) DEFAULT NULL , `field17` varchar (255 ) DEFAULT NULL , `field18` varchar (255 ) DEFAULT NULL , `crt_time` datetime NOT NULL COMMENT '创建时间' , `upd_time` datetime NOT NULL COMMENT '更新时间' , PRIMARY KEY (`id` ) ) ENGINE =InnoDB AUTO_INCREMENT=10000024 DEFAULT CHARSET =utf8
2.配置数据库包大小
mysql> show VARIABLES like '%max_allowed_packet%'; + | Variable_name | Value | + | max_allowed_packet | 1048576 | | slave_max_allowed_packet | 1073741824 | + 2 rows in set mysql> set global max_allowed_packet = 1024 *1024 *10 ; Query OK, 0 rows affected
通过设置max_allowed_packet,保证数据库能够接收批次插入的数据包大小;不然会出现如下错误:
Caused by: com.mysql.jdbc.PacketTooBigException: Packet for query is too large (4980577 > 1048576 ) . You can change this value on the server by setting the max_allowed_packet' variable. at com.mysql.jdbc.MysqlIO.send (MysqlIO.java:3915 ) at com.mysql.jdbc.MysqlIO.sendCommand (MysqlIO.java:2598 ) at com.mysql.jdbc.MysqlIO.sqlQueryDirect (MysqlIO.java:2778 ) at com.mysql.jdbc.ConnectionImpl.execSQL (ConnectionImpl.java:2834 )
3.准备测试数据
public static void main (String[] args) throws IOException { FileWriter out = new FileWriter(new File("D://xxxxxxx//orders.txt" )); for (int i = 0 ; i < 10000000 ; i++) { out.write( "vaule1,vaule2,vaule3,vaule4,vaule5,vaule6,vaule7,vaule8,vaule9,vaule10,vaule11,vaule12,vaule13,vaule14,vaule15,vaule16,vaule17,vaule18" ); out.write(System.getProperty("line.separator" )); } out.close(); }
使用FileWriter遍历往一个文件里插入1000w条数据即可,这个速度还是很快的,不要忘了在每条数据的后面添加换行符(\n\r)
;
4.截取数据的完整性
除了需要设置每次读取文件的大小,同时还需要设置一个参数,用来每次获取一小部分数据,从这小部分数据中获取换行符(\n\r)
,如果获取不到一直累加直接获取为止,这个值设置大小大致同每条数据的大小差不多合适,部分实现如下:
ByteBuffer byteBuffer = ByteBuffer.allocate(buffSize); long endPosition = batchFileSize + startPosition - buffSize;long startTime, endTime;for (int i = 0 ; i < count; i++) { startTime = System.currentTimeMillis(); if (i + 1 != count) { int read = inputChannel.read(byteBuffer, endPosition); readW: while (read != -1 ) { byteBuffer.flip(); byte [] array = byteBuffer.array(); for (int j = 0 ; j < array.length; j++) { byte b = array[j]; if (b == 10 || b == 13 ) { endPosition += j; break readW; } } endPosition += buffSize; byteBuffer.clear(); read = inputChannel.read(byteBuffer, endPosition); } } else { endPosition = fileSize; } ...省略,更多可以查看Github完整代码... }
如上代码所示开辟了一个缓冲区,根据每行数据大小来定大概在200字节左右,然后通过遍历查找换行符(\n\r),找到以后将当前的位置加到之前的结束位置上,保证了数据的完整性;
5.批次插入数据
通过insert(...)values(...),(...)
的方式批次插入数据,部分代码如下:
SqlSession session = sqlSessionFactory.openSession(); try { long startTime = System.currentTimeMillis(); FielAnalysisMapper fielAnalysisMapper = session.getMapper(FielAnalysisMapper.class); FileOrderMapper fileOrderMapper = session.getMapper(FileOrderMapper.class); fileOrderMapper.batchInsert(orderList); fileAnalysis.setPosition(endPosition + 1 ); fileAnalysis.setStatus("3" ); fileAnalysis.setUpdTime(new Date()); fielAnalysisMapper.updateFileAnalysis(fileAnalysis); session.commit(); long endTime = System.currentTimeMillis(); System.out.println("===插入数据花费:" + (endTime - startTime) + "ms===" ); } catch (Exception e) { session.rollback(); } finally { session.close(); } ...省略,更多可以查看Github完整代码...
如上代码在一个事务中同时保存批次订单数据和文件解析位置信息,batchInsert通过使用mybatis的``标签来遍历订单列表,生成values数据;
总结
以上展示了部分代码,完整的代码可以查看Github地址中的batchInsert模块,本地设置每次截取的文件大小为2M,经测试1000w条数据(大小1.5G左右)插入mysql数据库中,大概花费时间在20分钟左右,当然可以通过设置截取的文件大小,花费的时间也会相应的改变。
完整代码
https://github.com/ksfzhaohui/blog/tree/master/mybatis