SparkSQL使用UDF函数代替MySQL空间函数读取MySQL空间字段
2023-04-18 13:06:00 时间
一、问题描述
SparkSQL虽然可以访问MySQL数据,但是对于MySQL的空间字段,SparkSQL并没有提供内置函数去解析
二、问题分析
SparkSQL没有内置函数解析空间类型,需要手动编写UDF函数实现 SparkSQL网络传输的数据格式是Byte数组,返回的数据格式中没有Geometry类型,需要将Geometry类型转成String类型返回
三、代码实现
1、自定义UDF函数
@throws[Exception]
def sparkUDFSTAsText(geometryAsBytes: Array[Byte]): Geometry = {
var dbGeometry: Geometry = null
if (geometryAsBytes.length < 5) throw new Exception("Invalid geometry inputStream - less than five bytes")
//first four bytes of the geometry are the SRID,
//followed by the actual WKB. Determine the SRID
//here
val sridBytes = new Array[Byte](4)
System.arraycopy(geometryAsBytes, 0, sridBytes, 0, 4)
val bigEndian: Boolean = geometryAsBytes(4) == 0x00
var srid = 0
if (bigEndian) for (i <- 0 until sridBytes.length) {
srid = (srid << 8) + (sridBytes(i) & 0xff)
}
else for (i <- 0 until sridBytes.length) {
srid += (sridBytes(i) & 0xff) << (8 * i)
}
//use the JTS WKBReader for WKB parsing
val wkbReader = new WKBReader
//copy the byte array, removing the first four
//SRID bytes
val wkb = new Array[Byte](geometryAsBytes.length - 4)
System.arraycopy(geometryAsBytes, 4, wkb, 0, wkb.length)
dbGeometry = wkbReader.read(wkb)
dbGeometry.setSRID(srid)
dbGeometry
}
Java版
public Geometry sparkUDFSTAsText(byte[] geometryAsBytes) throws Exception {
Geometry dbGeometry = null;
if (geometryAsBytes.length < 5) throw new Exception("Invalid geometry inputStream - less than five bytes");
byte[] sridBytes = new byte[4];
System.arraycopy(geometryAsBytes, 0, sridBytes, 0, 4);
boolean bigEndian = geometryAsBytes[4] == 0x00;
int srid = 0;
if(bigEndian) {
for(int i=0; i<sridBytes.length; i++) {
srid = (srid << 8) + (sridBytes[i] & 0xff);
}
} else {
for(int i=0; i<sridBytes.length; i++) {
srid += (sridBytes[i] & 0xff) << (8 * i);
}
}
WKBReader wkbReader = new WKBReader();
byte[] wkb = new byte[geometryAsBytes.length - 4];
System.arraycopy(geometryAsBytes, 4, wkb, 0, wkb.length);
dbGeometry = wkbReader.read(wkb);
dbGeometry.setSRID(srid);
return dbGeometry;
}
2、SparkSQL调用UDF函数
def toGeometryText(binary: Array[Byte]) = sparkUDFSTAsText(binary).toText
spark.udf.register("ST_ASTEXT",toGeometryText(_))
val rddROW: RDD[Row] = spark.sql("SELECT id, ST_ASTEXT(point), ST_ASTEXT(polygon) FROM t_point_polygon").limit(10).rdd
四、知识拓展
1、MySQL中的空间扩展
https://www.mysqlzh.com/doc/172.html
http://dcx.sap.com/1201/zh/dbspatial/pg-api-spatial-st-geometry-type.html
2、MySQL中的空间类型
相关文章
- 直接在代码里面对list集合进行分页
- .NET Framework 4.5新特性详解
- 大数据的简要介绍
- 大数据的由来
- 高斯混合模型的自然梯度变量推理
- timing-wheel 仿Kafka实现的时间轮算法
- 使用Navicat软件连接自建数据库(Linux系统)
- 那一天,我被Redis主从架构支配的恐惧
- Redis 深入了解键的过期时间
- C#使用委托调用实现用户端等待闪屏
- 基于流计算 Oceanus 和 Elasticsearch Service 构建百亿级实时监控系统
- GRAND | 转录调控网络预测数据库
- JFreeChart API中文文档
- 临床相关突变查询数据库
- TIGER | 人类胰岛基因变化查询数据库
- 视频边缘计算网关EasyNVR在视频整体监控解决方案中的应用分析
- Apache Arrow - 大数据在数据湖后的下一个风向标
- 常见的电商数据指标体系
- AKShare-艺人数据-艺人流量价值
- MySQL中多表联合查询与子查询的这些区别,你可能不知道!