大文本文件处理实例

做数据挖掘工作,最耗时,最费脑子的步骤应该是数据预处理了。

扫库出来格式:
类目 商品id 来源网址 标题
整个商品库扫下来的.txt很大,大约3亿条,20个G,我们的类目体系(类目树)全展开,大约15000+个类目,想在每个类目下的标题,使用topic model做一些数据挖掘工作。所以,数据预处理先得把这3亿条数据放到各自类目的.txt中。
图像1
咋一想,这种工作使用awk+sed这套文本处理套餐应该ok。

awk -F'\t' '{print $1"\t"$2"\t"$3 >> $1".txt"}' all_cid.log

以tab为分割符,把一行的商品id,网站,标题写到类目.txt
结果一跑,速度超级慢,大约2m/s的速度,这得到哪时候去呀。慢的原因应该是每读一行都打开对应类目txt做追加操作。

显然,先按类目排序,再一个个类目的数据一起写入txt,能取得较好效果。以我浅薄的算法知识,对这类大文件以归并排序排列500MB,速度仍然不满意。那只好拿出大杀器MapReduce咯。
有同学说这种一次性文本工作要专门写个Jar包,放Hadoop集群上跑有点吃力,有木有快一点的MapReduce实现
啊。有个shell MapReduce模板

cat *.txt | Map操作 | sort | Reduce操作

其中Map,Reduce操作部分可以用任何语言实现,这儿我选用了Python。

import sys

for line in sys.stdin:
    line_list = line.strip().split("\t")
    if len(line_list) != 4:
        continue
    cid = line_list[0]
    print "%s\t%s\t%s\t%s" % (cid, line_list[1], line_list[2], line_list[3])

Map操作很简单,sys.stdin读取cat流进来的数据,做一些脏数据的出来,输出KV,这儿Key是类目号,Value是标题,来源,id。
中间sort操作就交给shell完成咯,
图片2
关键操作在Reduce里:

current_cid = None
cid = None
for line in sys.stdin:
    line_list = line.strip().split("\t")
    cid = line_list[0]
    if current_cid == cid:
        tmp_list.append("\t".join(line_list[1:]))
    else:
        if current_cid:
            with open("./data/%s.txt" % current_cid, "a") as fp:
                for i in tmp_list:
                    fp.write("%s\n" % (i))
        current_cid = cid
        tmp_list = ["\t".join(line_list[1:])]

if current_cid == cid:
    with open("./data/%s.txt" % current_cid, "a") as fp:
        fp.write("%s\t%s\t%s\n" % (line_list[1], line_list[2], line_list[3]))

得到:
图片3
其中1623.txt中的数据:
图片4
好了。对了,数据大了做处理时候别忘记打日志:

import logging  
logging_format_str = ("%(levelname)-8s %(asctime)s %(filename)s:%(lineno)d"
                      " ] %(message)s")
logging.basicConfig(format=logging_format_str, level=logging.DEBUG)

    if count % 1000000 == 0:
        logging.debug("%s processed cid:%s"%(count, cid))

对所有数据做这样的MapReduce时间控制在1小时,足够了。如果数据再多,那就得上Hadoop集群了,Hadoop有个Streaming工具,可以不限定java语言,对任何能使用Map,Reduce流操作都可以使用集群,非常方便,配置模板:

hadoop jar Hadoop路径/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.0.jar \
-D mapred.job.name=任务名称" \
-input HDFS上输入路径 \
-output HDFS上输出路径 \
-mapper mapper.py \
-reducer reducer.py \
-file ./mapper.py \
-file ./reducer.py

提交后,大象般的Hadoop集群就开始吭哧吭哧运作了.

Comments
Write a Comment