做数据挖掘工作,最耗时,最费脑子的步骤应该是数据预处理了。
扫库出来格式:
类目 商品id 来源网址 标题
整个商品库扫下来的.txt很大,大约3亿条,20个G,我们的类目体系(类目树)全展开,大约15000+个类目,想在每个类目下的标题,使用topic model做一些数据挖掘工作。所以,数据预处理先得把这3亿条数据放到各自类目的.txt中。
咋一想,这种工作使用awk+sed这套文本处理套餐应该ok。
awk -F'\t' '{print $1"\t"$2"\t"$3 >> $1".txt"}' all_cid.log
以tab为分割符,把一行的商品id,网站,标题写到类目.txt
结果一跑,速度超级慢,大约2m/s的速度,这得到哪时候去呀。慢的原因应该是每读一行都打开对应类目txt做追加操作。
显然,先按类目排序,再一个个类目的数据一起写入txt,能取得较好效果。以我浅薄的算法知识,对这类大文件以归并排序排列500MB,速度仍然不满意。那只好拿出大杀器MapReduce咯。
有同学说这种一次性文本工作要专门写个Jar包,放Hadoop集群上跑有点吃力,有木有快一点的MapReduce实现
啊。有个shell MapReduce模板
cat *.txt | Map操作 | sort | Reduce操作
其中Map,Reduce操作部分可以用任何语言实现,这儿我选用了Python。
import sys
for line in sys.stdin:
line_list = line.strip().split("\t")
if len(line_list) != 4:
continue
cid = line_list[0]
print "%s\t%s\t%s\t%s" % (cid, line_list[1], line_list[2], line_list[3])
Map操作很简单,sys.stdin
读取cat
流进来的数据,做一些脏数据的出来,输出KV,这儿Key是类目号,Value是标题,来源,id。
中间sort操作就交给shell完成咯,
关键操作在Reduce里:
current_cid = None
cid = None
for line in sys.stdin:
line_list = line.strip().split("\t")
cid = line_list[0]
if current_cid == cid:
tmp_list.append("\t".join(line_list[1:]))
else:
if current_cid:
with open("./data/%s.txt" % current_cid, "a") as fp:
for i in tmp_list:
fp.write("%s\n" % (i))
current_cid = cid
tmp_list = ["\t".join(line_list[1:])]
if current_cid == cid:
with open("./data/%s.txt" % current_cid, "a") as fp:
fp.write("%s\t%s\t%s\n" % (line_list[1], line_list[2], line_list[3]))
得到:
其中1623.txt
中的数据:
好了。对了,数据大了做处理时候别忘记打日志:
import logging
logging_format_str = ("%(levelname)-8s %(asctime)s %(filename)s:%(lineno)d"
" ] %(message)s")
logging.basicConfig(format=logging_format_str, level=logging.DEBUG)
if count % 1000000 == 0:
logging.debug("%s processed cid:%s"%(count, cid))
对所有数据做这样的MapReduce时间控制在1小时,足够了。如果数据再多,那就得上Hadoop集群了,Hadoop有个Streaming工具,可以不限定java语言,对任何能使用Map,Reduce流操作都可以使用集群,非常方便,配置模板:
hadoop jar Hadoop路径/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.0.jar \
-D mapred.job.name=任务名称" \
-input HDFS上输入路径 \
-output HDFS上输出路径 \
-mapper mapper.py \
-reducer reducer.py \
-file ./mapper.py \
-file ./reducer.py
提交后,大象般的Hadoop集群就开始吭哧吭哧运作了.