zl程序教程

您现在的位置是:首页 >  其他

当前栏目

hadoop和spark超大矩阵点乘思路在多源数据POI融合中应用

思路hadoop应用数据Spark 矩阵 融合 POI
2023-09-14 09:15:34 时间

最近碰到一个奇葩需求,需要通过各种特征做多数据源poi数据融合。说白了就是要把各家图商以及网路上扒下来数据合成到一张表里,去掉冗余数据,增加各图商特性数据。这个问题乍看起来不难,里面其实挺大坑。首先,需要找到一个在个数据源都可行的唯一ID,作为数据挂接的唯一标识,然而这样的标识是不存在的。解决思路就是通过多个特征,计算他们之间相似度找到最可能的数据融合点。

做相似度计算是个经典问题,有很多种行之有效方法。然后对于这种多元数据,每一维度都有自己的距离测度方法,如果用统一的计算方法效果会很差。那么该如何实现这种不同维度用不同测度方法计算距离呢。如果数据量不大,可以两层for遍历每个特征用自己测度算法计算距离,串行化执行就可以。然而对于两个矩阵数据都较大问题该如何解决呢。

通过矩阵计算是必然的,同时需要并行化计算。具体怎么做呢,且看以下分析:

一、方法一

需要两步完成, Mi,kNk,j
1.Map阶段,对于矩阵 M,把列作为 key,对于矩阵 N 把行作为 key
2.Reduce阶段,对于相同 key 的值,M 矩阵和 N 矩阵的值做笛卡尔积,输出 【(M 的行)+ (N 的列值)+ (MN相乘 value 值)】

public static class ReadMapper extends Mapper<Object, Text, Text, Text>{
    public void map(Object key ,Text value,Context context) throws IOException, InterruptedException{
        String str=value.toString(); String [] strs=str.split(" ");
    if(strs[0].equals("M")&&strs.length==4){//如果是 M矩 阵,则以 j 作为 key
         context.write(new Text(strs[2]),new Text(strs[0]+" "+strs[1]+" "+strs[3]));
        } else if(strs[0].equals("N")&&strs.length==4){
        context.write(new Text(strs[1]),new Text(strs[0]+" "+strs[2]+" "+strs[3]));
    }

    }
}
    public static class WriteReducer extends Reducer<Text, Text, Text, Text>{
        public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
            //定义连个 ArrayList 第一个存放 M,第二个存放 N
            ArrayList<String> mTrix=new ArrayList<String>();
            ArrayList<String> nTrix=new ArrayList<String>();
            for(Text value : values){
                if((value.toString()).contains("M")){
                    mTrix.add(value.toString());
                } else {
                    nTrix.add(value.toString());
                }
            }
            String[] mItems, nItems;
            //进行合并计算
            for(String m : mTrix){
                mItems=m.split(" ");
                for(String n : nTrix){
                    nItems=n.split(" ");
                }
                context.write(new Text(key+"+"+mItems[1]+"+"+nItems[1]+"+"+(Integer.parseInt(nIt ems[2])*Integer.parseInt(mItems[2]))+"+"),new Text(""));
            }
        }
    }

4.map 阶段读取上一阶段 reduce 产生的 i,j,value 值
5.把所有相同 key 的 value 值相加,输出即可

二、方法二

(1)在 Map 阶段,把来自表 M 的元素 Aij,标识成 l 条<key, value>的形式。其中 key= (i,k),k=0,1,..l-1,value=(M,j,aij);把来自表 N 的元素 Bij,标识成 m 条<key,value>形式,其中 key=(i,k),i=0,1,...,m-1,value=(‘N’,j,Bjk)。 于是乎,在Map 阶段,我们实现了这样的战术目的:通过 key,我们把参与计算 Cik 的数据 归为一类。通过 value,我们能区分元素是来自 M 还是 N,以及具体的位置。
(2)在 Reduce阶段,将上一阶段的 map 输出值,找到 MN 矩阵的对应位置相乘计入 result 中,最后输出(i,j,result)

public class SigleStep {
    public static final int WIDTH = 2;
    public static final int LENGTH = 2;
    public static final int MATRIX_K = 3;

    public static class MatrixMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] values = value.toString().split(" ");
            if (values[0].equals("M")) {
                for (int i = 0; i < LENGTH; i++) { //把矩阵每个元素转换为 i,k,M,j,v 形式 
                      context.write(new Text(values[1]+" "+i), new Text(values[0]+" "+values[2]+" "+values[3])); 
                  }
             } else {
                  for(int i=0;i<WIDTH;i++){ 
                 //把矩阵每个元素转换为 i,k,M,j,v 形式 
                   context.write(new Text(i+" "+values[2]), new Text(values[0]+" "+values[1]+" "+values[3]));
                }
            }
        }
    }

    public static class MatrixReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            //对获得的五元组进行处理 
            Integer[] m_matrix = new Integer[MATRIX_K];
            Integer[] n_matrix = new Integer[MATRIX_K];
            int i = 0;
            for (Text value : values) {
                String temp[] = value.toString().split(" ");
                if (temp[0].equals("M")) {//如果是 M 矩阵
                    m_matrix[Integer.parseInt(temp[1])] = Integer.parseInt(temp[2]
                    );
                } else {
                    n_matrix[Integer.parseInt(temp[1])] = Integer.parseInt(temp[2]
                    );
                }
            }
            //对两个矩阵进行相乘相加 int result = 0;
            for (i = 0; i < MATRIX_K; i++) {
                if (m_matrix[i] != null && n_matrix[i] != null) result += m_matrix[i] * n_matrix[i];
            }
            System.out.println(key + "+++++++" + result);
            context.write(key, new
                    Text(result + "")
            );
        }
    }
}

转自:https://www.jianshu.com/p/ab23dab67b63