当前位置: 首页> 财经> 产业 > Hadoop的streamingAPI与MapReduce[Python]

Hadoop的streamingAPI与MapReduce[Python]

时间:2025/7/14 15:34:27来源:https://blog.csdn.net/YENTERTAINR/article/details/140988099 浏览次数:0次

文章目录

  • 1.创建模拟文本
  • 2. 使用mapperduce统计标签分布和抽取指定标签
  • 3. 运行Map函数并排序结果以模拟Reduce任务:
  • 4.运行在无网络开发机上

1.创建模拟文本

1.1 机器模拟生成

from collections import namedtuple
from faker import Faker# 初始化Faker
fake = Faker()# 定义一个namedtuple类型,包含id, subject, text字段
GaokaoQuestion = namedtuple('GaokaoQuestion', 'id subject text')# 定义生成模拟数据的函数
def generate_faker_data(num_samples):data = []for _ in range(num_samples):# 使用faker生成数据subject = fake.word()text = fake.sentence()# 使用md5生成idid_value = f"{text} {subject}"id_hash = hashlib.md5(id_value.encode('utf-8')).hexdigest()# 创建namedtuple实例question = GaokaoQuestion(id=id_hash, subject=subject, text=text)data.append(question)return data# 生成3条模拟数据
samples = generate_faker_data(3)# 打印生成的数据
for sample in samples:print(sample)

2.手动生成

cat > test_data.jsonl << EOF
{"id":"1", "subject":"Math", "text":"Math question"}
{"id":"2", "subject":"Science", "text":"Science question"}
{"id":"3", "subject":"Math", "text":"Another Math question"}
EOF

2. 使用mapperduce统计标签分布和抽取指定标签

#!/usr/bin/env python3
import sys
import json
from collections import defaultdict# 指定需要抽取的subject标签列表
TARGET_SUBJECTS = ["数学", "物理"]def mapper():for line in sys.stdin:data = json.loads(line)if data['subject'] in TARGET_SUBJECTS:print(json.dumps(data))def reducer():counts = defaultdict(int)for line in sys.stdin:subject, count = line.strip().split('\t')counts[subject] += int(count)for subject, count in counts.items():print(f"{subject}\t{count}")if __name__ == "__main__":if len(sys.argv) > 1 and sys.argv[1] == 'reduce':reducer()else:mapper()

3. 运行Map函数并排序结果以模拟Reduce任务:

cat test_data.jsonl | python3 mapper_reducer.py | sort -k1,1 | python3 mapper_reducer.py reduce

4.运行在无网络开发机上

# 假设input_data.jsonl是HDFS上的输入文件路径
# 假设output是HDFS上输出结果的路径# 运行Map任务
hadoop fs -get /path/to/input_data.jsonl input_data.jsonl
python \Auser\tmp\mapper_reducer_script\mapper_reducer.py | sort -k1,1 > mapped_output.txt# 运行Reduce任务
python \Auser\tmp\mapper_reducer_script\mapper_reducer.py reduce < mapped_output.txt > reduced_output.txt# 将结果上传到HDFS
hadoop fs -put reduced_output.txt /path/to/output/
关键字:Hadoop的streamingAPI与MapReduce[Python]

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: