


A Low-Cost Method for Analyzing Data Access Behavior

2023-03-02

More and more enterprises are building data lakes to accelerate digital transformation and strengthen their market competitiveness. The data in a data lake is an important corporate asset, ensuring that it is accessed only by the right users is a key part of protecting it, and analyzing data access behavior is an important way to do so.

Intended audience: data lake architects, cloud cost administrators, security administrators.

1. Options for logging S3 bucket access

AWS provides two ways to record S3 bucket access logs, along with a detailed comparison of them:

  • Price: CloudTrail management events (first delivery) are free, but data events incur a fee in addition to log storage; S3 server logs add no cost beyond log storage.
  • Speed of log delivery: CloudTrail delivers data events every 5 minutes and management events every 15 minutes; S3 server logs arrive within a few hours.
  • Log format: CloudTrail writes JSON; S3 server logs are log files with space-separated, newline-delimited records.

The two offer similar functionality but differ in latency and cost. For customers who are cost-sensitive about analyzing data-asset access behavior, this article presents a low-cost approach that still delivers reasonably fresh data.

2. Implementation details

Athena can query and analyze S3 server access logs; the AWS documentation describes the setup in detail:

CREATE EXTERNAL TABLE `s3_access_logs_db.mybucket_logs`(
  `bucketowner` STRING, 
  `bucket_name` STRING, 
  `requestdatetime` STRING, 
  `remoteip` STRING, 
  `requester` STRING, 
  `requestid` STRING, 
  `operation` STRING, 
  `key` STRING, 
  `request_uri` STRING, 
  `httpstatus` STRING, 
  `errorcode` STRING, 
  `bytessent` BIGINT, 
  `objectsize` BIGINT, 
  `totaltime` STRING, 
  `turnaroundtime` STRING, 
  `referrer` STRING, 
  `useragent` STRING, 
  `versionid` STRING, 
  `hostid` STRING, 
  `sigv` STRING, 
  `ciphersuite` STRING, 
  `authtype` STRING, 
  `endpoint` STRING, 
  `tlsversion` STRING)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.RegexSerDe' 
WITH SERDEPROPERTIES ( 
  'input.regex'='([^ ]*) ([^ ]*) \\[(.*?)\\] ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\"|-) (-|[0-9]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\"|-) ([^ ]*)(?: ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*))?.*$') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://awsexamplebucket1-logs/prefix/'

After creating this table, users can query the S3 bucket's access log data directly. The delivered log files sit flat under the configured target prefix, with object names that begin with the delivery timestamp.
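With the table in place, access questions become plain SQL. Below is a minimal sketch that runs one such query through boto3; the Athena output location is a placeholder you would replace with your own results bucket, and the query simply counts requests per requester and operation.

import time

import boto3

athena = boto3.client('athena')

# Placeholder result location; point this at your own Athena query-results bucket.
OUTPUT_LOCATION = 's3://your-athena-query-results/'

# Who is doing what against the bucket, by request count.
SQL = """
SELECT requester, operation, count(*) AS requests
FROM s3_access_logs_db.mybucket_logs
GROUP BY requester, operation
ORDER BY requests DESC
LIMIT 20
"""

qid = athena.start_query_execution(
    QueryString=SQL,
    ResultConfiguration={'OutputLocation': OUTPUT_LOCATION})['QueryExecutionId']

# Wait for the query to finish, then print the result rows.
while athena.get_query_execution(QueryExecutionId=qid)['QueryExecution']['Status']['State'] in ('QUEUED', 'RUNNING'):
    time.sleep(1)
for row in athena.get_query_results(QueryExecutionId=qid)['ResultSet']['Rows']:
    print([col.get('VarCharValue') for col in row['Data']])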

Careful engineers will notice a few problems:

  • As data assets multiply and are accessed more frequently, the volume of logs keeps growing.
  • Because the data is not partitioned, querying a specific time window becomes slow.
  • Creating a separate access-log table for every data asset also carries a high management overhead.

To address these problems, this article adds the following enhancements:

  • Centralized management of data-asset access logs.
  • Partitioning of the data, enabling fine-grained analysis by data asset and by time (year, month, day, hour).

The implementation works as follows: server access logs from every data-asset bucket are delivered to one central logging bucket, a Lambda function reorganizes the log files into partitioned prefixes, and Athena queries a single partitioned table.

Step 1: Create a partitioned S3 server logs table

Create the bucket that will store the S3 server access logs, and configure its lifecycle rules according to your internal data governance standards.
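The lifecycle rule itself can be scripted; the sketch below uses boto3 with an illustrative 180-day expiration, which you would replace with whatever your governance standard requires.

import boto3

s3 = boto3.client('s3')

# Expire raw access logs after an illustrative 180 days; adjust to your own retention standard.
s3.put_bucket_lifecycle_configuration(
    Bucket='serveraccesslog-bucket',
    LifecycleConfiguration={
        'Rules': [{
            'ID': 'expire-old-access-logs',
            'Filter': {'Prefix': ''},   # apply to every object in the logging bucket
            'Status': 'Enabled',
            'Expiration': {'Days': 180},
        }]
    })

Then create the partitioned table on top of that bucket: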

CREATE EXTERNAL TABLE `salog_rawdata_4`(
  `bucketowner` string COMMENT '', 
  `bucket_name` string COMMENT '', 
  `requestdatetime` string COMMENT '', 
  `remoteip` string COMMENT '', 
  `requester` string COMMENT '', 
  `requestid` string COMMENT '', 
  `operation` string COMMENT '', 
  `objkey` string COMMENT '', 
  `request_uri` string COMMENT '', 
  `httpstatus` string COMMENT '', 
  `errorcode` string COMMENT '', 
  `bytessent` string COMMENT '', 
  `objectsize` string COMMENT '', 
  `totaltime` string COMMENT '', 
  `turnaroundtime` string COMMENT '', 
  `referrer` string COMMENT '', 
  `useragent` string COMMENT '', 
  `versionid` string COMMENT '', 
  `hostid` string COMMENT '', 
  `sigv` string COMMENT '', 
  `ciphersuite` string COMMENT '', 
  `authtype` string COMMENT '', 
  `request_endpoint` string COMMENT '', 
  `tlsversion` string COMMENT '')
PARTITIONED BY ( 
  `reqbucket` string, 
  `reqyear` bigint, 
  `reqmonth` bigint, 
  `reqday` bigint, 
  `reqhour` bigint)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.RegexSerDe' 
WITH SERDEPROPERTIES ( 
  'input.regex'='([^ ]*) ([^ ]*) \\[(.*?)\\] ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\"|-) (-|[0-9]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\"|-) ([^ ]*)(?: ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*))?.*$') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://serveraccesslog-bucket/'

Step 2: Enable server access logging on each data-asset S3 bucket

Set the target bucket and prefix to:

s3://serveraccesslog-bucket/reqbucket={current-bucket-name}/
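Enabling logging can also be scripted. Below is a sketch with boto3, using 'airports' (one of the example data-asset buckets used later in this article) as the source; note that the logging bucket must additionally grant the S3 logging service permission to write to it.

import boto3

s3 = boto3.client('s3')

source_bucket = 'airports'  # one of the example data-asset buckets

# Deliver server access logs into the central bucket, under a prefix that already
# matches the first partition column (reqbucket=<bucket-name>/).
s3.put_bucket_logging(
    Bucket=source_bucket,
    BucketLoggingStatus={
        'LoggingEnabled': {
            'TargetBucket': 'serveraccesslog-bucket',
            'TargetPrefix': 'reqbucket=' + source_bucket + '/',
        }
    })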

Step 3: Create a Lambda function that reorganizes the log data into the partition layout

For example, fn-reorg:

import json
import os
import re

import boto3


def lambda_handler(event, context):
    # Central bucket that receives the raw S3 server access logs
    bucketName = os.getenv('bucketName')
    # Data-asset buckets whose logs should be reorganized into partitions
    dsBucketName_List = event['dsBucketNameList']

    client = boto3.client('s3')
    for dsBucketName in dsBucketName_List:
        # List log objects delivered directly under reqbucket=<bucket>/ (not yet partitioned).
        # Note: list_objects_v2 returns at most 1,000 keys per call; add pagination if a
        # bucket produces more log files than that between runs.
        l2_prefix = client.list_objects_v2(Bucket=bucketName, Delimiter='/',
                                           Prefix='reqbucket=' + dsBucketName + '/')
        print("----------------------%s----------------------" % dsBucketName)
        if l2_prefix.get('Contents', False):
            for log in l2_prefix['Contents']:
                log_name = log['Key']
                # Log object names begin with the delivery timestamp: YYYY-MM-DD-HH-MM-SS-...
                match = re.search(r'(?P<requestedbucket>reqbucket=.*)/(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})-(?P<hour>\d{2})', log_name, re.ASCII)
                match1 = re.search(r'reqbucket=.*/(?P<logshortname>\d{4}-.*)', log_name, re.ASCII)
                if match and match1:
                    requested_bucket = match.group('requestedbucket')
                    requested_year = 'reqyear=' + match.group('year')
                    requested_month = 'reqmonth=' + match.group('month')
                    requested_day = 'reqday=' + match.group('day')
                    requested_hour = 'reqhour=' + match.group('hour')
                    log_short_name = match1.group('logshortname')
                    # Copy the log object into its Hive-style partition path ...
                    client.copy({'Bucket': bucketName, 'Key': log_name},
                                Bucket=bucketName,
                                Key=requested_bucket + '/' + requested_year + '/' + requested_month + '/'
                                    + requested_day + '/' + requested_hour + '/' + log_short_name)
                    print("The log file: %s has been moved!" % log_name)
                    # ... then delete the original, un-partitioned object
                    client.delete_object(Bucket=bucketName, Key=log_name)
            print('--------------------------------------------------------')

    return {
        'statusCode': 200,
        'body': json.dumps('S3 Server Log re-organize function!')
    }
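Before scheduling it, the function can be exercised once by hand. A sketch, assuming fn-reorg is already deployed with its bucketName environment variable set; the payload mirrors the constant input used in Step 4.

import json

import boto3

lambda_client = boto3.client('lambda')

# One-off test invocation of the reorganization function.
response = lambda_client.invoke(
    FunctionName='fn-reorg',
    Payload=json.dumps({'dsBucketNameList': ['airports', 'airport-data', 'airport-analysis']}).encode('utf-8'))
print(json.loads(response['Payload'].read()))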

After reorganization, the S3 bucket structure looks like this:

serveraccesslog-bucket/reqbucket=airports/reqyear=2022/reqmonth=04/reqday=07/reqhour=03/

Also create a Lambda function that refreshes the table partitions.

For example, fn-refresh-partition:

import json
import os
import time

import boto3


def lambda_handler(event, context):
    # Name of the partitioned log table and the S3 location Athena uses for query results
    salogTableName = os.getenv('salogTableName')
    athena_cache = os.getenv('AthenaCache')

    client1 = boto3.client('athena')
    # Ask Athena to discover the partitions created by fn-reorg
    rsp = client1.start_query_execution(
        QueryString='MSCK REPAIR TABLE ' + salogTableName,
        ResultConfiguration={'OutputLocation': athena_cache})

    query_execution_id = rsp["QueryExecutionId"]

    # Poll until the repair query finishes
    print("Refresh Partition", end="", flush=True)
    while True:
        print(".", end="", flush=True)
        query_status = client1.get_query_execution(QueryExecutionId=query_execution_id)
        state = query_status["QueryExecution"]["Status"]["State"]
        if state == "SUCCEEDED":
            print("\nDone", flush=True)
            break
        if state in ("FAILED", "CANCELLED"):
            print("\nMSCK REPAIR TABLE finished in state %s" % state, flush=True)
            break
        time.sleep(1)

    return {
        'statusCode': 200,
        'body': json.dumps('S3 Server Log partition refresh function!')
    }
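Once the partitions are registered, queries can prune by data asset and time instead of scanning every log file. Below is a sketch of such a query, again submitted through boto3; the bucket and date values are the illustrative ones from this article, and the output location is a placeholder.

import boto3

athena = boto3.client('athena')

# Restrict the scan to one data asset and one day via the partition columns.
SQL = """
SELECT requester, operation, objkey, count(*) AS requests
FROM salog_rawdata_4
WHERE reqbucket = 'airports'
  AND reqyear = 2022 AND reqmonth = 4 AND reqday = 7
GROUP BY requester, operation, objkey
ORDER BY requests DESC
"""

athena.start_query_execution(
    QueryString=SQL,
    ResultConfiguration={'OutputLocation': 's3://your-athena-query-results/'})  # placeholder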

Step 4: Create EventBridge rules

Rule 1: invoke fn-reorg

Schedule: every 15 minutes. Constant JSON input passed to the function:

{
  "dsBucketNameList": ["airports", "airport-data", "airport-analysis"]
}

Rule 2: invoke fn-refresh-partition

Schedule: every 15 minutes.
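Both rules can also be created with boto3. The sketch below wires up the fn-reorg rule; the Lambda ARN is a placeholder for your own account and region, and the same pattern (minus the constant input) applies to fn-refresh-partition.

import json

import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

# Placeholder ARN; substitute your own account, region, and function.
FN_REORG_ARN = 'arn:aws:lambda:us-east-1:111122223333:function:fn-reorg'

# Fire every 15 minutes.
rule_arn = events.put_rule(
    Name='salog-reorg-every-15min',
    ScheduleExpression='rate(15 minutes)')['RuleArn']

# Allow EventBridge to invoke the function, then attach it as a target
# with the constant JSON input shown above.
lambda_client.add_permission(
    FunctionName='fn-reorg',
    StatementId='allow-eventbridge-salog-reorg',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule_arn)

events.put_targets(
    Rule='salog-reorg-every-15min',
    Targets=[{
        'Id': 'fn-reorg',
        'Arn': FN_REORG_ARN,
        'Input': json.dumps({'dsBucketNameList': ['airports', 'airport-data', 'airport-analysis']}),
    }])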

About the author

杨帅军

Senior data architect focused on data processing, currently providing data governance services mainly to automotive companies.