一、二次排序：自定义 GroupingComparator
1.问题:现有如下订单数据,需要求出每一个订单中成交金额最大的一笔交易
订单id | 商品id | 成交金额 |
Order_0000001 | Pdt_01 | 222.8 |
Order_0000001 | Pdt_05 | 25.8 |
Order_0000002 | Pdt_03 | 522.8 |
Order_0000002 | Pdt_04 | 122.4 |
Order_0000002 | Pdt_05 | 722.4 |
Order_0000003 | Pdt_01 | 222.8 |
Order_0000003 | Pdt_02 | 22.8 |
Order_0000004 | Pdt_03 | 522.8 |
Order_0000004 | Pdt_04 | 122.4 |
Order_0000004 | Pdt_05 | 1034.4 |
2.分析
a、以“订单id + 成交金额”组合作为 key：map 阶段读取到的所有订单数据按订单id分区（保证同一订单的记录进入同一个 reduce），并按订单id升序、成交金额降序排序后发送到 reduce 端（输入文件各字段以制表符分隔）。
b、在 reduce 端利用自定义 GroupingComparator 将订单id相同的 kv 聚合成一组；由于组内记录已按金额降序排列，取每组的第一条即为该订单中成交金额最大的交易。
3.实现
1 package com.oracle.www.secondarySort_Max2; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.FileSystem; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.NullWritable;10 import org.apache.hadoop.io.Text;11 import org.apache.hadoop.mapreduce.Job;12 import org.apache.hadoop.mapreduce.Mapper;13 import org.apache.hadoop.mapreduce.Reducer;14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;;16 17 public class SecondarySort {18 static class MyMapper extends Mapper{19 @Override20 protected void map(LongWritable key, Text value,21 Mapper .Context context)22 throws IOException, InterruptedException {23 String[] datas = value.toString().split("\t");24 OrderBean bean = new OrderBean(datas[0], datas[1], Double.parseDouble(datas[2]));25 context.write(bean, NullWritable.get());26 }27 }28 29 static class MyReducer extends Reducer {30 @Override31 protected void reduce(OrderBean key, Iterable value,32 Reducer .Context context)33 throws IOException, InterruptedException {34 context.write(key, NullWritable.get());35 }36 }37 38 public static void main(String[] args) throws ClassNotFoundException, InterruptedException {39 Configuration conf = new Configuration();40 try {41 Job job = Job.getInstance();42 43 job.setJarByClass(SecondarySort.class);44 job.setMapperClass(MyMapper.class);45 job.setReducerClass(MyReducer.class);46 job.setPartitionerClass(MyPartition.class);47 job.setGroupingComparatorClass(MyGroup.class);48 49 job.setMapOutputKeyClass(OrderBean.class);50 job.setMapOutputValueClass(NullWritable.class);51 52 job.setOutputKeyClass(OrderBean.class);53 job.setOutputValueClass(NullWritable.class);54 55 job.setNumReduceTasks(3);56 57 Path outPath = new Path("hdfs://192.168.9.13:8020/Order_item_data2");58 FileSystem fs = outPath.getFileSystem(conf);59 if (fs.exists(outPath)) {60 
fs.delete(outPath, true);61 }62 FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.9.13:8020/Order_item"));63 FileOutputFormat.setOutputPath(job, outPath);64 job.waitForCompletion(true);65 66 } catch (IOException e) {67 // TODO Auto-generated catch block68 e.printStackTrace();69 }70 71 }72 73 }
1 package com.oracle.www.secondarySort_Max2; 2 3 import org.apache.hadoop.io.NullWritable; 4 import org.apache.hadoop.mapreduce.Partitioner; 5 6 public class MyPartition extends Partitioner{ 7 8 @Override 9 public int getPartition(OrderBean key, NullWritable value, int numPartitions) {10 return key.getOrderId().hashCode() & Integer.MAX_VALUE % numPartitions;11 }12 13 }
1 package com.oracle.www.secondarySort_Max2; 2 3 import org.apache.hadoop.io.WritableComparable; 4 import org.apache.hadoop.io.WritableComparator; 5 6 public class MyGroup extends WritableComparator { 7 public MyGroup() { 8 super(); 9 }10 11 @Override12 public int compare(WritableComparable a, WritableComparable b) {13 OrderBean bean1 = (OrderBean) a;14 OrderBean bean2 = (OrderBean) b;15 return bean1.getOrderId().compareTo(bean2.getOrderId());16 17 }18 19 }
1 package com.oracle.www.secondarySort_Max2; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 7 import org.apache.hadoop.io.WritableComparable; 8 9 public class OrderBean implements WritableComparable{10 private String orderId;11 private String productId;12 private double price;13 14 public OrderBean() {15 16 }17 18 public OrderBean(String orderId, String productId, double price) {19 this.orderId = orderId;20 this.productId = productId;21 this.price = price;22 }23 24 public String getOrderId() {25 return orderId;26 }27 28 public void setOrderId(String orderId) {29 this.orderId = orderId;30 }31 32 public String getProductId() {33 return productId;34 }35 36 public void setProductId(String productId) {37 this.productId = productId;38 }39 40 public double getPrice() {41 return price;42 }43 44 public void setPrice(double price) {45 this.price = price;46 }47 48 /*49 * (non-Javadoc)50 * 51 * @see java.lang.Object#toString()52 */53 @Override54 public String toString() {55 return "OrderBean [orderId=" + orderId + ", productId=" + productId + ", price=" + price + "]";56 }57 58 @Override59 public void write(DataOutput out) throws IOException {60 out.writeUTF(orderId);61 out.writeUTF(productId);62 out.writeDouble(price);63 }64 65 @Override66 public void readFields(DataInput in) throws IOException {67 this.orderId = in.readUTF();68 this.productId = in.readUTF();69 this.price = in.readDouble();70 71 }72 73 @Override74 public int compareTo(OrderBean o) {75 // 先按订单id进行排序,再按销售进行排序76 int temp = this.orderId.compareTo(o.orderId);77 if (temp == 0) {78 // this.-o.79 // 0:顺序不发生改变(set集合会覆盖,list集合顺序不发生改变)80 // 1(>0):当前对象大,顺序往后放81 // -1(<0):当前对象小,顺序往前放82 double error = this.getPrice() - o.getPrice();83 if (error > 0) {84 return -1;85 } else {86 return 1;87 }88 }89 return temp;90 }91 92 }