Spark in Practice: Log Query

Spark runtime configuration for this run:

spark.executor.id                      driver
spark.externalBlockStore.folderName    spark-5242ec5b-3653-42e4-9ba2-da3ef515a1d5
spark.master                           local[1]
spark.scheduler.mode                   FIFO
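These properties come from the job's SparkConf. A quick way to dump them yourself from inside the driver (a sketch; SparkConf.toDebugString and JavaSparkContext.getConf are part of Spark's public API, and jsc is the JavaSparkContext created in the code below):

    // Prints every explicitly-set Spark property, one per line.
    System.out.println(jsc.getConf().toDebugString());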
 "10.10.10.10 - \"FRED\" [18/Jan/2013:17:56:07 +1100] \"GET http://images.com/2013/Generic.jpg " +

 "HTTP/1.1\" 304 315 \"http://referall.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; " +

 "Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; " +

 ".NET CLR 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR " +

 "3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.350 \"-\" - \"\" 265 923 934 \"\" " +

 "62.24.11.25 images.com 1358492167 - Whatup",

 "10.10.10.10 - \"FRED\" [18/Jan/2013:18:02:37 +1100] \"GET http://images.com/2013/Generic.jpg " +

 "HTTP/1.1\" 304 306 \"http:/referall.com\" \"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; " +

 "GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR " +

 "3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR " +

 "3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.352 \"-\" - \"\" 256 977 988 \"\" " +

 "0 73.23.2.15 images.com 1358492557 - Whatup"

Approach:

1. Use a regular expression to extract the key fields from each log line, then map over the partitioned RDD (see the regex sketch after this list):

   JavaPairRDD<Tuple3<String, String, String>, Stats> extracted

2. Run reduceByKey to merge the Stats values that share the same key.
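To make step 1 concrete, here is a minimal standalone sketch (the class name RegexGroupDemo is just for illustration) that runs the same pattern used in the full program below against the first sample line, shortened here for readability, and prints the capture groups of interest:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexGroupDemo {
  public static void main(String[] args) {
    // Same pattern as apacheLogRegex in JavaLogQuery below.
    Pattern p = Pattern.compile(
        "^([\\d.]+) (\\S+) (\\S+) \\[([\\w\\d:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([\\d\\-]+) \"([^\"]+)\" \"([^\"]+)\".*");
    // First sample log line, with the long user-agent tail trimmed.
    String line = "10.10.10.10 - \"FRED\" [18/Jan/2013:17:56:07 +1100] "
        + "\"GET http://images.com/2013/Generic.jpg HTTP/1.1\" 304 315 "
        + "\"http://referall.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0)\"";
    Matcher m = p.matcher(line);
    if (m.find()) {
      System.out.println("ip    = " + m.group(1)); // 10.10.10.10
      System.out.println("user  = " + m.group(3)); // "FRED"
      System.out.println("query = " + m.group(5)); // GET http://images.com/2013/Generic.jpg HTTP/1.1
      System.out.println("bytes = " + m.group(7)); // 315
    }
  }
}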


import com.google.common.collect.Lists;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import scala.Tuple3;

import java.io.Serializable;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Log query.
 * @author jinhang
 */
public final class JavaLogQuery {

  // Simulated logs (exampleApacheLogs)
  public static final List<String> exampleApacheLogs = Lists.newArrayList(
    "10.10.10.10 - \"FRED\" [18/Jan/2013:17:56:07 +1100] \"GET http://images.com/2013/Generic.jpg " +
      "HTTP/1.1\" 304 315 \"http://referall.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; " +
      "Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; " +
      ".NET CLR 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR " +
      "3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.350 \"-\" - \"\" 265 923 934 \"\" " +
      "62.24.11.25 images.com 1358492167 - Whatup",
    "10.10.10.10 - \"FRED\" [18/Jan/2013:18:02:37 +1100] \"GET http://images.com/2013/Generic.jpg " +
      "HTTP/1.1\" 304 306 \"http:/referall.com\" \"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; " +
      "GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR " +
      "3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR " +
      "3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.352 \"-\" - \"\" 256 977 988 \"\" " +
      "0 73.23.2.15 images.com 1358492557 - Whatup");

  // Capture groups: 1=ip, 3=user, 4=timestamp, 5=request, 6=status, 7=bytes, 8=referrer, 9=user-agent
  public static final Pattern apacheLogRegex = Pattern.compile(
    "^([\\d.]+) (\\S+) (\\S+) \\[([\\w\\d:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([\\d\\-]+) \"([^\"]+)\" \"([^\"]+)\".*");

  /** Statistics for one key: hit count and total bytes. */
  public static class Stats implements Serializable {
    private final int count;
    private final int numBytes;

    public Stats(int count, int numBytes) {
      this.count = count;
      this.numBytes = numBytes;
    }

    public Stats merge(Stats other) {
      return new Stats(count + other.count, numBytes + other.numBytes);
    }

    @Override
    public String toString() {
      return String.format("bytes=%s\tn=%s", numBytes, count);
    }
  }

  /** Extract the (ip, user, query) key from a log line. */
  public static Tuple3<String, String, String> extractKey(String line) {
    Matcher m = apacheLogRegex.matcher(line);
    if (m.find()) {
      String ip = m.group(1);
      String user = m.group(3);
      String query = m.group(5);
      if (!user.equalsIgnoreCase("-")) {
        return new Tuple3<String, String, String>(ip, user, query);
      }
    }
    return new Tuple3<String, String, String>(null, null, null);
  }

  /** Extract per-line Stats: one hit plus the response size. */
  public static Stats extractStats(String line) {
    Matcher m = apacheLogRegex.matcher(line);
    if (m.find()) {
      int bytes = Integer.parseInt(m.group(7));
      return new Stats(1, bytes);
    } else {
      return new Stats(1, 0);
    }
  }

  public static void main(String[] args) {
    Logger.getLogger(JavaLogQuery.class).setLevel(Level.FATAL);
    SparkConf sparkConf = new SparkConf().setAppName("JavaLogQuery").setMaster("local[1]");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    // Use a log file passed on the command line, or fall back to the built-in sample
    JavaRDD<String> dataSet = (args.length == 1) ? jsc.textFile(args[0]) : jsc.parallelize(exampleApacheLogs);

    // Step 1: map each line to ((ip, user, query), Stats)
    JavaPairRDD<Tuple3<String, String, String>, Stats> extracted =
        dataSet.mapToPair(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
          @Override
          public Tuple2<Tuple3<String, String, String>, Stats> call(String s) {
            return new Tuple2<Tuple3<String, String, String>, Stats>(extractKey(s), extractStats(s));
          }
        });

    // Step 2: reduceByKey merges the Stats that share the same key
    JavaPairRDD<Tuple3<String, String, String>, Stats> counts =
        extracted.reduceByKey(new Function2<Stats, Stats, Stats>() {
          @Override
          public Stats call(Stats stats, Stats stats2) {
            return stats.merge(stats2);
          }
        });

    // Collect and print the results
    List<Tuple2<Tuple3<String, String, String>, Stats>> output = counts.collect();
    for (Tuple2<?, ?> t : output) {
      System.out.println(t._1() + "\t" + t._2());
    }
    jsc.stop();
  }
}
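As the summary at the end notes, the anonymous-class style above is verbose. For comparison, a minimal sketch of the same pipeline using Java 8 lambdas (Spark's Java function interfaces are single-method, so lambdas and method references satisfy them; this assumes the same JavaLogQuery helpers):

    JavaPairRDD<Tuple3<String, String, String>, Stats> counts = dataSet
        .mapToPair(s -> new Tuple2<>(extractKey(s), extractStats(s)))  // step 1: (key, Stats)
        .reduceByKey(Stats::merge);                                    // step 2: merge per key

    counts.collect().forEach(t -> System.out.println(t._1() + "\t" + t._2()));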


Now let's analyze the execution process.

Loading SLF4J:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/D:/JavaProject/spark-demo/lib/spark-assembly-1.6.1-hadoop2.0.0-mr1-cdh4.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/D:/JavaProject/spark-demo/lib/spark-examples-1.6.1-hadoop2.0.0-mr1-cdh4.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

Initializing the SparkContext:

16/04/29 09:36:22 INFO SparkContext: Running Spark version 1.6.1

// Adding -Djava.library.path=$HADOOP_HOME/lib/native/Linux-amd64-64/*.jar to the JVM options resolves the warning below

16/04/29 09:36:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

16/04/29 09:36:23 INFO SecurityManager: Changing view acls to: hp

16/04/29 09:36:23 INFO SecurityManager: Changing modify acls to: hp

16/04/29 09:36:23 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hp); users with modify permissions: Set(hp)

16/04/29 09:36:23 INFO Utils: Successfully started service sparkDriver on port 36010.

16/04/29 09:36:23 INFO Slf4jLogger: Slf4jLogger started

16/04/29 09:36:24 INFO Remoting: Starting remoting

16/04/29 09:36:24 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@10.170.26.123:36023]

16/04/29 09:36:24 INFO Utils: Successfully started service sparkDriverActorSystem on port 36023.

16/04/29 09:36:24 INFO SparkEnv: Registering MapOutputTracker

16/04/29 09:36:24 INFO SparkEnv: Registering BlockManagerMaster

16/04/29 09:36:24 INFO DiskBlockManager: Created local directory at C:\Users\hp\AppData\Local\Temp\blockmgr-84667505-0018-439b-9627-a4360d872118

16/04/29 09:36:24 INFO MemoryStore: MemoryStore started with capacity 517.4 MB

16/04/29 09:36:24 INFO SparkEnv: Registering OutputCommitCoordinator

16/04/29 09:36:24 INFO Utils: Successfully started service SparkUI on port 4040.

16/04/29 09:36:24 INFO SparkUI: Started SparkUI at http://10.170.26.123:4040

16/04/29 09:36:24 INFO Executor: Starting executor ID driver on host localhost

16/04/29 09:36:24 INFO Utils: Successfully started service org.apache.spark.network.netty.NettyBlockTransferService on port 36030.

16/04/29 09:36:24 INFO NettyBlockTransferService: Server created on 36030

16/04/29 09:36:24 INFO BlockManagerMaster: Trying to register BlockManager

16/04/29 09:36:24 INFO BlockManagerMasterEndpoint: Registering block manager localhost:36030 with 517.4 MB RAM, BlockManagerId(driver, localhost, 36030)

16/04/29 09:36:24 INFO BlockManagerMaster: Registered BlockManager

The main services started during initialization:

- SecurityManager
- 'sparkDriver' on port 36010
- Remoting, listening on akka.tcp://sparkDriverActorSystem@10.170.26.123:36023
- MapOutputTracker
- BlockManagerMaster
- DiskBlockManager: local directory at C:\Users\hp\AppData\Local\Temp\blockmgr-84667505-0018-439b-9627-a4360d872118
- OutputCommitCoordinator
- Executor
- org.apache.spark.network.netty.NettyBlockTransferService

The job starts executing. Because reduceByKey requires a shuffle, the DAGScheduler splits the job into two stages: ShuffleMapStage 0 (the mapToPair) and ResultStage 1 (the collect):
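To see this two-stage lineage without digging through the scheduler logs, the RDD's debug string can be printed just before the collect (a sketch; toDebugString comes from Spark's JavaRDDLike, so it is available on JavaPairRDD):

    // Prints the lineage ShuffledRDD[2] <- MapPartitionsRDD[1] <- ParallelCollectionRDD[0],
    // which corresponds to ResultStage 1 and ShuffleMapStage 0 below.
    System.out.println(counts.toDebugString());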

16/04/29 10:12:31 INFO SparkContext: Starting job: collect at JavaLogQuery.java:112

16/04/29 10:12:31 INFO DAGScheduler: Registering RDD 1 (mapToPair at JavaLogQuery.java:98)

16/04/29 10:12:31 INFO DAGScheduler: Got job 0 (collect at JavaLogQuery.java:112) with 1 output partitions

16/04/29 10:12:31 INFO DAGScheduler: Final stage: ResultStage 1 (collect at JavaLogQuery.java:112)

16/04/29 10:12:31 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)

16/04/29 10:12:31 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)

16/04/29 10:12:31 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[1] at mapToPair at JavaLogQuery.java:98), which has no missing parents

16/04/29 10:12:31 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes

16/04/29 10:12:31 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 3.1 KB, free 3.1 KB)

16/04/29 10:12:31 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1897.0 B, free 5.0 KB)

16/04/29 10:12:31 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:36394 (size: 1897.0 B, free: 517.4 MB)

16/04/29 10:12:31 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006

16/04/29 10:12:31 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[1] at mapToPair at JavaLogQuery.java:98)

16/04/29 10:12:31 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks

16/04/29 10:12:31 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 3033 bytes)

16/04/29 10:12:31 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)

16/04/29 10:12:32 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1158 bytes result sent to driver

16/04/29 10:12:32 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 320 ms on localhost (1/1)

16/04/29 10:12:32 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 

16/04/29 10:12:32 INFO DAGScheduler: ShuffleMapStage 0 (mapToPair at JavaLogQuery.java:98) finished in 0.349 s

16/04/29 10:12:32 INFO DAGScheduler: looking for newly runnable stages

16/04/29 10:12:32 INFO DAGScheduler: running: Set()

16/04/29 10:12:32 INFO DAGScheduler: waiting: Set(ResultStage 1)

16/04/29 10:12:32 INFO DAGScheduler: failed: Set()

16/04/29 10:12:32 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[2] at reduceByKey at JavaLogQuery.java:105), which has no missing parents

16/04/29 10:12:32 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.9 KB, free 7.9 KB)

16/04/29 10:12:32 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1746.0 B, free 9.6 KB)

16/04/29 10:12:32 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:36394 (size: 1746.0 B, free: 517.4 MB)

16/04/29 10:12:32 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006

16/04/29 10:12:32 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (ShuffledRDD[2] at reduceByKey at JavaLogQuery.java:105)

16/04/29 10:12:32 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks

16/04/29 10:12:32 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,NODE_LOCAL, 1894 bytes)

16/04/29 10:12:32 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)

16/04/29 10:12:32 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks

16/04/29 10:12:32 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 16 ms

16/04/29 10:12:32 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1449 bytes result sent to driver

16/04/29 10:12:32 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 107 ms on localhost (1/1)

16/04/29 10:12:32 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 

16/04/29 10:12:32 INFO DAGScheduler: ResultStage 1 (collect at JavaLogQuery.java:112) finished in 0.108 s

16/04/29 10:12:32 INFO DAGScheduler: Job 0 finished: collect at JavaLogQuery.java:112, took 0.850227 s

(10.10.10.10,"FRED",GET http://images.com/2013/Generic.jpg HTTP/1.1) bytes=621 n=2

Both sample requests share the same (ip, user, query) key, so they were merged into a single record: n=2 hits, with bytes=621 being the sum of the two response sizes (315 + 306).



Finally, jsc.stop() tears everything down:


16/04/29 10:12:56 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:36394 in memory (size: 1746.0 B, free: 517.4 MB)

16/04/29 10:16:13 INFO ContextCleaner: Cleaned accumulator 2

16/04/29 10:16:13 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:36394 in memory (size: 1897.0 B, free: 517.4 MB)

16/04/29 10:16:13 INFO ContextCleaner: Cleaned accumulator 1

16/04/29 10:24:29 WARN QueuedThreadPool: 5 threads could not be stopped

16/04/29 10:24:29 INFO SparkUI: Stopped Spark web UI at http://10.170.26.123:4040

16/04/29 10:24:29 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

16/04/29 10:24:29 INFO MemoryStore: MemoryStore cleared

16/04/29 10:24:29 INFO BlockManager: BlockManager stopped

16/04/29 10:24:29 INFO BlockManagerMaster: BlockManagerMaster stopped

16/04/29 10:24:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

16/04/29 10:24:30 INFO SparkContext: Successfully stopped SparkContext

16/04/29 10:24:30 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.

16/04/29 10:24:30 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.

16/04/29 10:24:30 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.

Summary

Although implementing this with Spark's Java API involves a lot of boilerplate, it lays out Spark's execution process more explicitly than the equivalent Scala code, and the Java version may be easier to integrate with other Java projects.