Spark development exam question (interview question): look up the location of each IP address and count IP addresses per location


A Spark development exam and interview question, used to select talent for network development and operations!

Problem statement:

Given a batch of IP addresses (see the file ip.txt), look up each address's location against the IP address library (see the file iplib.txt) and count the total number of IP addresses for each location.
Write a Spark job and submit it to run on the test environment. Save the statistics as text files under the xxxx directory and the source code under the xxxx directory.
Example of the expected output format:
江苏  移动 241
江西  电信 67
河南  联通 89 


IPLib_test format:

1.1.1.0 1.1.1.255 广东 电信
2.0.0.0 2.1.1.255 天津 电信

......


IPs_test format:

101.100.24.56
144.0.1.100

......



package com.dt.spark.chinatelecomExam;

/**
 * Utility for checking whether an IPv4 address falls inside a range written as
 * "beginIP-endIP", e.g. "1.1.1.0-1.1.1.255".
 */
public class IpUtil {

    /** Returns true if {@code ip} lies within the range given as "beginIP-endIP". */
    public static boolean ipExistsInRange(String ip, String ipSection) {
        ipSection = ipSection.trim();
        ip = ip.trim();
        int idx = ipSection.indexOf('-');
        String beginIP = ipSection.substring(0, idx);
        String endIP = ipSection.substring(idx + 1);
        return getIp2long(beginIP) <= getIp2long(ip) && getIp2long(ip) <= getIp2long(endIP);
    }

    /** Converts a dotted-decimal IPv4 address into its numeric (long) value. */
    public static long getIp2long(String ip) {
        ip = ip.trim();
        String[] ips = ip.split("\\.");
        long ip2long = 0L;
        for (int i = 0; i < 4; ++i) {
            // Shift the accumulated value left by one octet and append the next octet.
            ip2long = ip2long << 8 | Integer.parseInt(ips[i]);
        }
        return ip2long;
    }
}
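
As a quick sanity check of IpUtil, here is a small sketch that is not part of the original exam code; the object name and the sample addresses are chosen only for illustration:

import com.dt.spark.chinatelecomExam.IpUtil

object IpUtilCheck {
  def main(args: Array[String]): Unit = {
    // "1.1.1.0" packs into the 32-bit value 0x01010100 = 16843008.
    println(IpUtil.getIp2long("1.1.1.0"))
    // 1.1.1.100 lies inside the range 1.1.1.0-1.1.1.255, so this prints true.
    println(IpUtil.ipExistsInRange("1.1.1.100", "1.1.1.0-1.1.1.255"))
    // 2.2.2.2 lies outside that range, so this prints false.
    println(IpUtil.ipExistsInRange("2.2.2.2", "1.1.1.0-1.1.1.255"))
  }
}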



package com.dt.spark.Exam

import com.dt.spark.chinatelecomExam.IpUtil

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import scala.collection.immutable.HashSet
import scala.util.control.Breaks._

/**
  * Looks up the location of every IP address in IPs_test.txt against the ranges in
  * IPLib_test.txt and counts the number of IP addresses per (province, carrier).
  */
object IPQueryForTest {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    var masterUrl = "local[8]"
    if (args.length > 0) {
      masterUrl = args(0)
    }
    val sparkConf = new SparkConf().setMaster(masterUrl).setAppName("IPQueryForTest")
    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()
    val sc = spark.sparkContext
    // Directory where the input data files are stored.
    var dataPath = "data/Exam/"
    /**
      * Read the input data. Here the two files are simply loaded as RDDs of lines.
      */
    val IPLibRDD: RDD[String] = sc.textFile(dataPath + "IPLib_test.txt")
    val IPsRDD: RDD[String] = sc.textFile(dataPath + "IPs_test.txt")

    /**
      * A map-side join in Spark is implemented with a Broadcast variable: the IP library is sent
      * once to each Executor and shared by all tasks running there, instead of shipping a copy of
      * the data with every task. This markedly reduces network traffic and JVM memory consumption.
      */
    val IPLibSet: HashSet[String] = HashSet() ++ IPLibRDD.collect()
    val IPLibSetBroadcast: Broadcast[HashSet[String]] = sc.broadcast(IPLibSet)

    println("纯粹通过RDD的方式实现IP地址统计分析:")
    val resultIPQuery: RDD[(String, Long)] = IPsRDD.map(line => {
      val ip = line.trim
      var locationKey: String = "NoMatchIPQuery"
      var IPValue: Long = 0L
      val iplibSets: HashSet[String] = IPLibSetBroadcast.value
      // Scan the broadcast IP library and stop at the first range that contains this IP.
      breakable {
        for (iplib <- iplibSets) {
          val fields = iplib.split("\t")
          val ipSection: String = fields(0).trim + "-" + fields(1).trim
          if (IpUtil.ipExistsInRange(ip, ipSection)) {
            locationKey = fields(2).trim + "\t" + fields(3).trim
            IPValue = 1L
            break()
          }
        }
      }
      (locationKey, IPValue)
    })

    // Drop the unmatched IPs, sum the counts per (province, carrier) and save the result as text
    // (saveAsTextFile writes a directory containing part files).
    val result = resultIPQuery.filter(!_._1.contains("NoMatchIPQuery")).reduceByKey(_ + _)
    result.map(line =>
      line._1 + "\t" + line._2).saveAsTextFile(dataPath + "IPQueryResult.txt")

    spark.stop()
  }
}
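
For comparison, the same lookup can also be written with the DataFrame API as a broadcast range join instead of scanning a broadcast HashSet per record. The sketch below is not part of the original exam solution; it assumes the same tab-separated input files, and the object and column names are chosen here only for illustration:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object IPQueryWithDataFrames {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[8]").appName("IPQueryWithDataFrames").getOrCreate()
    import spark.implicits._

    val dataPath = "data/Exam/"
    // Converts a dotted-decimal IPv4 string to its numeric value, like IpUtil.getIp2long.
    val ip2long = udf((ip: String) => ip.trim.split("\\.").foldLeft(0L)((acc, part) => acc << 8 | part.toLong))

    // IP library: range begin/end plus province and carrier, tab separated.
    val ipLib = spark.read.option("sep", "\t")
      .csv(dataPath + "IPLib_test.txt")
      .toDF("beginIP", "endIP", "province", "carrier")
      .withColumn("beginLong", ip2long($"beginIP"))
      .withColumn("endLong", ip2long($"endIP"))

    // One IP address per line.
    val ips = spark.read.textFile(dataPath + "IPs_test.txt").toDF("ip")
      .withColumn("ipLong", ip2long($"ip"))

    // Broadcast the small library, join on the range condition, then count per location.
    val counts = ips.join(broadcast(ipLib), $"ipLong".between($"beginLong", $"endLong"))
      .groupBy("province", "carrier")
      .count()

    counts.show(false)
    spark.stop()
  }
}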