第12卷第5期 智能系统学报 Vol.12 No.5 2017年10月 CAAI Transactions on Intelligent Systems 0ct.2017 D0I:10.11992/is.201706016 网络出版地址:htp:/kns.cmki.net/kcms/detail/23.1538.TP.20171021.1350.014.html 基于SQL-on-Hadoop查询引擎的日志挖掘及其应用 何明,常盟盟,刘郭洋2,顾程祥2,彭继克2 (1.北京工业大学信息学部,北京100124:2.海通证券股份有限公司信息技术管理部,上海200001) 摘要:随着计算机和网络技术的迅猛发展以及数据获取手段的不断丰富,海量数据的实时处理需求日益增多,传 统的日志分析技术在处理海量数据时存在计算瓶颈。大数据时代下,随着开放式处理平台的发展,能够处理大规模 且多样化数据的大数据处理系统应运而生。为了让原有的业务能够充分利用Hop的优势,本文首先研究了基于 大数据技术的网络日志分析方法,构建了网络日志分析平台以实现万亿级日志采集、解析、存储和高效、灵活的查询 与计算。对比分析了Hiwe、Impala和Spark SOL这3种具有代表性的SQL-om-Hadoop查询系统实例.并展示了这类系 统的性能特点。采用TPCH测试基准对它们的决策支持能力进行测试及评估,通过对实验数据的分析和解释得到了 若干有益的结论。实现了海量日志数据计算与分析在证券领域的几种典型应用,为进一步的研究工作奠定了基础。 关键词:大数据;日志分析:数据挖掘;Hadoop:查询引擎;数据采集:索引存储;证券行业 中图分类号:TP391文献标志码:A文章编号:1673-4785(2017)05-0717-12 中文引用格式:何明,常盟盟,刘郭洋,等.基于SQL-on-Hadoop查询引擎的日志挖掘及其应用[J].智能系统学报,2017,12(5): 717-728. 英文引用格式:HE Ming,CHANG Mengmeng,LIU Guoyang,etal.Log mining and application based on sql-on-hadoop query engine[J].CAAI transactions on intelligent systems,2017,12(5):717-728. Log mining and application based on sql-on-hadoop query engine HE Ming',CHANG Mengmeng',LIU Guoyang?,GU Chengxiang?,PENG Jike2 (1.Faculty of Information Technology,Beijing University of Technology,Beijing 100124,China;2.Information Technology Management Department,Haitong Securities Co.,Ltd.,Shanghai 200001,China) Abstract:With the rapid development of computing and networking technologies,and the increase in the number of data acquisition methods,the demand for real-time processing of massive amounts of log data is increasing every day,and there is a calculation bottleneck when traditional log analysis technology is used to process massive amounts of data.With the development of open processing platforms in the era of big data,a number of big data processing systems have emerged for dealing with large-scale and diverse data.To effectively apply the advantages of Hadoop to the original businesses,in this study,we first investigated network log analysis methods based on big data technology and constructed a network log analysis platform for the acquisition,analysis,storage,high- efficiency and flexible queries,and the calculation of trillions of log entries.In addition,we compared and analyzed three representative SQL-on-Hadoop query systems including Hive,Impala,and Spark SQL,and identified the performance characteristics of this type of system.We used the TPC-H testing reference to test and assess their decision-making support abilities.We drew some useful conclusions from the analysis of the experimental data.We also suggest a few typical applications for this analysis and processing system for massive log data in the securities fields,which provides a solid foundation for further research. Keywords:big data;log analysis;data mining;Hadoop;query engine;data collection;indexed storage; securities business 随着互联网的飞速发展和逐层推进,企业内部 业网络中的计算机设备和网络组件持久地记录着 的规模和业务量也不断增加,致使数据量猛增。企 海量的网络日志。日志文件是系统软硬件信息和 用户行为信息记录的载体,通过日志分析能够实时 收稿日期:2017-06-07.网络出版日期:2017-10-21. 基金项目:国家自然科学基金项目(91646201.91546111,60803086):国家科 获取设备、网络运行状态和用户行为交易等信息, 技支撑计划子课题(2013BAH2IB02-01);北京市自然科学基金 项目(4153058,4113076):北京市教委重点项目有利于保证系统的稳定运行和来往业务的安全性。 (KZ20160005009):北京市教委面上项目(KM201710005023). 通信作者:何明.E-mail:heming(@bjut.cdu.cn. 目前,较为成熟的日志集中管理系统解决了各类设
第 12 卷第 5 期 智 能 系 统 学 报 Vol.12 №.5 2017 年 10 月 CAAI Transactions on Intelligent Systems Oct. 2017 DOI:10.11992 / tis.201706016 网络出版地址:http: / / kns.cnki.net / kcms/ detail / 23.1538.TP.20171021.1350.014.html 基于 SQL⁃on⁃Hadoop 查询引擎的日志挖掘及其应用 何明1 ,常盟盟1 ,刘郭洋2 ,顾程祥2 ,彭继克2 (1.北京工业大学 信息学部,北京 100124; 2. 海通证券股份有限公司 信息技术管理部,上海 200001) 摘 要:随着计算机和网络技术的迅猛发展以及数据获取手段的不断丰富,海量数据的实时处理需求日益增多,传 统的日志分析技术在处理海量数据时存在计算瓶颈。 大数据时代下,随着开放式处理平台的发展,能够处理大规模 且多样化数据的大数据处理系统应运而生。 为了让原有的业务能够充分利用 Hadoop 的优势,本文首先研究了基于 大数据技术的网络日志分析方法,构建了网络日志分析平台以实现万亿级日志采集、解析、存储和高效、灵活的查询 与计算。 对比分析了 Hive、Impala 和 Spark SQL 这 3 种具有代表性的 SQL⁃on⁃Hadoop 查询系统实例,并展示了这类系 统的性能特点。 采用 TPC⁃H 测试基准对它们的决策支持能力进行测试及评估,通过对实验数据的分析和解释得到了 若干有益的结论。 实现了海量日志数据计算与分析在证券领域的几种典型应用,为进一步的研究工作奠定了基础。 关键词:大数据;日志分析;数据挖掘;Hadoop;查询引擎;数据采集;索引存储;证券行业 中图分类号:TP391 文献标志码:A 文章编号:1673-4785(2017)05-0717-12 中文引用格式:何明,常盟盟,刘郭洋,等. 基于 SQL⁃on⁃Hadoop 查询引擎的日志挖掘及其应用[ J]. 智能系统学报, 2017, 12(5): 717-728. 英文引用格式: HE Ming,CHANG Mengmeng,LIU Guoyang, et al. Log mining and application based on sql⁃on⁃hadoop query engine[J]. CAAI transactions on intelligent systems, 2017, 12(5): 717-728. Log mining and application based on sql⁃on⁃hadoop query engine HE Ming 1 , CHANG Mengmeng 1 , LIU Guoyang 2 , GU Chengxiang 2 , PENG Jike 2 (1. Faculty of Information Technology, Beijing University of Technology, Beijing 100124, China; 2. Information Technology Management Department, Haitong Securities Co., Ltd., Shanghai 200001, China) Abstract:With the rapid development of computing and networking technologies, and the increase in the number of data acquisition methods, the demand for real⁃time processing of massive amounts of log data is increasing every day, and there is a calculation bottleneck when traditional log analysis technology is used to process massive amounts of data. With the development of open processing platforms in the era of big data, a number of big data processing systems have emerged for dealing with large⁃scale and diverse data. To effectively apply the advantages of Hadoop to the original businesses, in this study, we first investigated network log analysis methods based on big data technology and constructed a network log analysis platform for the acquisition, analysis, storage, high⁃ efficiency and flexible queries, and the calculation of trillions of log entries. In addition, we compared and analyzed three representative SQL⁃on⁃Hadoop query systems including Hive, Impala, and Spark SQL, and identified the performance characteristics of this type of system. We used the TPC⁃H testing reference to test and assess their decision⁃making support abilities. We drew some useful conclusions from the analysis of the experimental data. We also suggest a few typical applications for this analysis and processing system for massive log data in the securities fields, which provides a solid foundation for further research. Keywords: big data; log analysis; data mining; Hadoop; query engine; data collection; indexed storage; securities business 收稿日期:2017-06-07. 网络出版日期:2017-10-21. 基金项目:国家自然科学基金项目(91646201, 91546111, 60803086); 国家科 技支撑计划子课题(2013BAH21B02-01); 北京市自然科学基金 项 目 ( 4153058, 4113076 ); 北 京 市 教 委 重 点 项 目 (KZ20160005009); 北京市教委面上项目(KM201710005023). 通信作者:何明. E⁃mail:heming@ bjut.edu.cn. 随着互联网的飞速发展和逐层推进,企业内部 的规模和业务量也不断增加,致使数据量猛增。 企 业网络中的计算机设备和网络组件持久地记录着 海量的网络日志。 日志文件是系统软硬件信息和 用户行为信息记录的载体,通过日志分析能够实时 获取设备、网络运行状态和用户行为交易等信息, 有利于保证系统的稳定运行和来往业务的安全性。 目前,较为成熟的日志集中管理系统解决了各类设
·718· 智能系统学报 第12卷 备、服务器和应用日志的采集与格式统一问题,日 得到了若干有益的结论: 志分析也从最初简单的正则匹配向结构化查询、报 3)实现了大规模网络日志数据分析与计算在证 表和预测演进山。越来越多的行业领域面临海量 券领域的几种典型应用。 (volume)、高速(velocity)和多样(variety)等多V挑 1 相关工作 战,大数据时代已真正到来[2-)。 互联网中海量的信息为证券领域日志分析提 大数据技术在互联网领域海量网络日志分析和 供了丰富的数据支撑,如何利用大数据分析技术进 处理过程中得到了广泛的应用,日志分析系统主要 行实时准确的日志分析成为重要的科学问题。在 包括日志同步、数据存储、分布式计算和数据仓库等 大型证券公司的内部网络中,随着网络带宽的迅速 相关技术。开源的日志分析系统如Facebook的 扩容日志量急剧增长且日志源众多,包括网上交易 Scribet6],Apache Chukwat7],LinkedIn Kafkats], 日志、移动证券日志和网站日志等主要系统的日 Cloudera的Flume)等。Facebook公司庞大的用户群 志。以海通证券为例,目前在全国设有几十个节 体产生了大量的信息与社交数据,现有8亿多用户 点,几百台服务器,峰值在线用户约几十万,每个节 的信息需要处理,产生了大规模的数据和日志:同 点各部署了1台负载均衡设备。网上交易应用服务 时,离线的大规模数据分析计算已无法满足实时数 器全天24小时将客户请求数据与应答数据实时或 据分析的用户需求,Scribe结合了Google的分布式 小批量定时写入磁盘日志文件,每台交易应用服务 文件系统GFS[10](google file system,GFS)。操作流 器的日志文件大小为100MB~3GB,总计在100GB 程是收集异构数据源上的日志,集中存储到分布式 左右。同时,每台网上交易应用服务器还会生成一 文件系统,从而在此基础上进行统计分析。Amazon 份发送给柜台程序的网关日志数据。此外,各节点 基于S3和EC2,开发了Amazon EMR来提供大数据 负载均衡设备的日志采用SNMP协议进行采集,采 处理服务,可以将数据分布在可重新调整大小的 集每个站点的网络流量、用户连接数据。每日合计 EC2集群中进行处理,包括日志分析、索引、数据仓 有3亿多条日志,总量共计约300GB。仅上述3类 库和机器学习等。阿里巴巴集团使用目前国内最 日志存储一年就将产生约108TB数据,若接入更多 大的Hadoop集群“云梯”进行各部门产品的线上数 设备、操作系统、业务平台日志,数据规模则更大。 据备份、系统日志以及爬虫数据分析,并建设开放 传统的日志处理方法在面对海量大数据时,其存储 平台为个人和企业提供各种增值服务。腾讯微信 方式和计算能力都受到了限制,因此分布式存储和 等应用产品拥有上亿级别的用户,产生了海量的个 并行计算成为了新的发展趋势。如何采集、传输、 人用户日志数据,这些数据中蕴藏着巨大的商业价 存储、分析及应用大规模的日志数据,已成为证券 值,并提出“大数据营销”的概念。人人网基于 行业在大数据时代下面临的重大挑战。 Hadoop的Hive)、HBase[12]和Streamingl]组件, Hadoop)分布式处理平台为大数据存储和分析 构建了SNS推荐平台进行分析计算、内容推荐等工 提供了有效的解决方案。在大数据应用方面,虽然 作。百度的高性能计算系统规划中的架构将有超 学术界和工业界对大数据的关注各有侧重,但有一 过1万个节点,每天的数据生成量在10PB以上,主 个共同的认识:大数据只有和具体的行业深入结合 要用于日志的存储分析以及统计挖掘等功能。Wi 才能落到实处,才能产生真正的价值。通过前期的 等设计了Analysis Farm摒弃了传统的关系型数据 积累和算法的升级,大数据应用将对证券行业产生 (relational database management system,RDBMS), 革命性影响。 利用NoSQL(not only SQL)数据库MongoDB构建了 本文的主要贡献如下: 可横向扩展的日志分析平台,以支撑NetFlow日志 1)研究基于SQL-on-Hadoop查询系统的性能特 存储和查询4。Rabkin等设计了基于Hadoop的日 点,对比分析了Hive、Impala和Spark SQL这3种具 志收集和分析系统Chukwa,日志处理程序在 有代表性的SQL-on-Hadoop查询系统实例,构建了 MapReduce框架上开发。文献[l6-17]从原位 海量日志采集与实时计算分析平台: 分析的角度出发,分别实现了针对大规模日志分析 2)采用TPC-H测试基准对它们的决策支持能 的MapReduce(In-situ MapReduce)和Continuous处 力进行测试及评估,通过对实验数据的分析和解释 理机制,但MapReduce模型计算代价很大,并不能
备、服务器和应用日志的采集与格式统一问题,日 志分析也从最初简单的正则匹配向结构化查询、报 表和预测演进[1] 。 越来越多的行业领域面临海量 (volume)、高速(velocity)和多样(variety)等多 V 挑 战,大数据时代已真正到来[2-4] 。 互联网中海量的信息为证券领域日志分析提 供了丰富的数据支撑,如何利用大数据分析技术进 行实时准确的日志分析成为重要的科学问题。 在 大型证券公司的内部网络中,随着网络带宽的迅速 扩容日志量急剧增长且日志源众多,包括网上交易 日志、移动证券日志和网站日志等主要系统的日 志。 以海通证券为例,目前在全国设有几十个节 点,几百台服务器,峰值在线用户约几十万,每个节 点各部署了 1 台负载均衡设备。 网上交易应用服务 器全天 24 小时将客户请求数据与应答数据实时或 小批量定时写入磁盘日志文件,每台交易应用服务 器的日志文件大小为 100 MB~3 GB,总计在 100 GB 左右。 同时,每台网上交易应用服务器还会生成一 份发送给柜台程序的网关日志数据。 此外,各节点 负载均衡设备的日志采用 SNMP 协议进行采集,采 集每个站点的网络流量、用户连接数据。 每日合计 有 3 亿多条日志,总量共计约 300 GB。 仅上述 3 类 日志存储一年就将产生约 108 TB 数据,若接入更多 设备、操作系统、业务平台日志,数据规模则更大。 传统的日志处理方法在面对海量大数据时,其存储 方式和计算能力都受到了限制,因此分布式存储和 并行计算成为了新的发展趋势。 如何采集、传输、 存储、分析及应用大规模的日志数据,已成为证券 行业在大数据时代下面临的重大挑战。 Hadoop [5]分布式处理平台为大数据存储和分析 提供了有效的解决方案。 在大数据应用方面,虽然 学术界和工业界对大数据的关注各有侧重,但有一 个共同的认识:大数据只有和具体的行业深入结合 才能落到实处,才能产生真正的价值。 通过前期的 积累和算法的升级,大数据应用将对证券行业产生 革命性影响。 本文的主要贡献如下: 1)研究基于 SQL⁃on⁃Hadoop 查询系统的性能特 点,对比分析了 Hive、Impala 和 Spark SQL 这 3 种具 有代表性的 SQL⁃on⁃Hadoop 查询系统实例,构建了 海量日志采集与实时计算分析平台; 2)采用 TPC⁃H 测试基准对它们的决策支持能 力进行测试及评估,通过对实验数据的分析和解释 得到了若干有益的结论; 3)实现了大规模网络日志数据分析与计算在证 券领域的几种典型应用。 1 相关工作 大数据技术在互联网领域海量网络日志分析和 处理过程中得到了广泛的应用,日志分析系统主要 包括日志同步、数据存储、分布式计算和数据仓库等 相关 技 术。 开 源 的 日 志 分 析 系 统 如 Facebook 的 Scribe [6] ,Apache 的 Chukwa [7] , LinkedIn 的 Kafka [8] , Cloudera 的 Flume [9]等。 Facebook 公司庞大的用户群 体产生了大量的信息与社交数据,现有 8 亿多用户 的信息需要处理,产生了大规模的数据和日志;同 时,离线的大规模数据分析计算已无法满足实时数 据分析的用户需求, Scribe 结合了 Google 的分布式 文件系统 GFS [10] (google file system,GFS)。 操作流 程是收集异构数据源上的日志,集中存储到分布式 文件系统,从而在此基础上进行统计分析。 Amazon 基于 S3 和 EC2,开发了 Amazon EMR 来提供大数据 处理服务,可以将数据分布在可重新调整大小的 EC2 集群中进行处理,包括日志分析、索引、数据仓 库和机器学习等。 阿里巴巴集团使用目前国内最 大的 Hadoop 集群“云梯”进行各部门产品的线上数 据备份、系统日志以及爬虫数据分析,并建设开放 平台为个人和企业提供各种增值服务。 腾讯微信 等应用产品拥有上亿级别的用户,产生了海量的个 人用户日志数据,这些数据中蕴藏着巨大的商业价 值,并提出 “ 大 数 据 营 销” 的 概 念。 人 人 网 基 于 Hadoop 的 Hive [11] 、HBase [12] 和 Streaming [13] 组件, 构建了 SNS 推荐平台进行分析计算、内容推荐等工 作。 百度的高性能计算系统规划中的架构将有超 过 1 万个节点,每天的数据生成量在 10 PB 以上,主 要用于日志的存储分析以及统计挖掘等功能。 Wei 等设计了 Analysis Farm 摒弃了传统的关系型数据 库(relational database management system,RDBMS), 利用 NoSQL(not only SQL)数据库 MongoDB 构建了 可横向扩展的日志分析平台,以支撑 NetFlow 日志 存储和查询[14] 。 Rabkin 等设计了基于 Hadoop 的日 志收 集 和 分 析 系 统 Chukwa, 日 志 处 理 程 序 在 MapReduce 框架上开发[15] 。 文献[ 16 - 17] 从原位 分析的角度出发,分别实现了针对大规模日志分析 的 MapReduce( In⁃situ MapReduce) 和 Continuous 处 理机制, 但 MapReduce 模型计算代价很大,并不能 ·718· 智 能 系 统 学 报 第 12 卷
第5期 何明,等:基于SQL-on-Hadoop查询引擎的日志挖掘及其应用 ·719· 很好地支持迭代运算。 主要分为文本数据、数据库数据和实时/准实时数 然而HDFS1]和MapReducet]大数据处理架构 据等。 主要是针对静态数据的批处理,在运算过程中产生 2.1HDS数据采集 的大量/O操作无法保证处理过程的实时性。针对 网络日志的生成是分布式的,与传统的日志管 上述问题,本文将研究基于SQL-on-Hadoop查询引 理系统一样,日志采集是本文平台的基础。本文平 擎构建网络日志分析平台,通过使用广泛的标准 台采集的日志直接存储在Hadoop文件系统 SQL语言来实现快速、灵活的查询性能。通过利用 (HDFS)中,由于平台构建于Hadoop之上,能够处 TB级日志数据对存储、查询性能进行测试、优化和 理海量分布式存储的日志数据,同时易于水平扩 比较,构建具有稳定性、高性能、可扩展性、易用性 展,本文的日志数据基本流程按功能可划分为5层, 和安全性的网络日志统一采集查询和监控平台,以 如图1所示。 满足对TB或PB级容量和万亿日志管理的应用需 1)原始数据层:业务上完成日志格式梳理,系统 求,为面向证券行业的日志大数据分析及其应用提 运行日志支持实时访问和采集接口。 供技术支撑。 2)数据采集层:主要负责通用的日志数据解析 2基于Hadoop的结构化数据处理 高效采集和安全可控。 3)数据处理层:主要包括对日志数据的批量式 网络日志源的种类具有多样性的特点,包括结 处理和实时处理。 构化、半结构化和非结构化的数据。不同类型的日 4)数据服务层:主要提供标准的数据访问接口 志存储方式有所不同。日志管理系统的采集器对 ODBC、JDBC、HIVE等。 不同格式的日志进行标准化处理,从而以结构化的 5)数据展示层:实现实时监控类和报表类数据 形式进行日志存储和分析。本文所采用的源数据 的展示。 数据处理 原始数据层 存储后处理 External Database HDFS Map/Reduce 数据服务层 :数据展示层 ”””””” ODBC Statistics Report Textfile 数据采集层 JDBC 实时处理 HIVE OLAP Network Sprak Data Cache Streaming 图1日志数据处理基本流程 Fig.1 Basic log data processing framework 根据应用需求,本文日志的采集方式分为以下on-Hadoop处理结果输出到RDBMS,供现有的日志 3种。 分析系统进行报表及可视化处理。 1)文件导人:对已分布在个服务器磁盘的日志 2.2SQL-on-Hadoop查询引擎 文件,经网络文件系统挂载,直接将日志文件导入 SQL是结构化数据的查询语言,SQL-on-Hadoop HDFS。该方式允许日志文件批量可靠导入,可在网 是构建在Hadoop之上的SQL查询系统,利用 络利用率低谷时段进行传送。 Hadoop能够进行海量数据(TB级别以上)的处理。 2)流数据导入:基于Apache Flume[2o]构建,实 目前已有的SQL-on-Hadoop系统大致可以分为两大 类:第一类将SQL查询转换为Map-Reduce job;第二 现多个日志源数据实时汇聚,接收网上交易应用服 类系统基于MPP(massively parallel processing)的设 务器和网络设备发送的日志。 计方式,仅仅使用Hadoop作为存储引擎,上层自行 3)RDBMS导人:为实现与现有日志系统兼容, 实现分布式查询的逻辑。第一类系统的代表是 基于Apache Sqoop2,实现与Oracle、MSQL和 Facebook的Hive。Hive是原始的SQL-on-Hadoop解 PostgreSQL等RDBMS对接,支持直接导入存储在上 决方案。它是一个开源的Java项目,能够将SQL转 述数据库中的数据记录。Sqoop同时可以将SQL 换成一系列可以在标准的Hadoop TaskTrackers上运
很好地支持迭代运算。 然而 HDFS [18]和 MapReduce [19]大数据处理架构 主要是针对静态数据的批处理,在运算过程中产生 的大量 I/ O 操作无法保证处理过程的实时性。 针对 上述问题,本文将研究基于 SQL⁃on⁃Hadoop 查询引 擎构建网络日志分析平台,通过使用广泛的标准 SQL 语言来实现快速、灵活的查询性能。 通过利用 TB 级日志数据对存储、查询性能进行测试、优化和 比较,构建具有稳定性、高性能、可扩展性、易用性 和安全性的网络日志统一采集查询和监控平台,以 满足对 TB 或 PB 级容量和万亿日志管理的应用需 求,为面向证券行业的日志大数据分析及其应用提 供技术支撑。 2 基于 Hadoop 的结构化数据处理 网络日志源的种类具有多样性的特点,包括结 构化、半结构化和非结构化的数据。 不同类型的日 志存储方式有所不同。 日志管理系统的采集器对 不同格式的日志进行标准化处理,从而以结构化的 形式进行日志存储和分析。 本文所采用的源数据 主要分为文本数据、数据库数据和实时/ 准实时数 据等。 2.1 HDFS 数据采集 网络日志的生成是分布式的,与传统的日志管 理系统一样,日志采集是本文平台的基础。 本文平 台采 集 的 日 志 直 接 存 储 在 Hadoop 文 件 系 统 (HDFS)中,由于平台构建于 Hadoop 之上,能够处 理海量分布式存储的日志数据,同时易于水平扩 展,本文的日志数据基本流程按功能可划分为 5 层, 如图 1 所示。 1)原始数据层:业务上完成日志格式梳理,系统 运行日志支持实时访问和采集接口。 2)数据采集层:主要负责通用的日志数据解析、 高效采集和安全可控。 3)数据处理层:主要包括对日志数据的批量式 处理和实时处理。 4)数据服务层:主要提供标准的数据访问接口 ODBC、JDBC、HIVE 等。 5)数据展示层:实现实时监控类和报表类数据 的展示。 图 1 日志数据处理基本流程 Fig.1 Basic log data processing framework 根据应用需求,本文日志的采集方式分为以下 3 种。 1)文件导入:对已分布在个服务器磁盘的日志 文件,经网络文件系统挂载,直接将日志文件导入 HDFS。 该方式允许日志文件批量可靠导入,可在网 络利用率低谷时段进行传送。 2)流数据导入:基于 Apache Flume [20] 构建,实 现多个日志源数据实时汇聚,接收网上交易应用服 务器和网络设备发送的日志。 3)RDBMS 导入:为实现与现有日志系统兼容, 基于 Apache Sqoop [21] , 实 现 与 Oracle、 MySQL 和 PostgreSQL 等 RDBMS 对接,支持直接导入存储在上 述数据库中的数据记录。 Sqoop 同时可以将 SQL⁃ on⁃Hadoop 处理结果输出到 RDBMS,供现有的日志 分析系统进行报表及可视化处理。 2.2 SQL⁃on⁃Hadoop 查询引擎 SQL 是结构化数据的查询语言,SQL⁃on⁃Hadoop 是构 建 在 Hadoop 之 上 的 SQL 查 询 系 统, 利 用 Hadoop 能够进行海量数据( TB 级别以上)的处理。 目前已有的 SQL⁃on⁃Hadoop 系统大致可以分为两大 类:第一类将 SQL 查询转换为 Map⁃Reduce job;第二 类系统基于 MPP(massively parallel processing)的设 计方式,仅仅使用 Hadoop 作为存储引擎,上层自行 实现分布式查询的逻辑。 第一类系统的代表是 Facebook 的 Hive。 Hive 是原始的 SQL⁃on⁃Hadoop 解 决方案。 它是一个开源的 Java 项目,能够将 SQL 转 换成一系列可以在标准的 Hadoop TaskTrackers 上运 第 5 期 何明,等:基于 SQL⁃on⁃Hadoop 查询引擎的日志挖掘及其应用 ·719·
·720 智能系统学报 第12卷 行的MapReduce任务。如图2中的Hive架构部分 Impala并没有使用MapReduce执行查询,而是使用 所示,Hive通过一个metastore(本身就是一个数据 了自己的执行守护进程操作本地磁盘文件。由于 库)存储表模式、分区和位置以期提供像MySQL一 没有MapReduce开销以及磁盘/O、查询语句编译 样的功能。它支持大部分MySQL语法,同时使用相 等一系列优化,Impala通常要比Hive具有更快的数 似的database/able/view约定组织数据集。Hive内 据访问性能[s]。Impala共享Hive的metastore,可直 部机制是基于MapReduce,从而导致了计算过程中 接与Hive管理的数据互操作。Spark[24]使用轻量级 消耗大量的/0,降低了运行效率。Impala2]是由 的线程作为执行器,减少了执行作业的开销,同时 Cloudera构建的一个针对Hadoop的开源的MPP 提高了调度的响应速度,如图2中的Spark部分所 (massively parallel processing)“交互式”SQL查询引 示。Spark SQL是在Spark之上搭建的SQL查询引 擎。Impala同样提供了一种SQL查询方法,如图2 擎,支持在Spark中使用Sql、HiveSql、Scala中的关 中的Impala架构部分所示,与Hive不同的是, 系型查询表达式。 Impala Hive SQL APP JDBC Statestore Catalog CLI JDBC/ODBC HUE ODBC Impalad Impalad Impalad Query Planner Query Planner Query Planner Thrift Server MetaStore Query Coordinator Query Coordinator Query Coordinator Query Executor Query Executor Query Executor Compiler Optimizer Executor Hadoop Spark Standalone Job Tracker Name Node Spark Context Spark Master 1 Data Node Data Node Data Node Spark Worker Spark Worker Spark Worker 'ask Tracker Task Tracker Task Tracker kecutor Backenc Executor Backend Executor Backend 图2 Hadoop、Hive、Impala与Spark执行结构图 Fig.2 Structure for implementation of Hadoop,Hive,Impala and Spark 2.3结构化数据存储与压缩 多种格式的数据格式的支持。Text是原始的文本数 目前,很多研究者提出了在Hadoop中优化结构 据,通常为CSV或其他特定字符分隔。Hive的格式 化数据存储的方法。He等[2]提出的RCFile格式旨 支持更为全面,由于Impala和Hive共享metastore, 在提高数据导人和处理效率。它首先将数据水平 因此本文平台实际应用中通常由Hive导入数据而 分割为多个行组(ov-group),然后对每个组内的数 后台使用Spark SQL查询。 据垂直分割成列存储。列存储将数据表同一列的 表1Hive、Impala和Spark SOL数据格式支持比较 数据连续存放,当查询只涉及部分列时,可大幅减 Table 1 Data format comparison of Hive,Impala and 少所需读取的数据量。ORC(optimized RCFile)是对 Spark SQL RCFile的改进,解决其在数据类型和性能上的多个 数据 Hive Impala Spark SQL 局限性,改善查询和空间利用效率。Parquet是 格式 查询插入 查询插人 查询 插入 Hadoop生态圈中一种新型列式存储格式,灵感来自 于2010年Google发表的Dremel论文[2],它可以兼 Text 容Hadoop生态圈中大多数生态框架(Hadoop、Spark RCFile 等),被多种查询引擎支持(Hive、Impala、Spark SQL、 ORC Dill等),并且它与语言和平台无关的。表1比较 了本文2.2节描述的3种查询引擎从HDFS上读取 Parquet
行的 MapReduce 任务。 如图 2 中的 Hive 架构部分 所示,Hive 通过一个 metastore(本身就是一个数据 库)存储表模式、分区和位置以期提供像 MySQL 一 样的功能。 它支持大部分 MySQL 语法,同时使用相 似的 database / table / view 约定组织数据集。 Hive 内 部机制是基于 MapReduce,从而导致了计算过程中 消耗大量的 I/ O,降低了运行效率。 Impala [22] 是由 Cloudera 构建的一个针对 Hadoop 的开源的 MPP (massively parallel processing)“交互式” SQL 查询引 擎。 Impala 同样提供了一种 SQL 查询方法,如图 2 中的 Impala 架 构 部 分 所 示, 与 Hive 不 同 的 是, Impala 并没有使用 MapReduce 执行查询,而是使用 了自己的执行守护进程操作本地磁盘文件。 由于 没有 MapReduce 开销以及磁盘 I/ O、查询语句编译 等一系列优化,Impala 通常要比 Hive 具有更快的数 据访问性能[23] 。 Impala 共享 Hive 的 metastore,可直 接与 Hive 管理的数据互操作。 Spark [24]使用轻量级 的线程作为执行器,减少了执行作业的开销,同时 提高了调度的响应速度,如图 2 中的 Spark 部分所 示。 Spark SQL 是在 Spark 之上搭建的 SQL 查询引 擎,支持在 Spark 中使用 Sql、HiveSql、Scala 中的关 系型查询表达式。 图 2 Hadoop、Hive、Impala 与 Spark 执行结构图 Fig.2 Structure for implementation of Hadoop, Hive, Impala and Spark 2.3 结构化数据存储与压缩 目前,很多研究者提出了在 Hadoop 中优化结构 化数据存储的方法。 He 等[25]提出的 RCFile 格式旨 在提高数据导入和处理效率。 它首先将数据水平 分割为多个行组(row⁃group),然后对每个组内的数 据垂直分割成列存储。 列存储将数据表同一列的 数据连续存放,当查询只涉及部分列时,可大幅减 少所需读取的数据量。 ORC(optimized RCFile)是对 RCFile 的改进,解决其在数据类型和性能上的多个 局限性, 改 善 查 询 和 空 间 利 用 效 率。 Parquet 是 Hadoop 生态圈中一种新型列式存储格式,灵感来自 于 2010 年 Google 发表的 Dremel 论文[26] ,它可以兼 容 Hadoop 生态圈中大多数生态框架(Hadoop、Spark 等),被多种查询引擎支持(Hive、Impala、Spark SQL、 Drill 等),并且它与语言和平台无关的。 表 1 比较 了本文 2.2 节描述的 3 种查询引擎从 HDFS 上读取 多种格式的数据格式的支持。 Text 是原始的文本数 据,通常为 CSV 或其他特定字符分隔。 Hive 的格式 支持更为全面,由于 Impala 和 Hive 共享 metastore, 因此本文平台实际应用中通常由 Hive 导入数据而 后台使用 Spark SQL 查询。 表 1 Hive、Impala 和 Spark SQL 数据格式支持比较 Table 1 Data format comparison of Hive, Impala and Spark SQL 数据 格式 Hive Impala Spark SQL 查询 插入 查询 插入 查询 插入 Text √ √ √ √ √ √ RCFile √ √ √ — — — ORC √ √ — — — — Parquet √ √ √ √ √ √ ·720· 智 能 系 统 学 报 第 12 卷
第5期 何明,等:基于SQL-on-Hadoop查询引擎的日志挖掘及其应用 ·721. 数据压缩是另一种性能优化方法。压缩一方 DSparkConf conf new SparkConf(); 面节省存储空间,另一方面在相同磁盘/0速度可 ②创建上下文对象; 读写更多记录。Hive、Impala和Spark SQL均支持直 3StreamingContext(conf,Interval); 接查询压缩的数据文件,常用压缩算法有Gzip/ZIib 4MapOffsets=kafka.getOffset(); 和侧重于解压缩速度的Snappy。ORC格式本身已 ⑤获取kafka读取偏移量; 内嵌轻量级的压缩机制。 ⑥DStream stream; 2.4结构化数据处理算法 7KafkaUtils.createDStream(input); RDD数据集包含对父RDD的一组依赖,这种依 ⑧Return streamo 赖描述了RDD之间的传承关系。RDD将操作分为 2)RDD数据处理 两类:Transformation与Action。Transformation操作 ①stream.foreachRDD; 不执行运算,只有当Action操作时才触发运算。在 2new VoidFunction>(); RDD的实现机制中,基于迭代器的接口实现原理使 3call(RDDrdd); 得数据的访问更加高效,同时避免了大量中间结果 4HasOffsetRanges offrange rdd.rdd(); 对内存的消耗。Spark SQL包含了结构化数据和数 ⑤合并请求应答,并解析存储数据: 据之上进行运算的更多信息,Spark SQL使用这些信 6rdd.mapPartitionsToPair; 息进行优化,使得结构化数据的操作更加高效和方 7 new FlumeKafkaFunction(); 便,基于Spark SQL的数据操作流程如下。 8foreachPartition(ProceFunction()); 算法1 SparkSQLonRdd(,:) 9kafka.setOffset(offrange); 输入Kafka输入数据流input,Spark上下文 ①保存kafka读取偏移量。 context; 3)ProceFunction数据后处理 输出分布式集合dataframe。 DIterator>iter; 1)DStream line:Kafka->DStream(input); 2while (iter.hasNext()); 2)获取Kaka流数据输入; 3KafkaData data iter.next()._2(); 3)SglContext sc new SqlContext(context); 4json =data.getData(); 4)DStreamrdd=line.map; 5Record record =Object(json,class); 5)new Function: 6record.setCollect_time; 6)public Row call(T); 7data.getExtData(TIME)); 7)创建Row对象; 8Utils.save(item_topic,record); 8)List ⑨Return record.. (): 其中,RDD根据数据记录的key对结构进行分 9)Struct Fields.add CreateDataType (Column 区。分片数据采用迭代器Iterator流式访问,hasNext >)): 方法是由RDD lineage上各个Transformation携带的 10)重复步骤9)创建逻辑表结构: 闭包函数复合而成,使得对象被序列化,通过网络 11)Struct Type st:DataTypes.CreateStructType 传输到其他节点上进行装载运算。Iterator每访问 (sf); 一个元素,就对该元素应用相应的复合函数,得到 12)DataFrame df 的结果再流式地存储。 13)sc->DataFrame(rdd,st); 3平台架构与集群环境部署 14)df.RegisterTable(); 15)DataFrame dataframe=sc.sql(); 3.1平台架构与处理框架 16)Return dataframe. 本文基于Hadoop,构建证券交易应用服务器和 算法2 RddProcessing() 网络设备海量日志采集、解析、存储与实时计算分 输入Kafka输入数据流input 析平台,平台的核心架构如下。 输出数据集对象record。 1)数据采集层:负责实时采集来自通达信、恒 1)数据采集与预处理 生、核新的网上交易应用服务器全天24小时的客户
数据压缩是另一种性能优化方法。 压缩一方 面节省存储空间,另一方面在相同磁盘 I/ O 速度可 读写更多记录。 Hive、Impala 和 Spark SQL 均支持直 接查询压缩的数据文件,常用压缩算法有 Gzip / Zlib 和侧重于解压缩速度的 Snappy。 ORC 格式本身已 内嵌轻量级的压缩机制。 2.4 结构化数据处理算法 RDD 数据集包含对父 RDD 的一组依赖,这种依 赖描述了 RDD 之间的传承关系。 RDD 将操作分为 两类:Transformation 与 Action。 Transformation 操作 不执行运算,只有当 Action 操作时才触发运算。 在 RDD 的实现机制中,基于迭代器的接口实现原理使 得数据的访问更加高效,同时避免了大量中间结果 对内存的消耗。 Spark SQL 包含了结构化数据和数 据之上进行运算的更多信息,Spark SQL 使用这些信 息进行优化,使得结构化数据的操作更加高效和方 便,基于 Spark SQL 的数据操作流程如下。 算法 1 SparkSQLonRdd(<input>,<context>) 输入 Kafka 输入数据流 input, Spark 上下文 context; 输出 分布式集合 dataframe。 1)DStream line:Kafka->DStream(input); 2)获取 Kafka 流数据输入; 3)SqlContext sc = new SqlContext(context); 4)DStream<Row> rdd = line.map; 5)new Function; 6)public Row call(T) {}; 7)创建 Row 对象; 8)List < StructField > sf = new;List < StructField > (); 9) Struct Fields. add ( CreateDataType ( < Column >)); 10)重复步骤 9)创建逻辑表结构; 11) Struct Type st: DataTypes. CreateStructType (sf); 12)DataFrame df : 13)sc->DataFrame(rdd, st); 14)df.RegisterTable(<Table Name>); 15)DataFrame dataframe = sc.sql(<Sql Query>); 16)Return dataframe。 算法 2 RddProcessing(<input>) 输入 Kafka 输入数据流 input; 输出 数据集对象 record。 1)数据采集与预处理 ①SparkConf conf = new SparkConf(); ②创建上下文对象; ③StreamingContext(conf, Interval); ④Map<E,T> Offsets = kafka.getOffset(); ⑤获取 kafka 读取偏移量; ⑥DStream stream; ⑦KafkaUtils.createDStream(input); ⑧Return stream。 2)RDD 数据处理 ①stream.foreachRDD; ②new VoidFunction<RDD>>(); ③call(RDD<MessageAndMetadata> rdd); ④HasOffsetRanges offrange = rdd.rdd(); ⑤合并请求应答,并解析存储数据; ⑥rdd.mapPartitionsToPair; ⑦ new FlumeKafkaFunction(); ⑧foreachPartition(ProceFunction()); ⑨kafka.setOffset(offrange); ⑩保存 kafka 读取偏移量。 3)ProceFunction 数据后处理 ①Iterator<Tuple2<T, KafkaData>> iter; ②while (iter.hasNext()); ③KafkaData data = iter.next()._2(); ④json = data.getData(); ⑤Record record =Object(json, class); ⑥record.setCollect_time; ⑦data.getExtData(TIME)); ⑧Utils.save(item_topic, record); ⑨Return record。 其中,RDD 根据数据记录的 key 对结构进行分 区。 分片数据采用迭代器 Iterator 流式访问,hasNext 方法是由 RDD lineage 上各个 Transformation 携带的 闭包函数复合而成,使得对象被序列化,通过网络 传输到其他节点上进行装载运算。 Iterator 每访问 一个元素,就对该元素应用相应的复合函数,得到 的结果再流式地存储。 3 平台架构与集群环境部署 3.1 平台架构与处理框架 本文基于 Hadoop,构建证券交易应用服务器和 网络设备海量日志采集、解析、存储与实时计算分 析平台,平台的核心架构如下。 1)数据采集层:负责实时采集来自通达信、恒 生、核新的网上交易应用服务器全天 24 小时的客户 第 5 期 何明,等:基于 SQL⁃on⁃Hadoop 查询引擎的日志挖掘及其应用 ·721·
·722. 智能系统学报 第12卷 请求应答数据以及网络设备日志数据,为大数据分 4)稳定性和可靠性:基于成熟的、经过实践验证 析平台提供数据源。 稳定可靠的Hadoop技术组件服务器节点非常容易 2)数据汇集层:将各个数据采集节点的日志数 实现横向扩展,分布式环境保障集群中的任意一台 据源源不断地汇集到各自的集群。 服务器出现宕机时不影响系统的稳定可靠运行。 3)数据缓冲层:根据不同的Topic对海量日志数 3.2环境部署 据进行缓冲,有助于控制和优化数据流经过系统的 基于Hadoop的网络日志分析平台在海通证券 速度。 网络信息中心的搭建部署,如图4所示。共42台服 4)数据分发与解析处理层:负责数据的解析、勾 务器,其中11台是Flume汇聚节点(256GB内存, 对、计算和分发。 2×600GB,RAID1阵列),5台Kafka节点(256GB 5)数据存储与计算层:用于存储、管理日志数 内存,2×6O0GB,RAID1阵列),3台Couchbase节点 据,支持多维检索、统计分析和查询处理。 (512GB内存,2×600GB,RAID1阵列),5台 6)应用层:负责面向终端用户提供日志分析与 Zookeeper节点(256GB内存,2×600GB,RAD1阵 管理的泛在接入,提供实时运维监控、实时预警、明 列),2台作为Namenode(256GB内存,2×600GB, 细毫秒级查询以及实时报表输出等应用。 RAID1阵列),14台是Datanode节点(256GB内存, 可以看到,在这个大数据分析体系结构中,系统 2×600GB,RAID1阵列,2×600GB,RAID1阵列+6× 支持TB级、PB级或者更大规模数据的分析和处 2TB,RAID0阵列),2台Tomeat(256GB内存,2× 理:系统可以处理结构化数据、非结构化和半结构 600GB,RAID1阵列)。 化数据,有良好的扩展性。基于上述平台结构,本 文设计了能够有效地利用大数据技术解决海量系 统访问日志多条件实时快速查询的处理框架,如图 交易应用 Tomeat集群 服务器 3所示。 Node I 了 通 Fume集群Kafka集群 Zookeeper集群 计算节点集群Hbase、Spark) 交易应用 交易应用 服务器集群 服务器 Node I 汇 消息队列 集群 Node 2 Kafka topic Spark Flume Kafka topic 2 Streaming NameNode DataNode Flume CouchBase集群 Kafka topic 3 交易应用 Flume 丁Tomcat集群 服务器集群 Kafka topic Node 2 交易应用 服务器 Hbase Hive Elasticsearch Node n ZooKeeper集群 交易应用 图4集群拓扑图 服务器集群 Node N Fig.4 Cluster topology 图3处理框架 所有节点通过I0GB以太网互联。Hadoop部署 Fig.3 Processing framework 采用Cloudera的发行版,版本为CDH5.5.0,HDFS总 容量近60TB。接入日志分析平台的数据来自网上 该处理框架能够保证平台系统如下的几个 交易应用服务器日志数据和网络设备日志数据。 特性。 网上交易日志每天产生的记录数约1.2亿条,体积 1)实时性:实时采集Agent包,从产生时刻起到 约100GB:网络设备日志数据日志每天的记录数约 实时采集,再到传输到数据中心,整个时间间隔控 650万条,体积约6GB。 制在1s内实时勾对、解析等计算,并保存到数据中 心的集群,这个过程的时间间隔控制在3~5s。 4实验与性能评估 2)准确性和完整性:传输通道实现不重传、不漏 4.1实验环境与数据集 传、断点续传,保证数据完整性。 我们采用的实验环境为7台物理测试机构建的 3)安全性:非对称加密算法对传输的日志数据 集群,选取2台机器作为主节点,其余作为计算节点 进行加密,使用SSL/TLS协议,保障网络传输通道 进行SQL-on-Hadoop实验,测试集群拓扑如图5 的安全性。 所示
请求应答数据以及网络设备日志数据,为大数据分 析平台提供数据源。 2)数据汇集层:将各个数据采集节点的日志数 据源源不断地汇集到各自的集群。 3)数据缓冲层:根据不同的 Topic 对海量日志数 据进行缓冲,有助于控制和优化数据流经过系统的 速度。 4)数据分发与解析处理层:负责数据的解析、勾 对、计算和分发。 5)数据存储与计算层:用于存储、管理日志数 据,支持多维检索、统计分析和查询处理。 6)应用层:负责面向终端用户提供日志分析与 管理的泛在接入,提供实时运维监控、实时预警、明 细毫秒级查询以及实时报表输出等应用。 可以看到,在这个大数据分析体系结构中,系统 支持 TB 级、PB 级或者更大规模数据的分析和处 理;系统可以处理结构化数据、非结构化和半结构 化数据,有良好的扩展性。 基于上述平台结构,本 文设计了能够有效地利用大数据技术解决海量系 统访问日志多条件实时快速查询的处理框架,如图 3 所示。 图 3 处理框架 Fig.3 Processing framework 该处理框架能够保证平台系统如下的几个 特性。 1)实时性:实时采集 Agent 包,从产生时刻起到 实时采集,再到传输到数据中心,整个时间间隔控 制在 1 s 内实时勾对、解析等计算,并保存到数据中 心的集群,这个过程的时间间隔控制在 3~5 s。 2)准确性和完整性:传输通道实现不重传、不漏 传、断点续传,保证数据完整性。 3)安全性:非对称加密算法对传输的日志数据 进行加密,使用 SSL / TLS 协议,保障网络传输通道 的安全性。 4)稳定性和可靠性:基于成熟的、经过实践验证 稳定可靠的 Hadoop 技术组件服务器节点非常容易 实现横向扩展,分布式环境保障集群中的任意一台 服务器出现宕机时不影响系统的稳定可靠运行。 3.2 环境部署 基于 Hadoop 的网络日志分析平台在海通证券 网络信息中心的搭建部署,如图 4 所示。 共 42 台服 务器,其中 11 台是 Flume 汇聚节点(256 GB 内存, 2×600 GB,RAID1 阵列),5 台 Kafka 节点(256 GB 内存,2×600 GB,RAID1 阵列),3 台 Couchbase 节点 (512 GB 内 存, 2 × 600 GB, RAID1 阵 列), 5 台 Zookeeper 节点(256 GB 内存,2×600 GB,RAID1 阵 列),2 台作为 Namenode(256 GB 内存,2×600 GB, RAID1 阵列),14 台是 Datanode 节点(256 GB 内存, 2×600 GB,RAID1 阵列,2×600 GB,RAID1 阵列 +6× 2 TB,RAID0 阵列),2 台 Tomcat(256 GB 内存,2× 600 GB,RAID1 阵列)。 图 4 集群拓扑图 Fig.4 Cluster topology 所有节点通过 10 GB 以太网互联。 Hadoop 部署 采用 Cloudera 的发行版,版本为 CDH5.5.0,HDFS 总 容量近 60 TB。 接入日志分析平台的数据来自网上 交易应用服务器日志数据和网络设备日志数据。 网上交易日志每天产生的记录数约 1.2 亿条,体积 约 100 GB;网络设备日志数据日志每天的记录数约 650 万条,体积约 6 GB。 4 实验与性能评估 4.1 实验环境与数据集 我们采用的实验环境为 7 台物理测试机构建的 集群,选取 2 台机器作为主节点,其余作为计算节点 进行 SQL⁃on⁃Hadoop 实 验, 测 试 集 群 拓 扑 如 图 5 所示。 ·722· 智 能 系 统 学 报 第 12 卷
第5期 何明,等:基于SQL-on-Hadoop查询引擎的日志挖掘及其应用 ·723. 31图 4.2性能评估 千兆网交换机 本文选择SQL-on-Hadoop作为基础查询引擎,对 3个引擎的处理时效性进行分析。从表3中各个引 擎的总运行时间可以看出,Impala比Hive快了l.S 倍,Spark SQL比Hive快了2.7倍。 表3查询执行时间比较 主节点1 主节点2 工作节点1 工作节点5 Name Node Name Node Data Node Data Node Table 3 Comparison of query runtime 图5测试环境拓扑图 TPC-H 执行时间/s Fig.5 Test environment topology 查询子句 Hive Impala Spark SQL 实验采用针对OLAP应用的TPC-H测试基准来 Q1 115 25 14 评估执行引擎的性能。TPC-H面向商务采购应用, Q2 161 146 18 其数据库模式遵循第三范式。性能评测基准定义 Q3 274 173 83 了22个复杂SELECT语句和2个更新数据语句,遵 循SQL-92标准。数据库的规模由自带的扩展因子 Q4 152 211 126 (scale factor,SF)决定,有10个级别,从1GB到100TB Q5 342 326 64 不等供用户选择。TPC-H基准以每小时内执行的 Q6 46 38 12 查询数作为度量标准,在工业和科研领域当中应用 Q7 678 339 87 广泛。 文献[23]讨论了ORCFile和Parquet两种列式 Q8 468 331 90 存储格式的性能差别,通过空间使用和查询性能比 Q9 735 467 较,认为Parquet针对文本文件压缩率较高,从而节 Q10 298 163 40 省了HDFS的存储空间,同时减小了磁盘/O的开 Q11 246 35 30 销,但是解压缩会占用部分计算资源,对性能有一 定影响。因此,本文采用Parquet紧凑的列存储格 Q12 126 22 14 式,并选用了压缩比和解压速度较为均衡的 Q13 29 142 66 Snappy,相对原始文本日志节省了近70%的空间。 Q14 101 21 本文实验使用TPC-H作为测试数据集,在SF= Q15 27 16 300的数据规模上进行测试,其描述和相关压缩处 12 理如表2所示。 Q16 4 267 54 表2实验数据集 Q17 523 428 254 Table 2 Experimental dataset Q18 449 549 121 Table Rows Volume Parquet/Snappy Q19 243 226 283 Lineitem 1799989091 187.8GB 50.8GB Q20 419 170 91 Order 450000000 43.6GB 15.8GB Q21 681 713 792 Partsupp 240000000 32.2GB 12.2GB Q22 82 67 92 Part 60000000 8.7GB 2.3GB Total 7592 5223 2831 Customer 45000000 7.5GB 3.9GB 实验结果表明,Impala在Q1、Q6、Q12、Q15上 Supplier 3000000 0.4GB 0.23GB 的性能优于Hive,查询语句结构如下: Nation 25 3.1KB 2.1KB SELECT Region 620B 390B fieldl,field12, 总计 2597989121 280.2GB 85.2GB SUM(field3)as alias1
图 5 测试环境拓扑图 Fig.5 Test environment topology 实验采用针对 OLAP 应用的 TPC⁃H 测试基准来 评估执行引擎的性能。 TPC⁃H 面向商务采购应用, 其数据库模式遵循第三范式。 性能评测基准定义 了 22 个复杂 SELECT 语句和 2 个更新数据语句,遵 循 SQL-92 标准。 数据库的规模由自带的扩展因子 (scale factor,SF)决定,有10 个级别,从1 GB 到100 TB 不等供用户选择。 TPC⁃H 基准以每小时内执行的 查询数作为度量标准,在工业和科研领域当中应用 广泛。 文献[23] 讨论了 ORCFile 和 Parquet 两种列式 存储格式的性能差别,通过空间使用和查询性能比 较,认为 Parquet 针对文本文件压缩率较高,从而节 省了 HDFS 的存储空间,同时减小了磁盘 I/ O 的开 销,但是解压缩会占用部分计算资源,对性能有一 定影响。 因此,本文采用 Parquet 紧凑的列存储格 式, 并 选 用 了 压 缩 比 和 解 压 速 度 较 为 均 衡 的 Snappy,相对原始文本日志节省了近 70%的空间。 本文实验使用 TPC⁃H 作为测试数据集,在 SF = 300 的数据规模上进行测试,其描述和相关压缩处 理如表 2 所示。 表 2 实验数据集 Table 2 Experimental dataset Table Rows Volume Parquet / Snappy Lineitem 1 799 989 091 187.8 GB 50.8 GB Order 450 000 000 43.6 GB 15.8 GB Partsupp 240 000 000 32.2 GB 12.2 GB Part 60 000 000 8.7 GB 2.3 GB Customer 45 000 000 7.5 GB 3.9 GB Supplier 3 000 000 0.4 GB 0.23 GB Nation 25 3.1 KB 2.1 KB Region 5 620 B 390 B 总计 2 597 989 121 280.2 GB 85.2 GB 4.2 性能评估 本文选择 SQL⁃on⁃Hadoop 作为基础查询引擎,对 3 个引擎的处理时效性进行分析。 从表 3 中各个引 擎的总运行时间可以看出,Impala 比 Hive 快了 1.5 倍,Spark SQL 比 Hive 快了 2.7 倍。 表 3 查询执行时间比较 Table 3 Comparison of query runtime TPC⁃H 查询子句 执行时间/ s Hive Impala Spark SQL Q1 115 25 14 Q2 161 146 18 Q3 274 173 83 Q4 152 211 126 Q5 342 326 64 Q6 146 38 12 Q7 678 339 87 Q8 468 331 90 Q9 1104 735 467 Q10 298 163 40 Q11 246 35 30 Q12 126 22 14 Q13 293 142 66 Q14 107 101 21 Q15 277 16 12 Q16 408 267 54 Q17 523 428 254 Q18 449 549 121 Q19 243 226 283 Q20 419 170 91 Q21 681 713 792 Q22 82 67 92 Total 7 592 5 223 2 831 实验结果表明,Impala 在 Q1、Q6、Q12、Q15 上 的性能优于 Hive,查询语句结构如下: SELECT { field1, field12, SUM(field3) as alias1, 第 5 期 何明,等:基于 SQL⁃on⁃Hadoop 查询引擎的日志挖掘及其应用 ·723·
·724· 智能系统学报 第12卷 SUM(field4)as alias2, S ” ORDER AVG(field5)as alias3, AGG3 …, [S2] COUNT(*)as alias4 CROSS TMP2 PRODUCT AGGD FROM TableExpression TMP2 OUTERJOIN WHERE TMPI (AGG2 [field6 由于Hive和Spark SQL均在JVM之上运行,对 SELECT AVG (c_acetbal) CPU和内存的使用依赖于JVM。如图7所示, FROM customer WHERE c_acctbal 0.00 Impala的CPU占用时间要明显少于Hive和Spark and substring (c_phone from 1 for 2)in SQL,这是由于Impala在执行查询过程中,在每个计 ('[1]','[12]','[13]','[14]','[15]',[16]', 算节点上运行只占用一个CPU线程。而Hive和 '[I7]')) Spark SQL在CPU使用上的优化完全依赖于JVM。 and not exists (SELECT FROM orders where 如图8所示,Impala和Hive内存使用率明显小于 o_custkey=c_custkey) Spark SQL,同时使用线程来执行耗费资源较多的 )as custsale Executor Backend进程。 Group BY cntrycode ORDER BY cntrycode; 8230 如图6所示,Q22中作业由3个子查询组成。子 200 150 Hive Impala-Spark SQL 查询S1对customer表进行扫描并将结果保存到临 时表Temp1中;子查询S2对Templ进行聚集操作 AGG1后将结果保存到临时表Temp2中;子查询S3 50 在与表Orders执行聚集操作AGG2后依次与Templ 102030405060708090 时间/s 和Temp2进行关联操作求笛卡尔乘积AGG3然后 排序。 图8集群内存平均使用量 Fig.8 Average cluster memory usage
SUM(field4) as alias2, ……, AVG(field5) as alias3, ……, COUNT(∗) as alias4 } FROM TableExpression WHERE [field6 < = date′yyyy⁃mm⁃dd′⁃interval ′[DELTA]′ day (3)] GROUP BY [field6, field7] ORDER BY [field6, field7] 根据该语句的执行计划,可以判断查询时对整 个表进行了遍历。 对于 Spark SQL 而言,其在大多 数查询上的表现优于 Hive 和 Impala。 由于 Spark 的 接口丰富和 SQL 优势,在执行查询时的速度较快。 4.3 Q22 资源消耗情况 Q22 的查询语句如下: SELECT cntrycode, COUNT ( ∗) as numcust, sum ( c _ acctbal) as totacctbal FROM ( SELECT substring ( c _ phone from 1 for 2 ) as cntrycode, c_acctbal FROM customer WHERE substring(c_phone from 1 for 2) in (′[I1]′,′[I2]′,′[I3]′,′[I4]′,′[ I5]′,′[16]′, ′[I7]′) and c_acctbal > ( SELECT AVG (c_acctbal) FROM customer WHERE c_acctbal > 0.00 and substring (c_phone from 1 for 2) in (′[1]′,′[12]′,′[13]′,′[14]′,′[15]′,′[16]′, ′[I7]′)) and not exists ( SELECT ∗ FROM orders where o_custkey = c_custkey) ) as custsale Group BY cntrycode ORDER BY cntrycode; 如图 6 所示,Q22 中作业由 3 个子查询组成。 子 查询 S1 对 customer 表进行扫描并将结果保存到临 时表 Temp1 中;子查询 S2 对 Temp1 进行聚集操作 AGG1 后将结果保存到临时表 Temp2 中;子查询 S3 在与表 Orders 执行聚集操作 AGG2 后依次与 Temp1 和 Temp2 进行关联操作求笛卡尔乘积 AGG3 然后 排序。 图 6 Implementation of Q22 Fig.6 Q22 的执行过程 实验分析对比了不同的查询方式在运行 Q22 时 集群资源使用情况(如图 7~11 所示),包括 CPU、内 存、网络、磁盘 I/ O。 注意到,在查询 Q22 执行过程 中,Impala 对集群资源的占用是最少的, 其次是 Hive,Spark SQL 占用资源最多。 由于 Spark SQL 是 基于内存计算的框架,所以在内存占用方面和磁盘 读取上更为明显。 图 7 集群平均 CPU 使用率 Fig.7 Average cluster CPU usage 由于 Hive 和 Spark SQL 均在 JVM 之上运行,对 CPU 和 内 存 的 使 用 依 赖 于 JVM。 如 图 7 所 示, Impala 的 CPU 占用时间要明显少于 Hive 和 Spark SQL,这是由于 Impala 在执行查询过程中,在每个计 算节点上运行只占用一个 CPU 线程。 而 Hive 和 Spark SQL 在 CPU 使用上的优化完全依赖于 JVM。 如图 8 所示,Impala 和 Hive 内存使用率明显小于 Spark SQL,同时使用线程来执行耗费资源较多的 Executor Backend 进程。 图 8 集群内存平均使用量 Fig.8 Average cluster memory usage ·724· 智 能 系 统 学 报 第 12 卷
第5期 何明,等:基于SQL-on-Hadoop查询引擎的日志挖掘及其应用 ·725· 在磁盘性能方面,Impala和Hive的磁盘读取速 找到相应的数据:从压缩角度来看,本文采用 率优于Spark SQL。从图9可以看出,Hive和Impala Snappy压缩方式,以减少数据输人量和加快查询速 数据访问量在Sl时相对一致,高于SparkSQL。在 度;从Spark SQL自身特性分析,Spark SQL基于内 S2中,mpala数据访问量较小,Hive次之,SparkSQL 存计算执行速度快,可以操作Hadoop上多样化格式 最高。在图10中,Impala在S1和S2执行结束后将 的数据并进行高效的结构化分析与处理,提供可用 结果写入HDS时,对磁盘的写入速率迅速增加。 性更好的API进行数据分析,更加灵活且易扩展。 1.6.×10 鉴于此,综合考虑以上几个方面并结合应用驱动的 1.4 -Hive --Impala -Spark SQL 大数据分析与计算实际需求,本文选择Spark SQL 作为SQL-on-Hadoop查询系统,以适应快速证券大 0.8 0.6 数据分析与计算场景下的高并发实时查询应用 0.4 0.2 需求。 102030405060708090 时间s 5 实际应用 图9集群磁盘读取总速率 在本文实现基于SQL-on-Hadoop网上交易日志 Fig.9 Cluster disk read speed 实时分析与计算平台上,目前已存储约60TB的网 上交易日志,并开发和移植了实时监控、统计分析、 350 Hive-Impala -Spark SQL 明细查询等实际应用。 翻 5.1实时运维监控 150 对分布在全国各个节点和服务器的状态实时监 100 控并对各种状态进行及时判断和处理,能够对整个 50 邂 102030405060708090 系统的使用状况有宏观的把控。实时运维监控主 时间/s 要包括技术指标监控、业务指标监控和客户分布。 图10集群磁盘写入总速率 1)如图12所示,技术指标监控主要针对实时的 Fig.10 Cluster disk write speed 请求延迟、成功率、系统冗余(带宽流量/在线数)等 在图I1中,Impala在最后一个阶段的网络流量 指标进行监控。数据从千万级别的当日日志数据 迅速增长,主要由于执行过程中的内部表连接产生 中实时提取,从采集到存储达到秒级实现。延迟情 的结果通过网络传输给其他节点导致。 况主要包括登录、委托、查询(资金查询)和转账这4 450 Hive -Impala -Spark SQL 类业务单位时间段内的平均耗时和峰值情况。系 350 统冗余用于指示系统的资源使用情况,包括系统容 量情况和系统带宽使用情况,能实时展示系统当前 冗余,有助于系统管理员及时掌握当前系统的使用 情况。成功率主要包括登录、委托、转账这几类业 2 30 405060708090 务单位时间段内的处理成功率情况。 时间s 图11集群网络流量 Fig.11 Cluster network traffic 综上所述,Spark SQL执行效率相对较快,它的 查询速度比Hive要快2.7倍。然而,当查询总大小 超过内存大小时,mpala则无法查询。Hive处理的 结果准确率较高,处理速度较慢。因此,Hive比较 适用于批处理应用;mpala适合交互式查询,系统的 稳定性还有待提高;Spark SQL能够降低Hive的延 迟,比较适合多并发和流数据处理场景。 0% 10 1112 通过以上的实验分析和比较,从文件格式角度 来讲,本文选择能够更好地适配Spark SQL的 图12技术指标监控 Parquet列式存储格式,以便快速地从HDFS中扫描 Fig.12 Technical index monitor
在磁盘性能方面,Impala 和 Hive 的磁盘读取速 率优于 Spark SQL。 从图 9 可以看出,Hive 和 Impala 数据访问量在 S1 时相对一致,高于 SparkSQL。 在 S2 中,Impala 数据访问量较小,Hive 次之,SparkSQL 最高。 在图 10 中,Impala 在 S1 和 S2 执行结束后将 结果写入 HDFS 时,对磁盘的写入速率迅速增加。 图 9 集群磁盘读取总速率 Fig.9 Cluster disk read speed 图 10 集群磁盘写入总速率 Fig.10 Cluster disk write speed 在图 11 中,Impala 在最后一个阶段的网络流量 迅速增长,主要由于执行过程中的内部表连接产生 的结果通过网络传输给其他节点导致。 图 11 集群网络流量 Fig.11 Cluster network traffic 综上所述,Spark SQL 执行效率相对较快,它的 查询速度比 Hive 要快 2.7 倍。 然而,当查询总大小 超过内存大小时,Impala 则无法查询。 Hive 处理的 结果准确率较高,处理速度较慢。 因此,Hive 比较 适用于批处理应用;Impala 适合交互式查询,系统的 稳定性还有待提高;Spark SQL 能够降低 Hive 的延 迟,比较适合多并发和流数据处理场景。 通过以上的实验分析和比较,从文件格式角度 来讲, 本 文 选 择 能 够 更 好 地 适 配 Spark SQL 的 Parquet 列式存储格式,以便快速地从 HDFS 中扫描 找到 相 应 的 数 据; 从 压 缩 角 度 来 看, 本 文 采 用 Snappy 压缩方式,以减少数据输入量和加快查询速 度;从 Spark SQL 自身特性分析,Spark SQL 基于内 存计算执行速度快,可以操作 Hadoop 上多样化格式 的数据并进行高效的结构化分析与处理,提供可用 性更好的 API 进行数据分析,更加灵活且易扩展。 鉴于此,综合考虑以上几个方面并结合应用驱动的 大数据分析与计算实际需求,本文选择 Spark SQL 作为 SQL⁃on⁃Hadoop 查询系统,以适应快速证券大 数据分析与计算场景下的高并发实时查询应用 需求。 5 实际应用 在本文实现基于 SQL⁃on⁃Hadoop 网上交易日志 实时分析与计算平台上,目前已存储约 60 TB 的网 上交易日志,并开发和移植了实时监控、统计分析、 明细查询等实际应用。 5.1 实时运维监控 对分布在全国各个节点和服务器的状态实时监 控并对各种状态进行及时判断和处理,能够对整个 系统的使用状况有宏观的把控。 实时运维监控主 要包括技术指标监控、业务指标监控和客户分布。 1)如图 12 所示,技术指标监控主要针对实时的 请求延迟、成功率、系统冗余(带宽流量/ 在线数)等 指标进行监控。 数据从千万级别的当日日志数据 中实时提取,从采集到存储达到秒级实现。 延迟情 况主要包括登录、委托、查询(资金查询)和转账这 4 类业务单位时间段内的平均耗时和峰值情况。 系 统冗余用于指示系统的资源使用情况,包括系统容 量情况和系统带宽使用情况,能实时展示系统当前 冗余,有助于系统管理员及时掌握当前系统的使用 情况。 成功率主要包括登录、委托、转账这几类业 务单位时间段内的处理成功率情况。 图 12 技术指标监控 Fig.12 Technical index monitor 第 5 期 何明,等:基于 SQL⁃on⁃Hadoop 查询引擎的日志挖掘及其应用 ·725·
·726 智能系统学报 第12卷 2)如图13所示,业务指标监控主要针对登录情 ×10 1.2 况、转账情况、委托情况和实时在线人数等指标进 。交易活跃用户 ※1.0 在线用户 行监控。可以从千万级别的当日数据中实时观察 0.8 0.6 当前系统的客户登陆数、系统发生的交易数量和转 0.4 账金额等情况,整个过程实现秒级响应。 802 北京河北上海江苏浙江福建山东河南湖北广东深圳四川 图15活跃用户分布 Fig.15 Active user distribution 特色 撤卖 委卖 登录 130013151330134514:0014151430144515:00 操作时间点/时刻 图16行为轨迹分析 图13业务指标监控 Fig.16 Behavioral traces analysis Fig.13 Business index monitor 5.3业务统计查询与分析 3)如图14所示,客户分布主要针对实时在线客 1)统计报表:如图17所示,统计报表是实际日 志处理中的一项重要需求和应用,对计算的性能和 户与委托分布,站点和来源省份的分布。在线分布 从千万级别的数据中指示一段时间客户的登陆来 实时性要求更高。根据具体业务功能需求,按照指 源和委托来源分布。系统通过登陆和委托源P关 定周期完成系统资源、登录、委托和业务监控等统 联全球P分布区域得出客户的分布情况,P分布 计任务。统计报表主要是提供给系统管理员分析 来自10亿级别的P分布数据源。 系统的资源使用情况和系统健康状态,以便做好相 应的措施和规划,同时为管理者决策提供数据支撑 和参考依据。 300 登录总数 ≤ 250 失败总数 200 150 100 0 0 周 周二周三 周四 周五 (a)登录报表 300 100% 图14客户分布 250 80% Fig.14 Customer distribution 200 60% 150 5.2客户交易行为监控 100 40% +委托数 日志分析更有价值的应用在于发现客户的异常 50 一成功总数 20% +一委托成功率 行为。如图15、16所示,通过大数据平台,可以实时 周 周二周三周四 周五 0% 掌握不同区域和营业部活跃用户的分布,为业务部 (b)业务报表 门做绩效考核、精准营销等提供数据支撑。 图17业务报表 Fig.17 Business report
2)如图 13 所示,业务指标监控主要针对登录情 况、转账情况、委托情况和实时在线人数等指标进 行监控。 可以从千万级别的当日数据中实时观察 当前系统的客户登陆数、系统发生的交易数量和转 账金额等情况,整个过程实现秒级响应。 图 13 业务指标监控 Fig.13 Business index monitor 3)如图 14 所示,客户分布主要针对实时在线客 户与委托分布,站点和来源省份的分布。 在线分布 从千万级别的数据中指示一段时间客户的登陆来 源和委托来源分布。 系统通过登陆和委托源 IP 关 联全球 IP 分布区域得出客户的分布情况, IP 分布 来自 10 亿级别的 IP 分布数据源。 图 14 客户分布 Fig.14 Customer distribution 5.2 客户交易行为监控 日志分析更有价值的应用在于发现客户的异常 行为。 如图 15、16 所示,通过大数据平台,可以实时 掌握不同区域和营业部活跃用户的分布,为业务部 门做绩效考核、精准营销等提供数据支撑。 图 15 活跃用户分布 Fig.15 Active user distribution 图 16 行为轨迹分析 Fig.16 Behavioral traces analysis 5.3 业务统计查询与分析 1)统计报表:如图 17 所示,统计报表是实际日 志处理中的一项重要需求和应用,对计算的性能和 实时性要求更高。 根据具体业务功能需求,按照指 定周期完成系统资源、登录、委托和业务监控等统 计任务。 统计报表主要是提供给系统管理员分析 系统的资源使用情况和系统健康状态,以便做好相 应的措施和规划,同时为管理者决策提供数据支撑 和参考依据。 图 17 业务报表 Fig.17 Business report ·726· 智 能 系 统 学 报 第 12 卷