1、MapReduce是人类有史以来第一代分布式计算引擎
2、后期的绝大多数的分布式计算引擎都借鉴了MapReduce的思想
3、学习了MapReduce可以为以后学习其他的分布式计算引擎打好基础
4、目前还有很多的框架底层代码就是MapReduce: Hive、Sqoop、Oozie
1、MapReduce的学习是一半理论,一半代码
2、指导思想是:重理论,轻代码
3、上课的代码要求能看懂,自己敲一遍即可,不要求盲写
第一代:MapReduce(MR) 离线分析
第二代:Tez 离线分析
第三代:Spark 离线分析 + 实时分析
第四代:Flink 离线分析 + 实时分析
第五代:Doris , kylin ,ClickHouse, ES,
1、MapReduce最基本的思想就是分而治之
2、MapReduce有两个阶段,一个Map阶段,负责任务的拆分,一个Reduce阶段负责任务的合并
3、MapReduce将一个大的任务进行拆分,拆分成小任务,拆分之后,放在不同的主机上运行,运行之后再将这些结果合并
4、MapReduce整个处理过程就是将原始数据转成一个个键值对,然后不断的对这些键值对进行迭代处理,直到得到最理想的键值对位,最后的键值对就是我们想要的结果
介绍
对文件中的单词数量进行统计
代码
//=================================WorCountMapper类==============================
package pack01_wordcount;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** 第一步:定义类继承Mapper类* 四个泛型:K1的类型,V1的类型,K2的类型,V2的类型*/public class WorCountMapper extends Mapper {/*** 第二步:重写map方法,在该方法中,将K1、V1,转为K2和V2* @param key 表示K1,是每一行的偏移量,是系统自动转换得到* @param value 表是V1, 是每一行的文本数据* @param context 表示MapReduce的上下文对象,可以将我们的键值对传送到下一个处理环节*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//1:获取K2//1.1 对V1按照空格进行切割,获取的每个单词就是K2String[] words = value.toString().split(" ");for (String k2 : words) {//2:获取V2,V2就是固定值1//3:将K2和V2写入上下文中,送到下一个处理环节context.write(new Text(k2),new LongWritable(1));}}
}//=================================WordCountReducer类=============================
package pack01_wordcount;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.util.Iterator;/*** 第一步:自定义类,继承Reducer类型* 泛型:K2类型、V2类型、K3类型、V3类型*/public class WordCountReducer extends Reducer {/*** 第二步:重写reduce方法,在该方法中,将新K2,V2,转为K3和V3* @param key 新K2* @param values [V2]* @param context 上下文对象*/@Overrideprotected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException {//1:获取K3, 新K2就是K3//2:获取V3,遍历[V2]集合,将所有的值相加long count = 0;//Iterator iterator = values.iterator();//while (iterator.hasNext()){// long i = iterator.next().get();// count += i;//}for (LongWritable value : values) {count += value.get();}//3:将K3和V3写入上下文中context.write(key,new LongWritable(count));}
}//=================================WordCountDriver类=============================
package pack01_wordcount;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;/*** 创建一个Job任务,设置Job任务每一个环节的处理流程,最后将该任务交Yarn执行*/
public class WordCountDriver {public static void main(String[] args) throws Exception{//1:创建Job任务对象Configuration configuration = new Configuration();//configuration.set("参数名字","参数值");Job job = Job.getInstance(configuration, "wordcount_basic");//2、设置置作业驱动类job.setJarByClass(WordCountDriver.class);//3、设置文件读取输入类的名字和文件的读取路径/*FileInputFormat父类:该类用来决定源数据的读取方式TextInputFormat(默认子类):一行一行读取*///方式1-写法//FileInputFormat.addInputPath(job, new Path("hdfs://node1:8020/mapreduce/input/wordcount"));FileInputFormat.addInputPath(job, new Path("file:///D:\\input\\wordcount"));//方式2-写法/*job.setInputFormatClass(TextInputFormat.class);TextInputFormat.addInputPath(job,new Path("hdfs://node1:8020/mapreduce/input/wordcount"));*///4:设置你自定义的Mapper类信息、设置K2类型、设置V2类型job.setMapperClass(WorCountMapper.class);job.setMapOutputKeyClass(Text.class); //设置K2类型job.setMapOutputValueClass(LongWritable.class); //设置V2类型//5:设置分区、排序,规约、分组(保留)//6:设置你自定义的Reducer类信息、设置K3类型、设置V3类型job.setReducerClass(WordCountReducer.class);job.setOutputKeyClass(Text.class); //设置K3类型job.setOutputValueClass(LongWritable.class); //设置V3类型//7、设置文件读取输出类的名字和文件的写入路径/*FileOutputFormat父类:该类用来决定目标数据的写入方式TextOutputFormat(默认子类):一行一行写入*///方式1-写法//FileOutputFormat.setOutputPath(job, new Path("hdfs://node1:8020/mapreduce/output/wordcount"));FileOutputFormat.setOutputPath(job, new Path("file:///D:\\output\\wordcount"));//方式2-写法/*job.setOutputFormatClass(TextOutputFormat.class);TextOutputFormat.setOutputPath(job,new Path("hdfs://node1:8020/mapreduce/input/wordcount"));*///8、将设置好的job交给Yarn集群去执行// 提交作业并等待执行完成boolean resultFlag = job.waitForCompletion(true);//程序退出System.exit(resultFlag ? 0 :1);}
}
测试
注意:测试之前一定要准备好数据,目标目录不能存在,否则报错
本地测试
直接右键执行, 注意该测试只是模拟,不是真正的Yarn集群执行
集群测试
1、代码打jar包
2、将jar包上传到Linux服务器
3、执行以下命令hadoop jar module3_mapreduce-1.0-SNAPSHOT.jar pack01_wordcount.WordCountDriver
分区概念的引入
1、为了增加MR数据聚合的并行度,有时候需要增加Reduce的个数
2、增加了Reduce之后,就要面临一个问题,哪些键值对规哪个Reduce来聚合,你需要定义出一套规则,这套规则就是分区
3、分区就是对每一个K2和V2键值对打标记,标记相同的键值对就会跑到同一个Reduce
4、如果你定义分区,系统有默认的分区机制5、MR的默认分区是按照键K2进行分区
自定义分区代码编写思路
#需求:将wordcount案例中的数据按照单词长度进行分区,长度>=5的单词和长度小于5的单词进行分区1、定义类继承Partitioner类
2、重写getPartition方法,在该方法中对每一个K2和V2打标记,标记从0开始,0标记的键值对会被0编号的Reduce拉取进行聚合,1标记的键值对会被1编号的Reduce进行聚合
3、设置job你的自定义分区类job.setPartitionerClass(MyPartitioner.class);
4、在主类中要设置Reduce的个数为job.setNumReduceTasks(2);
代码
//=================================CovidMapper类=============================
package pack04_wordcount;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** 第一步:定义类继承Mapper类* 四个泛型:K1的类型,V1的类型,K2的类型,V2的类型*/public class WorCountMapper extends Mapper {/*** 第二步:重写map方法,在该方法中,将K1、V1,转为K2和V2* @param key 表示K1,是每一行的偏移量,是系统自动转换得到* @param value 表是V1, 是每一行的文本数据* @param context 表示MapReduce的上下文对象,可以将我们的键值对传送到下一个处理环节*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//1:获取K2//1.1 对V1按照空格进行切割,获取的每个单词就是K2String[] words = value.toString().split(" ");for (String k2 : words) {//2:获取V2,V2就是固定值1//3:将K2和V2写入上下文中,送到下一个处理环节context.write(new Text(k2),new LongWritable(1));}}
}//=================================CovidPartitioner类=============================
package pack04_wordcount;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;/*1、定义类继承Partitioner类
2、重写getPartition方法,在该方法中对每一个K2和V2打标记,标记从0开始,0标记的键值对会被0编号的Reduce拉取进行聚合,1标记的键值对会被1编号的Reduce进行聚合
3、在主类中要设置Reduce的个数为2*/
public class MyPartitioner extends Partitioner {/**** @param text K2* @param longWritable V2* @param i Reduce的个数* @return*/@Overridepublic int getPartition(Text text, LongWritable longWritable, int i) {// 长度>=5的单词打标记为0// 长度小于5的单词打标记为1if(text.toString().length() >= 5){return 0;}else {return 1;}}
}//=================================CovidReducer类=============================
package pack04_wordcount;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** 第一步:自定义类,继承Reducer类型* 泛型:K2类型、V2类型、K3类型、V3类型*/public class WordCountReducer extends Reducer {/*** 第二步:重写reduce方法,在该方法中,将新K2,V2,转为K3和V3* @param key 新K2* @param values [V2]* @param context 上下文对象*/@Overrideprotected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException {//1:获取K3, 新K2就是K3//2:获取V3,遍历[V2]集合,将所有的值相加long count = 0;//Iterator iterator = values.iterator();//while (iterator.hasNext()){// long i = iterator.next().get();// count += i;//}for (LongWritable value : values) {count += value.get();}//3:将K3和V3写入上下文中context.write(key,new LongWritable(count));}
}//=================================CovidDriver类=============================
package pack04_wordcount;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;/*1、定义类继承Partitioner类
2、重写getPartition方法,在该方法中对每一个K2和V2打标记,标记从0开始,0标记的键值对会被0编号的Reduce拉取进行聚合,1标记的键值对会被1编号的Reduce进行聚合
3、在主类中要设置Reduce的个数为2*/
public class MyPartitioner extends Partitioner {/**** @param text K2* @param longWritable V2* @param i Reduce的个数* @return*/@Overridepublic int getPartition(Text text, LongWritable longWritable, int i) {// 长度>=5的单词打标记为0// 长度小于5的单词打标记为1if(text.toString().length() >= 5){return 0;}else {return 1;}}
}
默认分区代码
public class HashPartitioner extends Partitioner {public HashPartitioner() {}//根据每一个K2的hash进行分区,分区的效果是:每一个Reduce可以均衡的聚合数据public int getPartition(K key, V value, int numReduceTasks) {return (key.hashCode() & 2147483647) % numReduceTasks;}
}//----------------------------------------------------------------------public class HashPartitioner extends Partitioner {public HashPartitioner() {}//根据每一个K2的hash进行分区,分区的效果是:每一个Reduce可以均衡的聚合数据public int getPartition(K key, V value, int numReduceTasks) {return (key.hashCode()+随机数 & 2147483647) % numReduceTasks;}
}
需求
根据疫情数据,统计美国每个州的确诊病例数和死亡病例数时间 县名 州名, 县编码 确诊人数 死亡人数
2021-01-28,Autauga,Alabama, 01001, 5554, 69select 州名,sum(确诊人数),sum(死亡人数) from t_covid group by 州名#最后结果
Alabama 192898 345
Arkansa 25109 875
思路
1、将州名作为K2,将确诊人数 死亡人数作为V2
2、可以将V2封装成一个Java类,如果一个自定义类出现在MapReduce中,必须保证该类能够被序列化和反序列化--方式1:实现Writable#应用场景:JavaBean类对象不作为K2,不需要能够被排序public class CovidBean implements Writable {//实现序列化@Overridepublic void write(DataOutput out) throws IOException {}//实现反序列化@Overridepublic void readFields(DataInput in) throws IOException {}
}--方式2:实现WritableComparable#应用场景:JavaBean类对象作为K2,需要能够被排序
public class CovidBean implements WritableComparable {//定义类对象排序的比较规则@Overridepublic int compareTo(CovidBean o) {return 0;}//实现序列化@Overridepublic void write(DataOutput out) throws IOException {}//实现反序列化@Overridepublic void readFields(DataInput in) throws IOException {}
}
代码
package pack07_covid_bean;import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;/*1、在MR中,如果要自定义Java类,如果该类不是K2,则直接实现Writable接口2、在该接口中重写序列化方法和反序列化方法*/
public class CovidBean implements Writable {private int cases; //确诊人数private int deaths; //死亡人数public CovidBean(int cases, int deaths) {this.cases = cases;this.deaths = deaths;}public CovidBean() {}public int getCases() {return cases;}public void setCases(int cases) {this.cases = cases;}public int getDeaths() {return deaths;}public void setDeaths(int deaths) {this.deaths = deaths;}//实现序列化:写@Overridepublic void write(DataOutput out) throws IOException {out.writeInt(cases);out.writeInt(deaths);}//实现反序列化:读@Overridepublic void readFields(DataInput in) throws IOException {this.cases = in.readInt();this.deaths = in.readInt();}@Overridepublic String toString() {return cases + "\t" + deaths ;}
}//---------------------------------------
package pack07_covid_bean;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*1、自定义Java类*/
public class CovidMapper extends Mapper {@Overrideprotected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {String[] array = value.toString().split(",");if(array.length != 6){return;}//1:得到K2String k2 = array[2];//2:得到V2CovidBean v2 = new CovidBean();v2.setCases(Integer.parseInt(array[4]));v2.setDeaths(Integer.parseInt(array[5]));//3:将K2和V2写入上下文context.write(new Text(k2),v2);}
}//--------------------------------------------
package pack07_covid_bean;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*1、自定义Java类*/
public class CovidReducer extends Reducer {@Overrideprotected void reduce(Text key, Iterable values, Reducer.Context context) throws IOException, InterruptedException {/*K2 [V2]Alabama {Covid(18919,234),Covid(383883,119)}*///1:得到K3,K2就是K3,//2:得到V3int casesCount= 0;int deathsCount= 0;for (CovidBean value : values) {casesCount += value.getCases(); //累加确诊病例deathsCount += value.getDeaths(); //累加死亡病例}CovidBean covidBean = new CovidBean();covidBean.setCases(casesCount);covidBean.setDeaths(deathsCount);//3:将K3和V3写入上下文中context.write(key,covidBean);}
}//-------------------------------------------
package pack07_covid_bean;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import pack05_wordcount.WorCountMapper;
import pack05_wordcount.WordCountDriver;
import pack05_wordcount.WordCountReducer;import java.net.URI;/*1、自定义Java类*/
public class CovidDriver {public static void main(String[] args) throws Exception {//1:创建Job任务对象Configuration configuration = new Configuration();//configuration.set("参数名字","参数值");Job job = Job.getInstance(configuration, "covid_bean_demo");//2、设置置作业驱动类job.setJarByClass(CovidDriver.class);//3、设置文件读取输入类的名字和文件的读取路径//方式1-写法//FileInputFormat.addInputPath(job, new Path("hdfs://node1:8020/mapreduce/input/wordcount"));//FileInputFormat.addInputPath(job, new Path("file:///D:\\input\\wordcount"));FileInputFormat.addInputPath(job, new Path(args[0]));//4:设置你自定义的Mapper类信息、设置K2类型、设置V2类型job.setMapperClass(CovidMapper.class);job.setMapOutputKeyClass(Text.class); //设置K2类型job.setMapOutputValueClass(CovidBean.class); //设置V2类型//5:设置分区、排序,规约、分组(保留)//5.1 设置你的定义分区类//job.setPartitionerClass(MyPartitioner.class);//5.2 设置Reduce个数//job.setNumReduceTasks(2);//6:设置你自定义的Reducer类信息、设置K3类型、设置V3类型job.setReducerClass(CovidReducer.class);job.setOutputKeyClass(Text.class); //设置K3类型job.setOutputValueClass(CovidBean.class); //设置V3类型//7、设置文件读取输出类的名字和文件的写入路径//7.1 如果目标目录存在,则删除String fsType = "file:///";//String outputPath = "file:///D:\\output\\wordcount";//String fsType = "hdfs://node1:8020";//String outputPath = "hdfs://node1:8020/mapreduce/output/wordcount";String outputPath = args[1];URI uri = new URI(fsType);FileSystem fileSystem =FileSystem.get(uri, configuration);boolean flag = fileSystem.exists(new Path(outputPath));if(flag == true){fileSystem.delete(new Path(outputPath),true);}FileOutputFormat.setOutputPath(job, new Path(outputPath));//FileOutputFormat.setOutputPath(job, new Path("file:///D:\\output\\wordcount"));//8、将设置好的job交给Yarn集群去执行// 提交作业并等待执行完成boolean resultFlag = job.waitForCompletion(true);//程序退出System.exit(resultFlag ? 0 :1);}
}
需求
#数据
Alabama 452734 7340
Alaska 53524 253
Arizona 745976 12861#要求
基于以上数据对确诊病例数进行降序排序,如果确诊病例数相同 ,则按照死亡病例数升序排序
select * from A order by cases desc , deaths asc;
思路
1、MR的排序只能按照K2排序,哪个字段要参与排序,则哪个字段就应该包含在K2中
2、如果你自定义类作为K2,则必须指定排序规则,实现WritableComparable接口,重写compareTo方法,其他的地方不需要再做任何的设置
代码
package pack08_covid_sort;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class CovidSortBean implements WritableComparable {private String state; //州名private int cases; //确诊人数private int deaths; //死亡人数public String getState() {return state;}public void setState(String state) {this.state = state;}public int getCases() {return cases;}public void setCases(int cases) {this.cases = cases;}public int getDeaths() {return deaths;}public void setDeaths(int deaths) {this.deaths = deaths;}@Overridepublic String toString() {return state + "\t" + cases + "\t"+deaths ;}//定义你的JavaBean对象的排序规则/*Alabama 452734 7340Alaska 53524 253Arizona 745976 12861基于以上数据对确诊病例数进行降序排序,如果确诊病例数相同 ,则按照死亡病例数升序排序select * from A order by cases desc , deaths asc;我 > 他 返回大于0的值我 < 他 返回小于0的值我 = 他 返回等于0的值*/@Overridepublic int compareTo(CovidSortBean o) {int result = this.cases - o.cases;if(result == 0){return this.deaths - o.deaths;}return result * -1;}//实现序列化@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(state);out.writeInt(cases);out.writeInt(deaths);}//实现反序列化@Overridepublic void readFields(DataInput in) throws IOException {this.state = in.readUTF();this.cases = in.readInt();this.deaths = in.readInt();}
}#----------------------------------------
package pack08_covid_sort;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class CovidSortMapper extends Mapper {@Overrideprotected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {//1:得到K2String[] array = value.toString().split("\t");CovidSortBean k2 = new CovidSortBean();k2.setState(array[0]);k2.setCases(Integer.parseInt(array[1]));k2.setDeaths(Integer.parseInt(array[2]));//2:得到V2,就是NullWritable//3:将K2和V2写入上下文中context.write(k2,NullWritable.get());}
}#----------------------------------
package pack08_covid_sort;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class CovidSortReducer extends Reducer {@Overrideprotected void reduce(CovidSortBean key, Iterable values, Reducer.Context context) throws IOException, InterruptedException {//1:得到K3,就是K2//2:得到V3,就是NullWritable//3:将K3和V3写入上下文中context.write(key,NullWritable.get());}
}#----------------------------------package pack08_covid_sort;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.net.URI;public class CovidSortDriver {public static void main(String[] args) throws Exception {//1:创建Job任务对象Configuration configuration = new Configuration();//configuration.set("参数名字","参数值");Job job = Job.getInstance(configuration, "covid_sort_demo");//2、设置置作业驱动类job.setJarByClass(CovidSortDriver.class);//3、设置文件读取输入类的名字和文件的读取路径//方式1-写法//FileInputFormat.addInputPath(job, new Path("hdfs://node1:8020/mapreduce/input/wordcount"));//FileInputFormat.addInputPath(job, new Path("file:///D:\\input\\wordcount"));FileInputFormat.addInputPath(job, new Path(args[0]));//4:设置你自定义的Mapper类信息、设置K2类型、设置V2类型job.setMapperClass(CovidSortMapper.class);job.setMapOutputKeyClass(CovidSortBean.class); //设置K2类型job.setMapOutputValueClass(NullWritable.class); //设置V2类型//5:设置分区、排序,规约、分组(保留)//5.1 设置你的定义分区类//job.setPartitionerClass(MyPartitioner.class);//5.2 设置Reduce个数//job.setNumReduceTasks(2);//6:设置你自定义的Reducer类信息、设置K3类型、设置V3类型job.setReducerClass(CovidSortReducer.class);job.setOutputKeyClass(CovidSortBean.class); //设置K3类型job.setOutputValueClass(NullWritable.class); //设置V3类型//7、设置文件读取输出类的名字和文件的写入路径//7.1 如果目标目录存在,则删除String fsType = "file:///";//String outputPath = "file:///D:\\output\\wordcount";//String fsType = "hdfs://node1:8020";//String outputPath = "hdfs://node1:8020/mapreduce/output/wordcount";String outputPath = args[1];URI uri = new URI(fsType);FileSystem fileSystem =FileSystem.get(uri, configuration);boolean flag = fileSystem.exists(new Path(outputPath));if(flag == true){fileSystem.delete(new Path(outputPath),true);}FileOutputFormat.setOutputPath(job, new Path(outputPath));//FileOutputFormat.setOutputPath(job, new Path("file:///D:\\output\\wordcount"));//8、将设置好的job交给Yarn集群去执行// 提交作业并等待执行完成boolean resultFlag = job.waitForCompletion(true);//程序退出System.exit(resultFlag ? 0 :1);}
}
介绍
当我们在使用MapReduce进行大数据分析时,很多时候使用一个MR并不能完成分析任务,需要使用多个MR进行串联
则我们可以使用MR提供的Job控制器来实现多个MR的依赖串联执行
代码
package pack09_mapreduce_series;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import pack07_covid_bean.CovidBean;
import pack07_covid_bean.CovidDriver;
import pack07_covid_bean.CovidMapper;
import pack07_covid_bean.CovidReducer;
import pack08_covid_sort.CovidSortBean;
import pack08_covid_sort.CovidSortDriver;
import pack08_covid_sort.CovidSortMapper;
import pack08_covid_sort.CovidSortReducer;import java.net.URI;public class MapReduceSeriesJob {public static void main(String[] args) throws Exception {//1:创建Job任务对象Configuration configuration = new Configuration();//configuration.set("参数名字","参数值");Job job1 = Job.getInstance(configuration, "covid_bean_demo");//2、设置置作业驱动类job1.setJarByClass(CovidDriver.class);//3、设置文件读取输入类的名字和文件的读取路径FileInputFormat.addInputPath(job1, new Path(args[0]));//4:设置你自定义的Mapper类信息、设置K2类型、设置V2类型job1.setMapperClass(CovidMapper.class);job1.setMapOutputKeyClass(Text.class); //设置K2类型job1.setMapOutputValueClass(CovidBean.class); //设置V2类型//5:设置分区、排序,规约、分组(保留)//6:设置你自定义的Reducer类信息、设置K3类型、设置V3类型job1.setReducerClass(CovidReducer.class);job1.setOutputKeyClass(Text.class); //设置K3类型job1.setOutputValueClass(CovidBean.class); //设置V3类型//7、设置文件读取输出类的名字和文件的写入路径//7.1 如果目标目录存在,则删除String fsType = "file:///";String outputPath = args[1];URI uri = new URI(fsType);FileSystem fileSystem =FileSystem.get(uri, configuration);boolean flag = fileSystem.exists(new Path(outputPath));if(flag == true){fileSystem.delete(new Path(outputPath),true);}FileOutputFormat.setOutputPath(job1, new Path(outputPath));//todo 将普通的作用包装成受控作业ControlledJob cj1 = new ControlledJob(configuration);cj1.setJob(job1);//1:创建Job2任务对象//configuration.set("参数名字","参数值");Job job2 = Job.getInstance(configuration, "covid_sort_demo");//2、设置置作业驱动类job2.setJarByClass(CovidSortDriver.class);//3、设置文件读取输入类的名字和文件的读取路径FileInputFormat.addInputPath(job2, new Path(args[1]));//4:设置你自定义的Mapper类信息、设置K2类型、设置V2类型job2.setMapperClass(CovidSortMapper.class);job2.setMapOutputKeyClass(CovidSortBean.class); //设置K2类型job2.setMapOutputValueClass(NullWritable.class); //设置V2类型//5:设置分区、排序,规约、分组(保留)//6:设置你自定义的Reducer类信息、设置K3类型、设置V3类型job2.setReducerClass(CovidSortReducer.class);job2.setOutputKeyClass(CovidSortBean.class); //设置K3类型job2.setOutputValueClass(NullWritable.class); //设置V3类型//7、设置文件读取输出类的名字和文件的写入路径//7.1 如果目标目录存在,则删除String fsType2 = "file:///";String outputPath2 = args[2];URI uri2 = new URI(fsType);FileSystem fileSystem2 =FileSystem.get(uri2, configuration);boolean flag2 = fileSystem.exists(new Path(outputPath2));if(flag2 == true){fileSystem2.delete(new Path(outputPath2),true);}FileOutputFormat.setOutputPath(job2, new Path(outputPath2));//todo 将普通的作用包装成受控作业ControlledJob cj2 = new ControlledJob(configuration);cj2.setJob(job2);//todo 设置作业之间的依赖关系cj2.addDependingJob(cj1);//todo 创建主控制器 控制上面两个作业 一起提交JobControl jc = new JobControl("myctrl");jc.addJob(cj1);jc.addJob(cj2);//使用线程启动JobControlThread t = new Thread(jc);t.start();while (true){if(jc.allFinished()){System.out.println(jc.getSuccessfulJobList());jc.stop();break;}}}
}
运行
集群运行
hadoop jar module3_mapreduce-1.0-SNAPSHOT.jar /mapreduce/input/covid19 /mapreduce/output/covid19_bean /mapreduce/output/covid19_bean_sort#编写Shell脚本#!/bin/bash
HADOOP_PATH=/export/server/hadoop-3.3.0/bin/hadoop${HADOOP_PATH} jar /root/mapreduce/module3_mapreduce-1.0-SNAPSHOT.jar /mapreduce/input/covid19 /mapreduce/output/covi
d19_bean /mapreduce/output/covid19_bean_sort
~
介绍
1、规约是MapReduce的一种优化手段,可有可无,有了就属于锦上添花,有或者没有,都不会改变最终的结果
2、规约并不是所有MapReduce任务都能使用,前提是不能影响最终结果
3、规约主要是对每一个Map端的数据做提前的聚合,减少Map端和Reduce端传输的数据量,提交计算效率
4、规约可以理解为将Reduce端代码在Map端提前执行
5、如果你的规约代码和Reducer代码一致,则规约代码可以不用写,直接使用Reducer代码即可job.setCombinerClass(WordCountReducer.class);
代码编写步骤
1、 自定义一个combiner继承Reducer,重写reduce方法,逻辑和Reducer一样
2、 在job中设置:
job.setCombinerClass(CustomCombiner.class)
介绍
1、分组是对Map端传输过来的数据进行去重聚合
# K2 V2hello 1hello 1 --分组--> hello [1,1,1] --reduce方法--> hello 3hello 1world 1 2、分区和分组区别?分区是决定K2和V2去往哪一个Reduce进行处理 分组是在同一个Reduce内部进行聚合3、一般默认的分组就能完成分析操作,但是有时候在特定场景下,默认的分组不能满足我们的需求,则需要我们自定义分组
需求
找出美国每个州state的确诊案例数最多的县county是哪一个。该问题也是俗称的TopN问题。
select * from t_covid order by cases desc limit 1;找出美国每个州state的确诊案例数最多前三个县county是哪些。该问题也是俗称的TopN问题。
select * from t_covid order by cases desc limit 3;
思路
#如何自定义分组
1、写类继承 WritableComparator,重写Compare方法。
2、job.setGroupingComparatorClass(xxxx.class);
代码
//-----------------------------
package pack11_mapreduce_grouping;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class GroupingBean implements WritableComparable {private String state; //州private int cases; //确诊病例数public String getState() {return state;}public void setState(String state) {this.state = state;}public int getCases() {return cases;}public void setCases(int cases) {this.cases = cases;}//定义排序规则/*1:按照同一个州的确诊人数进行降序排序*/@Overridepublic int compareTo(GroupingBean o) {int result = this.state.compareTo(o.state);if(result == 0){return o.cases - this.cases;}return result;}//序列化@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(state);out.writeInt(cases);}//反序列化@Overridepublic void readFields(DataInput in) throws IOException {this.state = in.readUTF();this.cases = in.readInt();}
}
//-----------------------------
package pack11_mapreduce_grouping;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class GroupingMapper extends Mapper {@Overrideprotected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {//1:获取K2String[] array = value.toString().split(",");GroupingBean k2 = new GroupingBean();k2.setState(array[2]);k2.setCases(Integer.parseInt(array[4]));//2:获取V2,就是V1//3:将K2和V2写入上下文context.write(k2,value);}
}
//------------------------------------
package pack11_mapreduce_grouping;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;//1:自定义类去继承WritableComparator类
public class MyGroupingComparator extends WritableComparator {//2:编写无参构造,将你的自定义类传给父类/*参1:表示传给父类的JavaBean类型参2:表示允许父类通过反射造子类对象*/public MyGroupingComparator() {super(GroupingBean.class,true);}//3:在方法中指定分组的规则:两个GroupingBean对象只要你们的state(州)是一样的,就应该分到同一组//这个方法会被自动调用,只要该方法返回0,则两个GroupingBean对象就分到同一组@Override // GroupingBean GroupingBeanpublic int compare(WritableComparable a, WritableComparable b) {GroupingBean g1 = (GroupingBean) a;GroupingBean g2 = (GroupingBean) b;//如果g1和g2的州state同,则应该return 0,则这两个对象就会被分到同一组//if(g1.getState().equals(g2.getState())) {// return 0;//}else{// return 1;//}return g1.getState().compareTo(g2.getState());}
}//------------------------------------package pack11_mapreduce_grouping;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class GroupingReducer extends Reducer {@Overrideprotected void reduce(GroupingBean key, Iterable values, Reducer.Context context) throws IOException, InterruptedException {//1:得到K3//2:得到V3//3:将K3和V3写入上下文中int count = 1;for (Text value : values) {context.write(value,NullWritable.get());if(++count > 1) {break;}}}
}
//------------------------------------------
package pack11_mapreduce_grouping;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import pack08_covid_sort.CovidSortBean;
import pack08_covid_sort.CovidSortDriver;
import pack08_covid_sort.CovidSortMapper;
import pack08_covid_sort.CovidSortReducer;
import pack10_mapreduce_combiner.WorCountMapper;
import pack10_mapreduce_combiner.WordCountDriver;
import pack10_mapreduce_combiner.WordCountReducer;import java.net.URI;public class GroupingDriver {public static void main(String[] args) throws Exception {//1:创建Job任务对象Configuration configuration = new Configuration();//configuration.set("参数名字","参数值");Job job = Job.getInstance(configuration, "grouping_demo");//2、设置置作业驱动类job.setJarByClass(GroupingDriver.class);//3、设置文件读取输入类的名字和文件的读取路径FileInputFormat.addInputPath(job, new Path(args[0]));//4:设置你自定义的Mapper类信息、设置K2类型、设置V2类型job.setMapperClass(GroupingMapper.class);job.setMapOutputKeyClass(GroupingBean.class); //设置K2类型job.setMapOutputValueClass(Text.class); //设置V2类型//5:设置分区、[排序],规约、分组(保留)job.setGroupingComparatorClass(MyGroupingComparator.class);//6:设置你自定义的Reducer类信息、设置K3类型、设置V3类型job.setReducerClass(GroupingReducer.class);job.setOutputKeyClass(Text.class); //设置K3类型job.setOutputValueClass(NullWritable.class); //设置V3类型//7、设置文件读取输出类的名字和文件的写入路径//7.1 如果目标目录存在,则删除String fsType = "file:///";//String outputPath = "file:///D:\\output\\wordcount";//String fsType = "hdfs://node1:8020";//String outputPath = "hdfs://node1:8020/mapreduce/output/wordcount";String outputPath = args[1];URI uri = new URI(fsType);FileSystem fileSystem =FileSystem.get(uri, configuration);boolean flag = fileSystem.exists(new Path(outputPath));if(flag == true){fileSystem.delete(new Path(outputPath),true);}FileOutputFormat.setOutputPath(job, new Path(outputPath));//8、将设置好的job交给Yarn集群去执行// 提交作业并等待执行完成boolean resultFlag = job.waitForCompletion(true);//程序退出System.exit(resultFlag ? 0 :1);}
}
介绍
1、Reduce Join是在Reduce完成Join操作
2、Reduce端Join,Join的文件在Map阶段K2就是Join字段
3、Reduce会存在数据倾斜的风险,如果存在该文件,则可以使用MapJoin来解决
4、Reduce端Join的代码必须放在集群运行,不能在本地运行
案例思路
代码
//------------------------------------
package pack12_reduce_join;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;public class ReduceJoinMapper extends Mapper {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//1:确定读取的是哪个源数据文件FileSplit fileSplit = (FileSplit) context.getInputSplit(); //获取文件切片String fileName = fileSplit.getPath().getName(); //获取源文件的名字String[] array = value.toString().split("\\|");//2:处理订单文件if ("itheima_order_goods.txt".equals(fileName)) { //订单文件//2.1:获取K2String k2 = array[1];//2.2:获取v2String v2 = "o_"+array[0] + "\t" + array[2];//2.3:将k2和v2写入上下文中context.write(new Text(k2), new Text(v2));}//3:处理商品文件if ("itheima_goods.txt".equals(fileName)) { //商品文件//3.1 获取K2String k2 = array[0];String v2 = "g_"+array[0] + "\t" + array[2];//3.2:将k2和v2写入上下文中context.write(new Text(k2), new Text(v2));}}
}//-------------------------------------------
package pack12_reduce_join;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;public class ReduceJoinReducer extends Reducer {ArrayList orderList = new ArrayList<>();@Overrideprotected void reduce(Text key, Iterable values, Reducer.Context context) throws IOException, InterruptedException {orderList.clear();String goods_value="";//1: 遍历集合,分别获取订单信息和商品信息for (Text value : values) {if(value.toString().startsWith("o_")){ //订单信息orderList.add(value.toString().substring(2)); // o_12020203}if(value.toString().startsWith("g_")){ //商品信息goods_value = value.toString().substring(2);}}//2:将订单信息和商品信息进行拼接for (String order : orderList) {System.out.println(order);context.write(new Text(order+"\t"+goods_value),NullWritable.get());}}
}//---------------------------------------
package pack12_reduce_join;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.net.URI;public class ReduceJoinDriver {public static void main(String[] args) throws Exception {//1:创建Job任务对象Configuration configuration = new Configuration();//configuration.set("参数名字","参数值");Job job = Job.getInstance(configuration, "reduce_join_demo");//2、设置置作业驱动类job.setJarByClass(ReduceJoinDriver.class);//3、设置文件读取输入类的名字和文件的读取路径FileInputFormat.addInputPath(job, new Path(args[0]));//4:设置你自定义的Mapper类信息、设置K2类型、设置V2类型job.setMapperClass(ReduceJoinMapper.class);job.setMapOutputKeyClass(Text.class); //设置K2类型job.setMapOutputValueClass(Text.class); //设置V2类型//5:设置分区、[排序],规约、分组(保留)//6:设置你自定义的Reducer类信息、设置K3类型、设置V3类型job.setReducerClass(ReduceJoinReducer.class);job.setOutputKeyClass(Text.class); //设置K3类型job.setOutputValueClass(NullWritable.class); //设置V3类型//7、设置文件读取输出类的名字和文件的写入路径//7.1 如果目标目录存在,则删除String fsType = "file:///";//String outputPath = "file:///D:\\output\\wordcount";//String fsType = "hdfs://node1:8020";//String outputPath = "hdfs://node1:8020/mapreduce/output/wordcount";String outputPath = args[1];URI uri = new URI(fsType);FileSystem fileSystem =FileSystem.get(uri, configuration);boolean flag = fileSystem.exists(new Path(outputPath));if(flag == true){fileSystem.delete(new Path(outputPath),true);}FileOutputFormat.setOutputPath(job, new Path(outputPath));//8、将设置好的job交给Yarn集群去执行// 提交作业并等待执行完成boolean resultFlag = job.waitForCompletion(true);//程序退出System.exit(resultFlag ? 0 :1);}
}
介绍
1、Map端join就是在Map端将Join操作完成
2、Map端join的前提是小表Join大表,小表的大小默认是20M
3、Map端Join需要将小表存在在分布式缓存中,然后读取到每一个MapTask的本地内存的Map集合中
4、Map端Join一般不会数据倾斜问题,因为Map的数量是由数据量大小自动决定的
5、Map端Join代码不需要Reduce
案例思路
代码
package pack13_map_join;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;public class MapJoinMapper extends Mapper {HashMap goodsMap = new HashMap<>();/*** setup方法会在map方法执行之前先执行,而且只会执行一次,主要用来做初始化工作* @param context* @throws IOException* @throws InterruptedException*///将小表从分布式缓存中读取,存入Map集合@Overrideprotected void setup(Context context) throws IOException, InterruptedException {//1:获取分布式缓存中文件的输入流BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream("itheima_goods.txt")));String line = null;while ((line = bufferedReader.readLine()) != null){String[] array = line.split("\\|");goodsMap.put(array[0], array[2]);}/*{100101,四川果冻橙6个约180g/个}{100102,鲜丰水果秭归脐橙中华红}*/}@Overrideprotected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {//1:得到K2String[] array = value.toString().split("\\|");String k2 = array[1];String v2 = array[0] + "\t" + array[2];//2:将K2和Map集合进行JoinString mapValue = goodsMap.get(k2);context.write(new Text(v2 + "\t" + mapValue), NullWritable.get());}
}//----------------------------------------package pack13_map_join;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.net.URI;public class MapJoinDriver {public static void main(String[] args) throws Exception{Configuration configuration = new Configuration();//1:创建一个Job对象Job job = Job.getInstance(configuration, "map_join");//2:对Job进行设置//2.1 设置当前的主类的名字job.setJarByClass(MapJoinDriver.class);//2.2 设置数据读取的路径(大表路径)FileInputFormat.addInputPath(job,new Path("hdfs://node1:8020/mapreduce/input/map_join/big_file"));//2.3 指定你自定义的Mapper是哪个类及K2和V2的类型job.setMapperClass(MapJoinMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);//2.3 指定你自定义的Reducer是哪个类及K3和V3的类型//job.setReducerClass(ReduceJoinReducer.class);//job.setOutputKeyClass(Text.class);//job.setOutputValueClass(NullWritable.class);//将小表存入分布式缓存job.addCacheFile(new URI("hdfs://node1:8020/mapreduce/input/map_join/small_file/itheima_goods.txt"));//2.4 设置数据输出的路径--该目录要求不能存在,否则报错//Path outputPath = new Path("file:///D:\\output\\wordcount");Path outputPath = new Path("hdfs://node1:8020/output/map_join");FileOutputFormat.setOutputPath(job,outputPath);//2.5 设置Shuffle的分组类FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration());boolean is_exists = fileSystem.exists(outputPath);if(is_exists == true){//如果目标文件存在,则删除fileSystem.delete(outputPath,true);}//3:将Job提交为Yarn执行boolean bl = job.waitForCompletion(true);//4:退出任务进程,释放资源System.exit(bl ? 0 : 1);}
}
MapReduce慢的原因
1、MapReduce在运行的过程中,要经过多次的IO操作,数据要多次落硬盘
2、后期几乎所有大数据计算框架都是基于内存处理MR = 文件---》内存 ---》硬盘 --》内存 ---》文件Spark = 文件---》内存 ---》内存 --》内存 ---》文件
下一篇:es6 常见规范