Using a SparkSQL UDF in Place of MySQL Spatial Functions to Read MySQL Spatial Columns

Date: 2023-04-18 13:06:00

I. Problem Description

SparkSQL can read data from MySQL, but it provides no built-in function for parsing MySQL spatial columns.

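II. Problem Analysis

SparkSQL has no built-in function for parsing spatial types, so a UDF has to be written by hand. SparkSQL transfers the spatial column as a byte array, and Geometry is not among the data types a UDF may return, so the parsed Geometry must be converted to a String before it is returned.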
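The byte array can be inspected before any geometry library is involved. The sketch below is a minimal illustration that assumes the layout used by the code in this article: four SRID bytes followed by standard WKB, with the SRID decoded according to the byte-order flag that opens the WKB. The class and method names (`MysqlGeometryLayout`, `byteOrder`, `srid`, `wkbType`) are hypothetical, and the sample value is built by hand rather than read from MySQL.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class MysqlGeometryLayout {
    /** Byte order announced by the first WKB byte (0x00 = big-endian, 0x01 = little-endian). */
    static ByteOrder byteOrder(byte[] value) {
        return value[4] == 0x00 ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN;
    }

    /** SRID stored in bytes 0..3, decoded with the byte order of the WKB that follows (assumption taken from the article's code). */
    static int srid(byte[] value) {
        return ByteBuffer.wrap(value, 0, 4).order(byteOrder(value)).getInt();
    }

    /** WKB geometry type code stored in bytes 5..8 (1 = Point, 2 = LineString, 3 = Polygon). */
    static int wkbType(byte[] value) {
        return ByteBuffer.wrap(value, 5, 4).order(byteOrder(value)).getInt();
    }

    public static void main(String[] args) {
        // Hand-built sample: SRID 4326, then little-endian WKB for POINT(1 2)
        byte[] sample = ByteBuffer.allocate(25).order(ByteOrder.LITTLE_ENDIAN)
                .putInt(4326).put((byte) 1).putInt(1).putDouble(1.0).putDouble(2.0).array();
        System.out.println("srid=" + srid(sample) + ", wkbType=" + wkbType(sample));
        // prints "srid=4326, wkbType=1"
    }
}
```

Only the header is decoded here. Coordinates and nested geometries are better left to a WKB parser such as the JTS `WKBReader`.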

III. Implementation

1. Custom UDF function

  // Assumes JTS 1.15 or later; older releases use the com.vividsolutions.jts package instead
  import org.locationtech.jts.geom.Geometry
  import org.locationtech.jts.io.WKBReader

  @throws[Exception]
  def sparkUDFSTAsText(geometryAsBytes: Array[Byte]): Geometry = {
    if (geometryAsBytes.length < 5) throw new Exception("Invalid geometry inputStream - less than five bytes")
    // The first four bytes of the value are the SRID, followed by the actual WKB.
    // The byte-order flag that opens the WKB (0x00 = big-endian) decides how the SRID is decoded.
    val bigEndian = geometryAsBytes(4) == 0x00
    val sridBytes = geometryAsBytes.take(4)
    val srid =
      if (bigEndian) sridBytes.foldLeft(0)((acc, b) => (acc << 8) + (b & 0xff))
      else sridBytes.zipWithIndex.map { case (b, i) => (b & 0xff) << (8 * i) }.sum
    // Drop the four SRID bytes and parse the remainder with the JTS WKBReader
    val wkb = geometryAsBytes.drop(4)
    val dbGeometry = new WKBReader().read(wkb)
    dbGeometry.setSRID(srid)
    dbGeometry
  }

Java version

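    // Assumes JTS 1.15 or later; older releases use the com.vividsolutions.jts package instead
    import org.locationtech.jts.geom.Geometry;
    import org.locationtech.jts.io.WKBReader;

    public Geometry sparkUDFSTAsText(byte[] geometryAsBytes) throws Exception {
        if (geometryAsBytes.length < 5) throw new Exception("Invalid geometry inputStream - less than five bytes");
        // The first four bytes of the value are the SRID, followed by the actual WKB
        byte[] sridBytes = new byte[4];
        System.arraycopy(geometryAsBytes, 0, sridBytes, 0, 4);
        // The byte-order flag that opens the WKB (0x00 = big-endian) decides how the SRID is decoded
        boolean bigEndian = geometryAsBytes[4] == 0x00;
        int srid = 0;
        if (bigEndian) {
            for (int i = 0; i < sridBytes.length; i++) {
                srid = (srid << 8) + (sridBytes[i] & 0xff);
            }
        } else {
            for (int i = 0; i < sridBytes.length; i++) {
                srid += (sridBytes[i] & 0xff) << (8 * i);
            }
        }
        // Copy the byte array without the four SRID bytes and parse it with the JTS WKBReader
        WKBReader wkbReader = new WKBReader();
        byte[] wkb = new byte[geometryAsBytes.length - 4];
        System.arraycopy(geometryAsBytes, 4, wkb, 0, wkb.length);
        Geometry dbGeometry = wkbReader.read(wkb);
        dbGeometry.setSRID(srid);
        return dbGeometry;
    }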
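To exercise the parsing function above without a database connection, an input value can be assembled by hand. The following is a sketch under the same layout assumption the function makes (four SRID bytes, then WKB). `SamplePointBytes`, `encodePoint` and `toHex` are hypothetical helper names, not part of JTS or Spark, and the bytes are illustrative test data rather than output captured from MySQL.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class SamplePointBytes {
    /** Encodes POINT(x y) as 4 SRID bytes followed by little-endian WKB (25 bytes in total). */
    static byte[] encodePoint(int srid, double x, double y) {
        return ByteBuffer.allocate(25).order(ByteOrder.LITTLE_ENDIAN)
                .putInt(srid)      // bytes 0..3: SRID
                .put((byte) 0x01)  // byte 4: WKB byte-order flag, 1 = little-endian
                .putInt(1)         // bytes 5..8: WKB geometry type, 1 = Point
                .putDouble(x)      // bytes 9..16: X coordinate
                .putDouble(y)      // bytes 17..24: Y coordinate
                .array();
    }

    /** Renders the bytes as lowercase hex for easy inspection. */
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] value = encodePoint(4326, 1.0, 2.0);
        System.out.println(value.length + " bytes: " + toHex(value));
    }
}
```

Passing the array returned by `encodePoint` to `sparkUDFSTAsText` should yield a point geometry carrying the same SRID, which makes a convenient unit test for the UDF before it is registered in Spark.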

2. Calling the UDF from SparkSQL

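    // A UDF cannot return a Geometry, so return its WKT text instead; NULL columns stay null
    def toGeometryText(binary: Array[Byte]): String =
      if (binary == null) null else sparkUDFSTAsText(binary).toText
    spark.udf.register("ST_ASTEXT", toGeometryText(_))
    // Requires: import org.apache.spark.rdd.RDD and import org.apache.spark.sql.Row
    val rddROW: RDD[Row] = spark.sql("SELECT id, ST_ASTEXT(point), ST_ASTEXT(polygon) FROM t_point_polygon").limit(10).rdd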

IV. Further Reading

1. Spatial extensions in MySQL

https://www.mysqlzh.com/doc/172.html

http://dcx.sap.com/1201/zh/dbspatial/pg-api-spatial-st-geometry-type.html

2. Spatial types in MySQL