当前位置:首页 > 编程笔记 > 正文
已解决

以key为特征,分类多个信息(手机流量栗子)

来自网友在路上 11058105提问 提问时间:2023-11-19 20:05:13阅读次数: 105

最佳答案 问答题库1058位专家为你答疑解惑

Mapper

package com.atguigu.mr.writable2;
/** Mapper阶段会运行MapTask,MapTask会调用Mapper类* 作用:在该类中实现需要在MapTask中实现的业务逻辑代码* */import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/** Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>:*   第一组泛型:*       KEYIN:读取数据时的偏移量的类型*       VALUEIN:读取的一行一行的数据的类型*   第二组泛型:*       KEYOUT:写出的key的类型(在这是手机号的类型)*       VALUEOUT:写出的value的类型(在这是手机号的数量的类型)* */
public class FlowMapper extends Mapper<LongWritable, Text,Text,FlowBean> {private Text outKey = new Text();//创建的Key对象private FlowBean outValue = new FlowBean();//创建的value的对象/*** 1.在map方法中实现需要在MapTask中实现的业务逻辑代码* 2,该方法在被循环调用,每调用一次传入一行数据* @param key 读取数据时的偏移量* @param value 读取的数据(一行一行的数据)* @param context 上下文(在这用来将key,value写出去)* @throws IOException* @throws InterruptedException*/@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
//        super.map(key, value, context);//1、将数据切割String[] phoneInfo = value.toString().split("\t");//2、封装key,value//给key赋值outKey.set(phoneInfo[1]);//给value赋值outValue.setUpFlow(Long.parseLong(phoneInfo[phoneInfo.length-3]));outValue.setDownFlow(Long.parseLong(phoneInfo[phoneInfo.length-2]));outValue.setSumFlow(outValue.getUpFlow()+outValue.getDownFlow());//3.给将key,value写出去context.write(outKey,outValue);}
}

Reducer

package com.atguigu.mr.writable2;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/** Reduce阶段会运行ReduceTask,ReduceTask会调用Reducer类* 作用:在该类中实现需要在ReducerTask中实现的业务逻辑代码* */
/** Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>* 第一组泛型:*   KEYIN:读取的key的类型(Mapper写出的key的类型)*   VALUEIN:读取的value的类型(Mapper写出的value的类型)* 第二组泛型:*   KEYOUT:写出的key的泛型(在这是单词的类型)*   VALUEOUT:写出的value的类型(在这是单词的数量)* */
public class FlowReducer extends Reducer<Text,FlowBean,Text,FlowBean> {private FlowBean outValue = new FlowBean();//创建value对象/*** 1.在reduce方法中实现需要在ReduceTask中实现现在的业务逻辑代码* 2.reduce方法在被循环调用,每调用一次传入一组数据(在这key值相同为一组)* @param key 读取的key* @param values 读取的所有的value* @param context 上下文(在这用来将key,value写出去)* @throws IOException* @throws InterruptedException*/@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {long sumUpFlow = 0;long sumDownFlow = 0;
//        super.reduce(key, values, context);//遍历所有的valuefor (FlowBean value : values){//将上行流量累加sumUpFlow += value.getUpFlow();//将下行流量累加sumDownFlow += value.getDownFlow();}//2.封装key,value//给value赋值outValue.setUpFlow(sumUpFlow);outValue.setDownFlow(sumDownFlow);outValue.setSumFlow(outValue.getUpFlow()+outValue.getDownFlow());//3.将key,value写出去context.write(key,outValue);}
}

FollowBean

package com.atguigu.mr.writable2;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;/*
*
* JavaBean :
* 1.自定义类并实现Writable接口
* 2.重写write和readFields方法
* */
public class FlowBean implements Writable {private long upFlow;private long downFlow;private long sumFlow;public FlowBean(){}/** 序列化时调用该方法* */@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}/** 反序列化调用该方法* 注意:反序列化时的顺序和序列化时的顺序要保持一致* */@Overridepublic void readFields(DataInput in) throws IOException {upFlow = in.readLong();downFlow = in.readLong();sumFlow = in.readLong();}public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}@Overridepublic String toString() {return "FlowBean{" +"upFlow=" + upFlow +", downFlow=" + downFlow +", sumFlow=" + sumFlow +'}';}
}

Driver

package com.atguigu.mr.writable2;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class FlowDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {//1.创建Job实例Job job = Job.getInstance(new Configuration());//2.给Job赋值//2.1 关联本程序的Jar——如果是本地可以不写,在集群上运行必须写job.setJarByClass(FlowDriver.class);//2.2设置Mapper和Reducer类job.setMapperClass(FlowMapper.class);job.setReducerClass(FlowReducer.class);//2.3设置Mapper输出的key,value的类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);//2.4设置最终输出的key,value的类型(在这是reducer输出的key,value的类型)job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//2.5设置输入和输出路径FileInputFormat.setInputPaths(job,new Path("E:\\文档\\大数据\\尚硅谷_大数据\\test\\2MR1\\input"));FileOutputFormat.setOutputPath(job,new Path("E:\\文档\\大数据\\尚硅谷_大数据\\test\\2MR1\\output3"));//3.运行Job/** boolean waitForCompletion(boolean verbose)* verbose:是否打印信息* 返回值:如果job执行成功返回true* */boolean b = job.waitForCompletion(true);System.out.println("b====" + b);}
}

查看全文

99%的人还看了

猜你感兴趣

版权申明

本文"以key为特征,分类多个信息(手机流量栗子)":http://eshow365.cn/6-39591-0.html 内容来自互联网,请自行判断内容的正确性。如有侵权请联系我们,立即删除!