This project uses Python's jieba module to segment Chinese text and MapReduce to count word frequencies. The task itself is simple, but as a Hadoop beginner I ran into a series of problems, so I'm recording them here.
When Hadoop runs a Python MapReduce job, importing a module outside the standard library fails with a module-not-found error, because the module is not installed on the worker nodes. A workaround I found online uses the built-in zipimport module. First copy the module's entire package directory into the current directory, then run:
zip -r jieba.zip jieba
mv jieba.zip jieba.mod
The module can then be imported in code like this:
import sys
sys.path.append('./')
import zipimport

# load the jieba package directly from the zip archive
importer = zipimport.zipimporter('jieba.mod')
jieba = importer.load_module('jieba')  # registers 'jieba' in sys.modules
import jieba.analyse  # the submodule import now resolves through the archive
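As an aside (my own note, not part of the original workaround): Python's import machinery can import packages from a zip archive placed directly on sys.path, so the explicit zipimport calls can usually be replaced with something like:
import sys
# assumption: jieba.mod is the archive built above, present in the working directory
sys.path.insert(0, './jieba.mod')
import jieba
import jieba.analyse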
Running the job then raised another error: the line import jieba.analyse failed because idf.txt could not be found. Reading jieba's source revealed why; tfidf.py contains this block:
_get_module_path = lambda path: os.path.normpath(os.path.join(os.getcwd(),
    os.path.dirname(__file__), path))
_get_abs_path = jieba._get_abs_path
DEFAULT_IDF = _get_module_path("idf.txt")
jieba needs to read idf.txt when jieba.analyse is imported, but since the package was loaded from a zip archive, the path computed from __file__ points inside the archive and the file can never be opened this way. So I patched the source, changing the last line of the block above to DEFAULT_IDF = 'idf.txt' (and rebuilt jieba.mod), so that the path now resolves relative to the task's working directory.
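For reference, the tail of that block in tfidf.py looks like this after the edit (my reconstruction of the one-line patch described above):
_get_abs_path = jieba._get_abs_path
# was: DEFAULT_IDF = _get_module_path("idf.txt"), which resolves inside the archive
DEFAULT_IDF = 'idf.txt'  # now resolved relative to the task's working directory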
Then add that file to the MapReduce command; the -files option distributes every listed file into each task's working directory:
hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.8.2.jar \
-files ./mapper.py,./reducer.py,./jieba.mod,./idf.txt \
-mapper "python3 ./mapper.py" \
-reducer "python3 ./reducer.py" \
-input xiyouji/* -output xiyouji-output
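One unrelated gotcha worth knowing (general Hadoop streaming behavior, not something the original run hit): the job aborts if the output directory already exists, so remove it before re-running:
hadoop fs -rm -r xiyouji-output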
With that, the job finally ran successfully. The full sources follow. mapper.py:
#!/usr/bin/env python
import sys
sys.path.append('./')
import zipimport

# load jieba from the zip archive shipped with the job
importer = zipimport.zipimporter('jieba.mod')
jieba = importer.load_module('jieba')
import jieba.analyse

for line in sys.stdin:
    line = line.strip()
    # extract up to 100 keywords from this line, ranked by TF-IDF
    results = jieba.analyse.extract_tags(line, topK=100)
    # count occurrences of each non-empty tag
    resultdict = dict()
    for s in results:
        if s and s.strip():
            resultdict[s] = resultdict.get(s, 0) + 1
    # emit tab-separated word/count pairs for the reducer
    for s in resultdict:
        print('%s\t%s' % (s, resultdict[s]))
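To make the mapper's output contract concrete: each input line produces tab-separated word/count pairs, one per extracted tag. An illustrative run (the sample sentence is mine, the exact tags depend on jieba's TF-IDF ranking, and jieba.mod plus idf.txt must be in the current directory) would look something like:
echo '悟空便去那厨房里寻些茶饭' | python3 mapper.py
悟空	1
厨房	1
茶饭	1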
reducer.py:
#!/usr/bin/env python
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently ignore/discard this line
        continue
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print('%s\t%s' % (current_word, current_count))
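Before submitting to the cluster, the two scripts can be smoke-tested locally by emulating Hadoop's shuffle-and-sort with sort, a standard trick for debugging streaming jobs. Here input.txt stands for any sample text file, and again jieba.mod and idf.txt must sit in the current directory:
cat input.txt | python3 mapper.py | sort | python3 reducer.py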
Use
hadoop fs -cat xiyouji-output/* | sort -nk 2 -r | more
to view the results sorted by count in descending order, as shown in the figure below.