书名:Kafka Streams实战
ISBN:978-7-115-50739-6
本书由人民邮电出版社发行数字版。版权所有,侵权必究。
您购买的人民邮电出版社电子书仅供您个人使用,未经授权,不得以任何方式复制和传播本书内容。
我们愿意相信读者具有这样的良知和觉悟,与我们共同保护知识产权。
如果购买者有侵权行为,我们可能对该用户实施包括但不限于关闭该帐号等维权措施,并可能追究法律责任。
著 [美]小威廉 • P. 贝杰克(William P. Bejeck Jr.)
译 牟大恩
责任编辑 杨海玲
人民邮电出版社出版发行 北京市丰台区成寿寺路11号
邮编 100164 电子邮件 315@ptpress.com.cn
网址 http://www.ptpress.com.cn
读者服务热线:(010)81055410
反盗版热线:(010)81055315
Original English language edition, entitled Kafka Streams in Action: Real-time Apps and Microservices with the Kafka Streams API by William P. Bejeck Jr. published by Manning Publications Co., 209 Bruce Park Avenue, Greenwich, CT 06830. Copyright © 2018 by Manning Publications Co.
Simplified Chinese-language edition copyright © 2018 by Posts & Telecom Press. All rights reserved. 本书中文简体字版由Manning Publications Co.授权人民邮电出版社独家出版。未经出版者书面许可,不得以任何方式复制或抄袭本书内容。
版权所有,侵权必究。
Kafka Streams是Kafka提供的一个用于构建流式处理程序的Java库,它与Storm、Spark等流式处理框架不同,是一个仅依赖于Kafka的Java库,而不是一个流式处理框架。除Kafka之外,Kafka Streams不需要额外的流式处理集群,提供了轻量级、易用的流式处理API。
本书包括4部分,共9章,从基础API到复杂拓扑的高级应用,通过具体示例由浅入深地详细介绍了Kafka Streams基础知识及使用方法。本书的主要内容包含流式处理发展历程和Kafka Streams工作原理的介绍,Kafka基础知识的介绍,使用Kafka Streams实现一个具体流式处理应用程序(包括高级特性),讨论状态存储及其使用方法,讨论表和流的二元性及使用场景,介绍Kafka Streams应用程序的监控及测试方法,介绍使用Kafka Connect将现有数据源集成到Kafka Streams中,使用KSQL进行交互式查询等。
本书适合使用Kafka Streams实现流式处理应用的开发人员阅读。
当我在2015年从领英的流数据架构组离职加入Confluent的时候,与Jay和Neha两人有过一次长时间的交流。当时公司刚刚成立,一切都还是从零起步。Jay问我,接下来想要开展哪些工作,我回答说,我已经在流式存储层面,也就是Kafka Core做了两年多的时间,接下来我的兴趣是在存储上,也就是计算层面寻求一些新的挑战。大数据这个提法叫了这么多年了,可是一直以来我们都致力在数据的大规模(volume)上,比如数据系统的可延展性等;我觉得接下来大数据的趋势会向第二个“V”,也就是快速率(velocity)发展,因为越来越多的人已经不满意批处理带来的时间延迟,他们需要的是就在下一秒,从收集的数据中获得信息,产生效益。
所以,接下来我想做流式数据处理。这个想法和他们一拍即合,从那时候开始我投入到Kafka Streams的开发中来。
从写下第一行Kafka Streams的代码到今天已经快4年的时间了,在这期间我有幸目睹了流式数据处理和流事件驱动架构在硅谷的互联网行业,进而在全世界的各个商业领域中突飞猛进的发展。越来越多的人开始从请求/响应以及批处理的应用编程模式向流式处理转移,越来越多的企业开始思考实时计算如何能够给他们的产品或者服务带来信息收益,而Apache Kafka作为当今流数据平台的事实标准,正在被越来越多的人注意和使用。而Kafka Streams作为Apache Kafka项目下原生的流式处理库,也越来越多地被投入到生产环境中,并且得到了大量社区贡献者的帮助。这对我本人而言,是莫大的喜悦和欣慰。
在今年上半年,我的同事Bill Bejeck完成了这本《Kafka Streams实战》,本书是Bill通过总结自身开发并维护真实生产环境下的Kafka Streams的经验完成的,对于想要学习并掌握Kafka Streams以及流事件驱动架构的读者来说是最好的方式之一。本书的译者牟大恩对Kafka源代码了解颇深,此前已著有《Kafka入门与实践》一书,我相信一定能够准确还原Bill在书中想要带给大家的关于流式数据处理应用实践的思维模式。
祝各位读者在探索Kafka Streams的路上不断有惊喜的发现!
——王国璋(Guozhang Wang)
Confluent流数据处理系统架构师
Apache Kafka PMC,Kafka Streams作者之一
Kafka在0.10版本中引入了Kafka Streams,它是一个轻量级、简单易用的基于Kafka实现的构建流式处理应用程序的Java库。虽然它只是一个Java库,但具备了流式处理的基本功能,同时它利用Kafka的分区特性很容易实现透明的负载均衡以及水平扩展,从而达到高吞吐量。
一年前我在写《Kafka入门与实践》一书时,用了专门一章讲解Kafka Streams,由于那是一本关于Kafka的书,因此对Kafka Streams的讲解并没有面面俱到。巧合的是,本书作为一本关于Kafka Streams的书,也是用专门一章来介绍Kafka。就我个人而言,我觉得这两本书中的内容在某种程度上可以互为补充,大家可以根据自己的偏好选择适合自己的Kafka书籍。
我很荣幸有机会翻译本书。通过翻译本书,无论是Kafka Streams知识本身还是本书作者的写作编排方式,都使我收获颇多。Kafka Streams的诸多设计优点在本书中都有详细介绍,并结合具体示例对相关API进行讲解。本书通过模拟近乎真实的场景,从场景描述开始,逐步对问题进行剖析,然后利用Kafka Streams解决问题。阅读本书,读者不仅能够全面掌握Kafka Streams相关的API,而且能够轻松学会如何使用Kafka Streams解决具体问题。
在翻译本书的过程当中,我理解最深的是,国外的技术书籍不是直接给出解决问题的完整代码,而是在场景描述、问题分析、技术选型等方面给予更多的篇幅,这种方式更能够帮助读者真正深入地掌握相关技术的要领,正所谓“授人以鱼,不如授人以渔”。
在此特别感谢人民邮电出版社的杨海玲编辑及其团队,正是他们一丝不苟、认真专业的工作态度,才使本书得以圆满完成。借此机会,我还要感谢我公司信息技术部副总经理、开发中心总经理王洪涛和部门经理熊友根对我的培养,以及同事给予我的帮助。同时还要感谢我的妻子吴小华,姐姐屈海林、尚立霞,妹妹石俊豪,感谢她们在我翻译本书时对我和我儿子的照顾,正是有了她们的帮助,才使我下班回到家时可以全身心投入到翻译工作中。同时,将本书送给我的宝贝儿子牟经纬,作为宝宝周岁的生日礼物,祝他健康、茁壮成长!
虽然在翻译过程中我力争做到“信、达、雅”,但本书许多概念和术语目前尚无公认的中文翻译,加之译者水平有限,译文中难免有不妥或错误之处,恳请读者批评指正。
牟大恩
2018年10月
牟大恩,武汉大学硕士研究生毕业,曾先后在网易杭州研究院、掌门科技、优酷土豆集团担任高级开发工程师和资深开发工程师职务,目前就职于海通证券总部。有多年的Java开发及系统设计经验,专注于互联网金融及大数据应用相关领域。著有《Kafka 入门与实践》,已提交技术发明专利两项,发表论文一篇。
我相信以实时事件流和流式处理为中心的架构将在未来几年变得无处不在。像Netflix、Uber、Goldman Sachs、Bloomberg等技术先进的公司已经建立了这种大规模运行的大型事件流平台。虽然这是一个大胆的断言,但我认为流式处理和事件驱动架构的出现将会对公司如何使用数据产生与关系数据库同样大的影响。
如果你还处在请求/响应风格的应用程序以及使用关系型数据库的思维模式,那么围绕流式处理的事件思维和构建面向事件驱动的应用程序需要你改变这种思维模式,这就是本书的作用所在。
流式处理需要从命令式思维向事件思维的根本性转变——这种转变使响应式的、事件驱动的、可扩展的、灵活的、实时的应用程序成为可能。在业务中,事件思维为组织提供了实时、上下文敏感的决策和操作。在技术上,事件思维可以产生更多自主的和解耦的软件应用,从而产生伸缩自如和可扩展的系统。
在这两种情况下,最终的好处是更大的敏捷性——在业务以及促进业务的技术方面。将事件思维应用于整个组织是事件驱动架构的基础,而流式处理是实现这种转换的技术。
Kafka Streams是原生的Apache Kafka流式处理库,它用Java语言实现,用于构建事件驱动的应用程序。使用Kafka Streams的应用程序可以对数据流进行复杂转换,这些数据流能够自动容错,透明且弹性地分布在应用程序的实例上。自2016年在Apache Kafka的0.10版本中首次发布以来,许多公司已经将Kafka Streams投入生产环境,这些公司包括P站(Pinterest)、纽约时报(The New York Times)、拉博银行(Rabobank)、连我(LINE)等。
我们使用Kafka Streams和KSQL的目标是使流式处理足够简单,并使流式处理成为构建响应事件的事件驱动应用程序的自然方式,而不仅是处理大数据的一个重量级框架。在我们的模型中,主要实体不是用于数据处理的代码,而是Kafka中的数据流。
这是了解Kafka Streams以及Kafka Streams如何成为事件驱动应用程序的关键推动者的极好方式。我希望你和我一样喜欢本书!
——Neha Narkhede
Confluent联合创始人兼首席技术官
Apache Kafka联合创作者
在我作为软件开发人员期间,我有幸在一些令人兴奋的项目上使用了当前软件。起初我客户端和后端都做,但我发现我更喜欢后端开发,因此我扎根于后端开发。随着时间的推移,我开始从事分布式系统相关的工作,从Hadoop开始(那时还是在1.0版本之前)。快进到一个新项目,我有机会使用了Kafka。我最初的印象是使用Kafka工作起来非常简单,也带来很多的强大功能和灵活性。我发现越来越多的方法将Kafka集成到交付项目数据中。编写生产者和消费者的代码很简单,并且Kafka提升了系统的性能。
然后我学习Kafka Streams相关的内容,我立刻意识到:“我为什么需要另一个从Kafka读取数据的处理集群,难道只是为了回写?”当我查看API时,我找到了我所需的流式处理的一切——连接、映射值、归约以及分组。更重要的是,添加状态的方法比我在此之前使用过的任何方法都要好。
我一直热衷于用一种简单易懂的方式向别人解释概念。当我有机会写关于Kafka Streams的书时,我知道这是一项艰苦的工作,但是很值得。我希望为本书付出的辛勤工作能证明一个事实,那就是Kafka Streams是一个简单但优雅且功能强大的执行流式处理的方法。
本书由异步社区出品,社区(https://www.epubit.com/)为您提供相关资源和后续服务。
本书提供源代码下载,要获得以上配套资源,请在异步社区本书页面中点击,跳转到下载界面,按提示进行操作即可。注意:为保证购书读者的权益,该操作会给出相关提示,要求输入提取码进行验证。
作者和编辑尽最大努力来确保书中内容的准确性,但难免会存在疏漏。欢迎您将发现的问题反馈给我们,帮助我们提升图书的质量。
当您发现错误时,请登录异步社区,按书名搜索,进入本书页面,点击“提交勘误”,输入勘误信息,点击“提交”按钮即可。本书的作者和编辑会对您提交的勘误进行审核,确认并接受后,您将获赠异步社区的100积分。积分可用于在异步社区兑换优惠券、样书或奖品。
我们的联系邮箱是contact@epubit.com.cn。
如果您对本书有任何疑问或建议,请您发邮件给我们,并请在邮件标题中注明本书书名,以便我们更高效地做出反馈。
如果您有兴趣出版图书、录制教学视频,或者参与图书翻译、技术审校等工作,可以发邮件给我们;有意出版图书的作者也可以到异步社区在线提交投稿(直接访问www.epubit.com/ selfpublish/submission即可)。
如果您是学校、培训机构或企业,想批量购买本书或异步社区出版的其他图书,也可以发邮件给我们。
如果您在网上发现有针对异步社区出品图书的各种形式的盗版行为,包括对图书全部或部分内容的非授权传播,请您将怀疑有侵权行为的链接发邮件给我们。您的这一举动是对作者权益的保护,也是我们持续为您提供有价值的内容的动力之源。
“异步社区”是人民邮电出版社旗下IT专业图书社区,致力于出版精品IT技术图书和相关学习产品,为作译者提供优质出版服务。异步社区创办于2015年8月,提供大量精品IT技术图书和电子书,以及高品质技术文章和视频课程。更多详情请访问异步社区官网https://www.epubit.com。
“异步图书”是由异步社区编辑团队策划出版的精品IT专业图书的品牌,依托于人民邮电出版社近30年的计算机图书出版积累和专业编辑团队,相关图书在封面上印有异步图书的LOGO。异步图书的出版领域包括软件开发、大数据、AI、测试、前端、网络技术等。
异步社区
微信服务号
首先,我要感谢我的妻子Beth,感谢她在这一过程中给予我的支持。写一本书是一项耗时的任务,没有她的鼓励,这本书就不会完成。Beth,你太棒了,我很感激你能成为我的妻子。我也要感谢我的孩子们,他们在大多数周末都忍受整天坐在办公室里的爸爸,当他们问我什么时候能写完的时候,我总模糊地回答“很快”。
接下来,我要感谢Kafka Streams的核心开发者Guozhang Wang、Matthias Sax、Damian Guy和Eno Thereska。如果没有他们卓越的洞察力和辛勤的工作,就不会有Kafka Streams,我也就没机会写这个颠覆性的工具。
感谢本书的编辑,Manning出版社的Frances Lefkowitz,她的专业指导和无限的耐心让写书变得很有趣。我还要感谢John Hyaduck提供的准确的技术反馈,以及技术校对者Valentin Crettaz对代码的出色审查。此外,我还要感谢审稿人的辛勤工作和宝贵的反馈,正是他们使本书更高质量地服务于所有读者,这些审稿人是Alexander Koutmos、Bojan Djurkovic、Dylan Scott、Hamish Dickson、James Frohnhofer、Jim Manthely、Jose San Leandro、Kerry Koitzsch、László Hegedüs、Matt Belanger、Michele Adduci、Nicholas Whitehead、Ricardo Jorge Pereira Mano、Robin Coe、Sumant Tambe和Venkata Marrapu。
最后,我要感谢Kafka的所有开发人员,因为他们构建了如此高质量的软件,特别是Jay Kreps、Neha Narkhede和Jun Rao,不仅是因为他们当初开发了Kafka,也因为他们创办了Confluent公司——一个优秀而鼓舞人心的工作场所。
William P. Bejeck Jr.(本名Bill Bejeck),是Kafka的贡献者,在Confluent公司的Kafka Streams团队工作。他已从事软件开发近15年,其中有6年专注于后端开发,特别是处理大量数据,并在数据提炼团队中,使用Kafka来改善下游客户的数据流。他是Getting Started with Google Guava(Packt,2013)的作者和“编码随想”(Random Thoughts on Coding)的博主。
我写本书的目的是教大家如何开始使用Kafka Streams,更确切地说,是教大家总体了解如何进行流式处理。我写这本书的方式是以结对编程的视角,我假想当你在编码和学习API时,我就坐在你旁边。你将从构建一个简单的应用程序开始,在深入研究Kafka Streams时将添加更多的特性。你将会了解到如何对Kafka Streams应用程序进行测试和监控,最后通过开发一个高级Kafka Streams应用程序来整合这些功能。
本书适合任何想要进入流式处理的开发人员。虽然没有严格要求,但是具有分布式编程的知识对理解Kafka和Kafka Streams很有帮助。Kafka本身的知识是有用的,但不是必需的,我将会教你需要知道的内容。经验丰富的Kafka开发人员以及Kafka新手将会学习如何使用Kafka Streams开发引人注目的流式处理应用程序。熟悉序列化之类的Java中、高级开发人员将学习如何使用这些技能来构建Kafka Streams应用程序。本书源代码是用Java 8编写的,大量使用Java 8的lambda语法,因此具有lambda(即使是另一种开发语言)程序的开发经验会很有帮助。
本书有4部分,共9章。第一部分介绍了一个Kafka Streams的心智模型,从宏观上向你展示它是如何工作的。以下章节也为那些想学习或想回顾的人提供了Kafka的基础知识。
第二部分继续讨论Kafka Streams,从基础API开始,一直到更复杂的特性,第二部分各章介绍如下。
KTable
。KStream是事件流,而KTable
是相关事件的流或者更新流。第三部分将从开发Kafka Streams应用程序转到对Kafka Streams的管理知识的讨论。
第四部分是本书的压轴部分,在这里你将深入研究使用Kafka Streams开发高级应用程序。
本书包含了很多源代码的例子,包括书中编号的代码清单所标明的代码,以及内联在普通文本中的代码。在这两种情况下,源代码都采用固定宽度字体的格式,以便与普通文本区分开。
在很多情况下,原始源代码已经被重新格式化了。我们增加了断行以及重新缩进,以适应书中可用的页面空间。在极少数情况下,甚至空间还不够,代码清单中包括续行标识()。此外,当在文本中描述代码时,源代码中的注释常常从代码清单中删除。代码清单中附带的许多代码注释,突出显示重要的概念。
最后,需要注意的是:许多代码示例并不是独立存在的,它们只是包含当前讨论的最相关部分代码的节选。你在本书附带的源代码中将会找到所有示例的完整代码。
本书的源代码是使用Gradle工具构建的一个包括所有代码的项目。你可以使用合适的命令将项目导入IntelliJ或Eclipse中。在附带的README.md文件中可以找到使用和导航源代码的完整说明。
购买本书可以免费访问一个由Manning出版社运营的私人网络论坛,可以在论坛上对本书进行评论、咨询技术问题、接受本书作者或者其他用户的帮助。要访问该论坛,请访问Manning出版社官方网站本书页面。你还可以从Manning出版社官方网站了解更多关于Manning论坛及其行为规则。
Manning的论坛承诺为我们的读者提供一个可以在读者之间,以及读者与作者之间进行有意义对话的地方,但并不承诺作者的参与程度,作者对论坛的贡献是自愿的(并没有报酬)。建议你试着问他一些有挑战性的问题,以免他对你的问题没有兴趣!只要本书在印刷中,论坛和之前所讨论的问题归档就会从出版社的网站上获得。
本书封面上的图片描述的是“18世纪一位土耳其绅士的习惯”,这幅插图来自Thomas Jefferys的A Collection of the Dresses of Different Nations, Ancient and Modern(共4卷),于1757年和1772年之间出版于伦敦。扉页上写着:这些是手工着色的铜版雕刻品,用阿拉伯胶加深了颜色。Thomas Jefferys(1719—1771)被称为“乔治三世的地理学家”。他是一位英国制图师,是当时主要的地图供应商。他为政府和其他官方机构雕刻和印刷地图,制作了各种商业地图和地图集,尤其是北美地区的。作为一名地图制作者,他在所调查和绘制的地区激起了人们对当地服饰习俗的兴趣,这些都在这本图集中得到了很好的展示。向往远方、为快乐而旅行,在18世纪后期还是相对较新的现象,类似于这套服饰集的书非常受欢迎,把旅行者和神游的旅行者介绍给其他国家的居民。Jefferys卷宗中绘画的多样性生动地说明了200多年前世界各国的独特性和个性。从那时起,着装样式已经发生了变化,各个国家和地区当时非常丰富的着装多样性也逐渐消失。现在仅依靠衣着很难把一个大陆的居民和另一个大陆的居民区分开来。或许我们已经用文化和视觉上的多样性换取了个人生活的多样化——当然是更为丰富和有趣的文化和艺术生活。
在一个很难将计算机书籍区分开的时代,Manning以两个世纪以前丰富多样的地区生活为基础,通过以Jefferys的图片作为书籍封面来庆祝计算机行业的创造性和首创精神。
在本书第一部分,我们将论述大数据时代的起源,以及它是如何从最初为了满足处理大量数据的需求,到最终发展成为流式处理——当数据到达时立即被处理。本部分还会讨论什么是Kafka Streams,并向大家展示一个没有任何代码的“心智模型” [1](mental model)是如何工作的,以便大家可以着眼于全局。我们还将简要介绍Kafka,让大家快速了解如何使用它。
[1] 心智模型(mental model)又叫心智模式。心智模型的理论是基于一个试图对某事做出合理解释的个人会发展可行的方法的假设,在有限的领域知识和有限的信息处理能力上,产生合理的解释。心智模型是对思维的高级建构,心智模型表征了主观的知识。通过不同的理解解释了心智模型的概念、特性、功用。(引自百度百科)——译者注
本章主要内容
在本书中,你将学习如何使用Kafka Streams来解决流式应用程序的需求问题。从基本的提取、转换、加载(ETL)到复杂的有状态转换再到连接记录,将会覆盖Kafka Streams的各组件,这样你就能够应对流应用程序中遇到的这些挑战。
在深入研究Kafka Streams之前,我们将简要地探索一下大数据处理的历史。当我们在确定问题和解决方案时,将会清楚地看到对Kafka和Kafka Streams的需求是如何演变的。让我们看看大数据时代是如何开始的,是什么导致了应用Kafka Streams的解决方案。
随着大数据框架和技术的出现,现代编程语言出现了爆炸式增长。当然,客户端开发经历了自身的转变,移动设备应用程序的数量也出现了爆炸式增长。但是,无论移动设备市场有多大,客户端技术如何革新,有一个不变的事实:我们每天需要处理的数据越来越多。随着数据量的增长,分析和利用这些数据带来的好处的需求也在同时增长。
然而,有能力批量处理大量数据(批处理)还不够。越来越多的组织机构发现它们需要在数据到达时就要对其进行处理(流式处理)。Kafka Streams提供一种前沿的流式处理方式,它是一个对记录的每个事件进行处理的库。基于每个事件进行处理意味着每个单独的数据记录一到达就能够被及时处理,并不需要将数据分成小批量(微批处理)。
注意
当数据到达时即对其进行处理的需求变得越来越明显时,一种新的策略应运而生——微批处理。顾名思义,所谓微批处理也是批处理,只不过数据量更小。通过减少批尺寸,微批处理有时可以更快地产生结果;但是微批处理仍然是批处理,尽管间隔时间更短。它并不能真正做到对每个事件进行处理。
20世纪90年代中期,互联网才开始真正影响人们的日常生活。从那时起,网络提供的互联互通给我们带来了前所未有的信息访问以及与世界任何地方的任何人即时沟通的能力。在所有这些互联互通访问过程中,一个意想不到的副产品出现了——大量数据的生成。
但在我看来,大数据时代正式始于Sergey Brin和Larry Page创立了谷歌公司的1998年。Sergey Brin和Larry Page开发了一种新的网页搜索排名方法——PageRank算法。在一个很高的层面上来说,PageRank算法通过计算链接到网站的数量和质量来对该网站进行评级。该算法假定一个Web页面越重要或越相关,就会有越多的站点引用它。
图1-1提供了PageRank算法的图形化表示。
图1-1 PageRank算法应用。圆圈代表网站,其中较大的圆圈表示有更多的其他站点链接到它
虽然图1-1是对PageRank算法的极度简化,但展示出该算法实现原理的基本思想。
当时,PageRank是一种革命性的方法。以前,Web上的搜索更倾向于使用布尔逻辑来返回结果。如果一个网站包含了你想要搜索的所有或大部分词条,那么这个网站就会出现在搜索结果中,而不管内容的质量如何。但在所有互联网内容上运行PageRank算法需要一种新的方法——传统的数据处理方法耗时太长。谷歌公司要生存和成长,就需要快速索引所有的内容(“快速”是一个相对的术语),并向公众展示高质量的结果。
谷歌公司为处理所有这些数据开发了另一种革命性的方法——MapReduce范式。MapReduce不仅使谷歌能够做一个公司需要的工作,而且无意中还催生了一个全新的计算产业。
在谷歌公司开发MapReduce时,map和reduce函数并不是什么新概念。谷歌方法的独特之处在于在许多机器上大规模地应用这些简单的概念。
MapReduce的核心在于函数式编程。一个map函数接受一些输入,并在不改变原始值的情况下将这些输入映射到其他对象。下面是一个用Java 8实现的一个简单实例,该实例将一个LocalDate
对象映射为一个字符串消息,而原始的LocalDate
对象则不会被修改。代码片段如下:
Function<LocalDate, String> addDate =
(date) -> "The Day of the week is " + date.getDayOfWeek();
尽管简单,但这个简短的例子足以展示出了一个映射函数是做什么的。
但reduce函数接受一组参数,并将这些参数归约成一个值或者归约后至少参数规模更小。取一组数字并将它们加在一起是一个reduce操作的很好例子。
对一组数字执行归约,首先要初始化一个起始值,本例将起始值设置为0(加法的恒等值)。下一步是将起始值与数字列表中的第一个数相加,然后将第一步相加的结果与列表中的第二个数相加。函数重复执行这个过程,直到列表中最后一个数字,产生一个数值。
下面是归约处理一个包括整型数字1、2、3的列表的步骤,代码片段如下:
0 + 1 = 1 ←--- 将起始值与第一个数相加
1 + 2 = 3 ←--- 将第一步计算结果与列表中的第二个数相加
3 + 3 = 6 ←--- 将第二步的相加之和与列表中第三个数也就是列表中最后一个数相加
正如所看到的,reduce函数将结果集合并在一起形成更小规模的结果集。与映射函数类似,reduce函数也不会修改原始数字列表。
现在,让我们来看看如何使用Java 8的lambda表达式来实现这样一个简单的reduce函数,代码片段如下:
List<Integer> numbers = Arrays.asList(1, 2, 3);
int sum = numbers.reduce(0, (i, j) -> i + j );
由于本书的主要话题不是讲解MapReduce,因此在这里对其背景不做探讨。但是,可以看到MapReduce范式(后来在Hadoop中实现了,最初的开源版本基于谷歌的MapReduce白皮书)引入的一些重要概念在Kafka Streams中依然适用。
接下来,将对这些概念做概括性的论述。需要注意的是,这些概念的介绍将会穿插在整本书当中,因此在下文它们会再次被提及。
对一台机器来说,处理5 TB(5000 GB)的数据可能是非常困难的。但是,如果将这些数据按每台服务器易处理的数据量进行分割,让多台机器去处理,那么数据量巨大的问题就会被最小化。表1-1清晰地说明了这一点。
表1-1 如何分割5 TB数据以提高数据处理吞吐量
服务器数量 |
每台服务器处理的数据量 |
---|---|
10 |
500 GB |
100 |
50 GB |
1000 |
5 GB |
5000 |
1 GB |
从表1-1可知,一开始可能需要处理大量的数据,但是通过将负载分散到更多的机器上,数据的处理就不再是一个问题了。表1-1中的最后一行中1 GB的数据由一台笔记本电脑就可以很轻松地处理。
这是理解关于MapReduce的第一个关键概念:通过在计算机集群中分散负载,可以将数据的巨大规模转换为可管理的数量。
键/值对是一个具有强大含义的简单数据结构。在上一节中,我们看到了将大量数据散布到计算机集群上的价值。分散数据解决了数据处理的问题,但现在的问题是如何将分布在不同机器上的数据汇集起来。
要重新组合分布式数据,可以使用键/值对的键来对数据进行分区。术语“分区”意味着分组,但并不是指使用完全相同的键,而是使用具有相同散列码的键进行分组。要按键将数据分割成分区,可以使用以下公式:
int partition = key.hashCode() % numberOfPartitions;
图1-2展示了如何应用散列函数来获取存储在不同服务器上的奥运赛事的结果,并将其分组到不同赛事的分区上。所有的数据都以键/值对存储,在图1-2中,键是赛事的名称,值是单个运动员的比赛结果。
图1-2 按键对分区上的记录进行分组。尽管记录开始在不同的服务器上,但它们最终会在适当的分区中
分区是一个重要概念,在后面的章节我们将会看到详细的例子。
谷歌MapReduce的另一个重要组件是谷歌文件系统(Google File System,GFS)。正如Hadoop是MapReduce的开源实现,Hadoop文件系统(Hadoop File System,HDFS)是GFS的开源实现。
从较高层次来看,GFS和HDFS都将数据分割成很多个数据块,并将这些数据块分布到集群中。但是GFS或HDFS的精髓部分在于如何处理服务器和硬盘故障,该处理框架不是试图阻止失败,而是通过跨集群复制数据块来接受失败(默认复制因子是3)。
通过复制不同服务器上的数据块,就不必再担心磁盘故障甚至整个服务器故障而导致停产。数据复制对于分布式应用提供容错能力至关重要,而容错能力对于分布式应用的成功是必不可少的。稍后将看到分区和复制是如何在Kafka Streams中工作的。
Hadoop迅速在计算领域流行起来,它允许在使用商业硬件时能够处理巨大数量的数据并具有容错性(节约成本)。但是Hadoop/MapReduce是面向批处理的,面向批处理意味着先收集大量数据,然后处理它,再将处理后的输出结果进行存储以便以后使用。批处理非常适合类似PageRank之类的场景,因为你无法通过实时观察用户的点击来判断整个互联网上哪些资源是有价值的。
但是企业也越来越面临着要求他们更快地响应重要问题的压力,这些问题诸如:
显然,需要另一种解决方案,而这种解决方案就是流式处理。
虽然流式处理有不同的定义,但在本书,我们将流式处理定义为当数据到达系统时就被处理。进一步提炼流式处理的定义为:流式处理是利用连续计算来处理无限数据流的能力,因为数据流是流动的,所以无须收集或存储数据以对其进行操作。
图1-3所示的这个简单的图表示一个数据流,线上的每个圆圈代表一个时间点的数据。数据不断地流动,因为在流式处理中的数据是无限的。
图1-3 这个弹珠图是流式处理的一个简单表示。图中每个圆圈代表某一特定时间点的某些信息或发生的事件,事件的数量是无限的,并且不断地从左向右移动
那么,谁需要使用流式处理呢?需要从可观察到的事件中得到快速反馈的任何人都需要用到流式处理。让我们来看一些例子。
和任何技术解决方案一样,流式处理也不是适用于所有情况的解决方案。对传入数据快速响应或报告的需求是流式处理的一个很好的用例。下面是几个例子。
然而,流式处理不是所有问题领域的通用解决方案。例如,为了有效地预测未来的行为,需要使用大量的数据来消除异常并识别模式和趋势。这里的重点是随着时间的推移分析数据,而不仅仅是最新的数据。
这里要记住一个要点:如果数据到达时需要被立即报告或处理,那么流式处理是一个不错的选择;如果需要对数据进行深入分析,或是为了编制一个大的数据仓库以备后期分析,那么流式处理方式可能就不合适了。现在来看一个流式处理的具体例子。
让我们从应用一般的流式处理方法对零售处理的示例开始。然后,我们将了解如何使用Kafka Streams来实现流式处理应用程序。
假定ane Doe在下班回家的路上想起需要一个牙膏。于是她在回家路上的一家ZMart超市停下来,进去拿了一盒牙膏,然后径直到收银台去支付她购买的物品。收银员问Jane是否是ZClub的会员,然后收银员扫描Jane的会员卡,这里Jane的会员信息就是购买事物中的一部分。
当总价算好之后,Jane将信用卡递给收银员,收银员刷卡并递给Jane收据。当Jane走出商店时,她查看她的电子邮件,收到了一条来自ZMart超市的信息,感谢Jane的惠顾,并附上各种优惠券在Jane下次光顾时可享受折扣。
这个交易是客户不会多加考虑的常见事件,然而你会意识到这意味着什么:丰富的信息可以帮助ZMart更高效地经营,更好地服务客户。让我们向前追溯一下,看看如何使这种交易成为现实。
假设你是ZMart流数据团队的开发负责人,ZMart是一个大型连锁零售店,分布在全国各地。ZMart经营得很好,每年的销售总额都在10亿美元以上。你想要从公司的交易数据中挖掘数据,以提高业务效率。由于你知道要处理的来自ZMart销售数据的数据量非常大,因此无论你选择哪一种技术去实现,这种技术都需要能够快速和大规模处理这些大量数据。
你最终选择流式处理技术,因为当有交易发生时可以利用业务决策和机会,而无须先收集数据然后等几个小时之后再做决策。你召集管理层和你团队的成员商讨并提出了保证流式处理方案成功所必需的4个基本要求。
这些需求本身已经足够明确了,但是对于Jane Doe这样的单笔购买交易,如何实现这些需求呢?
查看前面的需求,我们可以很快地把它们重塑为一个有向无环图(directed acyclic graph,DAG)。客户在注册地完成的交易点是整个图的源节点,那么需求就变成了主源节点的子节点,如图1-4所示。
图1-4 流应用程序的业务需求以有向无环图的形式呈现,图中每个顶点表示一个需求,边表示通过图表的数据流
接下来,我们将介绍如何将购买交易映射到需求图。
在本节中,我们将遍历购买的每一个步骤,并从一个较高层次上了解如何与图1-4中的需求图相关联。在下一节中,我们将介绍如何将Kafka Streams应用到这个过程中。
图的源节点是应用程序消费购买交易数据的地方,如图1-5所示。该节点是将流经该图的销售交易信息的来源。
图1-5 销售交易图的简单开始,该节点是流经该图的原始销售交易信息的来源
图中源节点的子节点是信用卡屏蔽操作所发生的地方,如图1-6所示,它在图中用来表示业务需求的第一个顶点或节点,也是从源节点接收原始销售数据的唯一节点,有效地使该节点成为连接到它的所有其他节点的源。
图1-6 图中的第一节点代表业务需求。这个节点负责屏蔽信用卡号码,并且是唯一一个从源节点接收原始销售数据的节点,有效地使该节点成为连接到它的所有其他节点的源
对于信用卡号屏蔽操作,先复制信用卡号码数据,然后将信用卡号码除最后4位数字之外的其他数字都转化为“x”字符。数据流经图中其余节点将信用卡号码转化为“xxxx-xxxx-xxxx-1122”格式的数据。
模式节点(如图1-7所示)抽取相关信息以确定客户在全国哪个地方购买产品。模式节点不是将数据进行复制,而是从数据中检索出购买相关的物品、日期以及邮政编码,并创建一个包含这些字段的新对象。
图1-7 图中添加了模式节点,该节点从屏蔽节点消费购买信息,并将这些信息转化为一条记录,该记录包括客户何时购买物品以及客户最终完成交易地点对应的邮政编码
这个流程中的下一个子节点是奖励累加器,如图1-8所示。ZMart有一个客户奖励计划,给在ZMart门店购买物品的客户积分。这个节点的职责就是从购买信息中抽取客户的ID和花费的金额,并创建一个包括这两个字段的新对象。
图1-8 奖励节点负责从屏蔽节点消费销售记录,并将其转换为包含购买总额和客户ID的记录
最后的子节点将购买数据写入NoSQL数据存储中以供进一步分析,如图1-9所示。
图1-9 存储节点也使用来自屏蔽节点的记录。这些记录不会转换为任何其他格式, 而是存储在NoSQL数据存储中,以便后期进行专门分析
现在,我们已通过ZMart的需求图跟踪示例购买交易,让我们看看如何使用Kafka Streams将此图转换为函数流式应用程序。
Kafka Streams是一个允许对记录的每个事件执行处理的库,可以在数据到达时使用它来处理数据,而不需要在微批中对数据进行分组。可以在每条记录可用时立即对其进行处理。
ZMart的大多数需求的目标都是对时间敏感的,要能够尽可能快地对数据进行处理,最好能够在事件发生的时候就收集数据。此外,由于ZMart在全国有很多的分店,为了对数据进行分析就需要将所有的交易记录汇集成一个单个流或数据流。基于这些原因,Kafka Streams是非常合适的,当数据到达时用户就可以对其进行处理,并且提供所需的低延迟处理。
在Kafka Streams中,定义了一个处理节点的拓扑结构(我们交替使用处理器和节点这两个术语)。一个或多个节点将Kafka的一个或多个主题作为数据源,还可以添加其他节点,这些节点被认为是子节点(如果对Kafka主题不熟悉,不用担心,第2章中会详细解释)。每个子节点可以定义其他子节点。每个处理节点执行分配给它的任务,然后将记录向前发送给它的每个子节点。这个执行过程以此类推,每个处理节点处理完后就将数据继续发送给它的所有子节点,直到每个子节点都执行了各自的功能。
这个过程听起来熟悉吗?应该熟悉,因为我们做过与其类似的操作,即将ZMart的业务需求转换为处理节点的图。遍历图就是Kafka Streams的工作方式,该图是一个有向无环图(DAG)或处理节点的拓扑结构。
从源节点或父节点开始,该节点有一个或多个子节点,数据总是从父节点流向子节点,永远不会从子节点流向父节点。依此类推,每个子节点依次可以定义自己的子节点。
记录以深度优先的方式流过图表。这种深度优先的方法具有重要的意义:每条记录(键/值对)都被整个图完整地处理完才接受另一条记录进行处理。由于每条记录都以深度优先的方式在整个有向无环图中被处理,因此无须在Kafka Streams中内置背压。
定义
虽然背压(backpressure)有不同的定义,但这里将背压定义为通过缓冲或使用阻塞机制来限制数据流的需要。当源产生数据比接收器能够接收和处理这个数据的速度更快时,背压是必需的。
通过连接或链接多个处理器,可以快速构建复杂的处理逻辑,同时每个组件保持相对简单。正是在处理器这种组合中,Kafka Streams的强大和复杂性才开始发挥作用。
定义
拓扑(topology)是一种将整个系统的各部分进行整理并将它们连接起来的方式。当我们说Kafka Streams有一个拓扑结构时,指的是通过在一个或多个处理器中运行来转换数据。
Kafka Streams与Kafka
正如我们可能已从名字中猜到的一样,Kafka Streams是运行在Kafka之上的,在这个介绍性章节中Kafka相关知识并不是必需的,因为我们更多地从概念上关注Kafka Streams是如何工作的。虽然可能会提到一些Kafka特定的术语,但在大多数情况下,我们关注的是Kafka Streams流式处理方面。
对于新接触Kafka或不熟悉Kafka的读者,第2章将会讲解需要了解的相关知识。了解Kafka的知识是有效使用Kafka Streams的基础。
我们再构建一张处理图,不过这次我们将创建一个Kafka Streams程序。提醒一下,图1-4展示了ZMart业务需求的需求图。请记住,顶点是处理数据的处理节点,而边显示数据流。
虽然在构建新图时,将会创建一个Kafka Streams程序,这依然是一个高层次的方式,将忽略一些细节。在本书后面部分,当我们看到实际代码时会有更多的细节。
一旦Kafka Streams程序开始消费消息记录,就会将原始记录转换为Purchase
对象。以下信息将构成一个Purchase
对象:
设计任何Kafka Streams程序的第一步都是为流建立一个源。源可以是以下任何一种:
对于本例,源将是一个名为“transactions”的单个主题。如果不熟悉Kafka术语,记住,我们将会在第2章中对这些术语进行解释。
需要注意的是,对于Kafka,Kafka Streams程序看起来像任何其他消费者和生产者的组合。任何数量的应用程序都可以与流式程序一起订阅同一个主题。图1-10表示拓扑中的源节点。
图1-10 源节点:一个Kafka主题
现在已定义好了一个源节点,就可以开始创建一些处理数据的处理器。第一个目标就是屏蔽购买记录中所记录的信用卡号码。第一个处理器用来转换信用卡号码,例如,将1234-5678- 9123-2233的信用卡号码转换为xxxx-xxxx-xxxx-2233。
由KStream.mapValues
方法将执行如图1-11所展示的屏蔽操作,它将返回一个新的KStream
实例,其值由指定的ValueMapper
进行屏蔽处理。这个特别的KStream
实例将是我们定义的其他任何处理器的父处理器。
图1-11 屏蔽处理器是主源节点的一个子节点。该处理器接收所有的原始销售交易记录,然后发出将信用卡号码进行屏蔽后的新记录
每次通过一个转换方法创建一个新的KStream
实例,其本质是创建了一个新的处理器,这个新处理器会连接到已创建好的其他处理器。通过组合的处理器,我们可以使用Kafka Streams优雅地创建复杂的数据流。
需要特别注意的是,通过调用一个方法返回一个新的KStream
实例不会导致原实例
停止消费消息。一个转换方法创建一个新的处理器,并添加到现有的处理器拓扑中。然后用更新后的拓扑作为一个参数来创建新的KStream
实例,新的KStream
实例从创建它的节点处开始接受消息。
你很可能会构建新的KStream
实例来执行额外的转换,为其原来的目的而保留原来的流。当我们定义第二个和第三个处理器时,你就会看到这样的例子。
虽然可以让ValueMapper
将传入的值转换为一个完全新的类型,但在本例它只返回一个更新后的Purchase
对象的副本。使用映射器更新一个对象是在我们在KStream
中经常看到的一种模式。
现在你应该清楚地了解了如何构建处理器管道来转换和输出数据。
下一个要创建的是可以捕获用于确定该国不同地区购买模式所需信息的处理器(如图1-12所示)。为此,将向我们创建的第一个处理器(KStream
)添加一个子处理节点。第一个处理器产生的是对信用卡号码做了屏蔽的Purchase
对象。
图1-12 购买模式处理器获取Purchase
对象并将该对象转换为PurchasePattern
对象,PurchasePattern
对象包括购买的物品及交易发生点的邮政编码两个属性。 一个新处理器从模式处理器获取记录并把它们输出写入Kafka主题中
购买模式处理器从其父节点接收一个Purchase
对象,并将该对象映射成一个新的PurchasePattern
对象。映射过程提取实际购买的物品(如牙膏)和买入时使用的邮政编码,用这些信息创建一个PurchasePattern
对象,第3章将详细讨论映射处理的过程。
接下来,购买模式处理器添加一个子处理器节点来接收新PurchasePattern
对象,并将其写入一个名为patterns
的Kafka主题中。当被写入Kafka主题时,PurchasePattern
对象被转换成某种形式的可转换的数据。然后,其他应用程序可以消费这些信息,并使用这些信息来确定给定区域的库存水平和购买趋势。
第三个处理器将为客户奖励程序提取信息(如图1-13所示)。这个处理器也是原始处理器的子节点,它接收Purchase
对象,然后将该对象映射为另一种类型:RewardAccumulator
对象。
图1-13 客户奖励处理器负责将Purchase
对象转换成RewardAccumulator
对象,该对象包括客户ID,交易日期以及交易金额。一个子处理器将Rewards对象写入另一个Kafka主题中
客户奖励处理器也添加了一个子处理节点,用于将RewardAccumulator
对象输出写入Kafka的rewards
主题中。其他程序通过从rewards
主题中消费记录来确定ZMart的客户得到何种奖励,例如Jane Doe从购买情景中收到的电子邮件。
最后一个处理器如图1-14所示,它是屏蔽处理器节点的第3个子节点,负责将整个已经过屏蔽处理的购买记录输出写到一个叫作purchases的
主题中。该主题用于为NoSQL存储应用程序提供数据,当有记录写入时就会被消费,这些记录将用于以后做特定的分析。
图1-14 最后一个处理器负责将整个Purchase
对象写入另一个Kafka主题中。订阅该主题的消费者将结果存储在NoSQL存储(如MongoDB)中
正如所看到的,第一个处理器用来屏蔽信用卡号码,并给其他三个处理器提供数据,其中两个处理器将进一步提炼和转换数据,另一个将屏蔽结果写入Kafka主题中,以进一步提供给其他消费者使用。通过使用Kafka Streams,可以构建一个强大的节点连接的处理图,以对传入的数据执行流式处理。
要理解Kafka Streams,应该先了解一些Kafka的知识。在第2章中我们将为不了解Kafka的读者介绍以下基本内容。
如果你对Kafka已经非常熟悉了,可以直接跳到第3章,在第3章中我们将基于本章讨论的示例构建一个Kafka Streams应用程序。
本章主要内容
虽然这是一本关于Kafka Streams的书,但是要研究Kafka Streams不可能不探讨Kafka,毕竟,Kafka Streams是一个运行在Kafka之上的库。
Kafka Streams设计得非常好,因此即使具有很少或者零Kafka经验的人都可以启动和运行Kafka Streams。但是,你所取得的进步和对Kafka调优的能力将是有限的。掌握Kafka的基础知识对有效使用Kafka Streams来说是必要的。
注意
本章面向的读者是对Kafka Streams有兴趣,但对Kafka本身具有很少或零经验的开发者。如果读者对Kafka具备很好的应用知识,那么就可以跳过本章,直接阅读第3章。
Kafka是一个很大的话题,很难通过一章进行完整论述。本章将会覆盖足以使读者很好地理解Kafka的工作原理和一些核心配置项设置的必备知识。要想更深入了解Kafka的知识,请看Dylan Scott写的Kafka in Action(Manning,2018)
如今,各组织都在研究数据。互联网公司、金融企业以及大型零售商现在比以往任何时候都更善于利用这些数据。通过利用数据,既能更好地服务于客户,又能找到更有效的经营方式(我们要对这种情况持积极态度,并且在看待客户数据时要从好的意图出发)。
让我们考虑一下在ZMart数据管理解决方案中的各种需求。
在第1章中,已介绍过大型零售公司ZMart。那时,ZMart需要一个流式处理平台来利用公司的销售数据,以便更好地提供客户服务并提升销售总额。但在那时的6个月前,ZMart期待了解它的数据情况,ZMart最初有一个定制的非常有效的解决方案,但是很快就发现该解决方案变得难以驾驭了,接下来将看到其原因。
最初,ZMart是一家小公司,零售销售数据从各分离的应用程序流入系统。这种方法起初效果还是不错的,但随着时间的推移,显然需要一种新的方法。一个部门的销售数据不再只是该部门所感兴趣的,公司的其他部门也可能感兴趣,并且不同的部门对数据的重要性和数据结构都有不同的需求。图2-1展示了ZMart原始的数据平台。
图2-1 ZMart原始数据架构简单,足够使每个信息源流入和流出信息
随着时间的推移,ZMart通过收购其他公司以及扩大其现有商店的产品而持续增长。随着应用程序的添加,应用程序之间的连接变得更加复杂,由最初的少量的应用程序之间的通信演变成了一堆名副其实的意大利面条。如图2-2所示,即使只有3个应用程序,连接的数量也很烦琐且令人困惑。可以看到,随着时间的推移,添加新的应用程序将使这种数据架构变得难以管理。
图2-2 随着时间的推移,越来越多的应用程序被添加进来,连接所有这些信息源变得非常复杂
一个解决ZMart问题的方案是创建一个接收进程来控制所有的交易数据,即建立一个交易数据中心。这个交易数据中心应该是无状态的,它以一种方式接受交易数据并存储,这种方式是任何消费应用程序可以根据自己的需要从数据中心提取信息。对哪些数据的追踪取决于消费应用程序,交易数据中心只知道需要将交易数据保存多久,以及在什么时候切分或删除这些数据。
也许你还没有猜到,我们有Kafka完美的用例。Kafka是一个具有容错能力、健壮的发布/订阅系统。一个Kafka节点被称为一个代理,多个Kafka服务器组成一个集群。Kafka将生产者写入的消息存储在Kafka的主题之中,消费者订阅Kafka主题,与Kafka进行通信以查看订阅的主题是否有可用的消息。图 2-3 展示了如何将Kafka想象为销售交易数据 中心。
现在大家已经对Kafka的概况有了大致的了解,在下面的几节中将进行仔细研究。
图2-3 使用Kafka作为销售交易中心显著简化了ZMart数据架构,现在每台服务器不需要知道其他的信息来源,它们只需要知道如何从Kafka读取数据和将数据写入Kafka
在接下来的几个小节中,我们将介绍Kafka体系架构的关键部分以及Kafka的工作原理。如果想尽早地体验运行Kafka,可以直接跳到2.6节,安装和运行Kafka。等Kafka安装之后,再回到这里来继续学习Kafka。
在前一节中,我曾说过Kafka是一个发布/订阅系统,但更精确地说法是Kafka充当了消息代理。代理是一个中介,将进行互利交换或交易但不一定相互了解的两部分汇聚在一起。图2-4展示了ZMart数据架构的演化。生产者和消费者被添加到图中以展示各单独部分如何与Kafka进行通信,它们之间不会直接进行通信。
Kafka将消息存储在主题中,并从主题检索消息。消息的生产者和消费者之间不会直接连接。此外,Kafka并不会保持有关生产者和消费者的任何状态,它仅作为一个消息交换中心。
Kafka主题底层的技术是日志,它是Kafka追加输入记录的文件。为了帮助管理进入主题的消息负载,Kafka使用分区。在第1章我们讨论了分区,大家可以回忆一下,分区的一个应用是将位于不同服务器上的数据汇集到同一台服务器上,稍后我们将详细讨论分区。
图2-4 Kafka是一个消息代理,生产者将消息发送到Kafka,这些消息被存储,并通过主题订阅的方式提供给消费者
Kafka底层的机制就是日志。大多数软件工程师都对日志很熟悉,日志用于记录应用程序正在做什么。如果在应用程序中出现性能问题或者错误,首先检查的是应用程序的日志,但这是另一种类型的日志。在Kafka(或者其他分布式系统)的上下文中,日志是“一种只能追加的,完全按照时间顺序排列的记录序列”[1]。
图2-5展示了日志的样子,当记录到达时,应用程序将它们追加到日志的末尾。记录有一个隐含的时间顺序,尽管有可能不是与每条记录相关联的时间戳,因为最早的记录在左边,后达到的记录在右端。
日志是具有强大含义的简单数据抽象,如果记录按时间有序,解决冲突或确定将哪个更新应用到不同的机器就变得明确了:最新记录获胜。
Kafka中的主题是按主题名称分隔的日志,几乎可以将主题视为有标签的日志。如果日志在一个集群中有多个副本,那么当一台服务器宕机后,就能够很容易使服务器恢复正常:只需重放日志文件。从故障中恢复的能力正是分布式提交日志具有的。
图2-5 日志是追加传入记录的文件——每条新到达的记录都被立即放在接收到的最后一条记录之后,这个过程按时间顺序对记录进行排序
我们只触及了关于分布式应用程序和数据一致性的深入话题的表面,但到目前为止所讲解的知识应该能让读者对Kafka涉及的内容有了一个基本的了解。
当安装Kafka时,其中一个配置项是log.dir
,该配置项用来指定Kafka存储日志数据的路径。每个主题都映射到指定日志路径下的一个子目录。子目录数与主题对应的分区数相同,目录名格式为“主题名_分区编号”(将在下一节介绍分区)。每个目录里面存放的都是用于追加传入消息的日志文件,一旦日志文件达到某个规模(磁盘上的记录总数或者记录的大小),或者消息的时间戳间的时间间隔达到了所配置的时间间隔时,日志文件就会被切分,传入的消息将会被追加到一个新的日志文件中(如图2-6所示)。
图2-6 logs目录是消息存储的根目录,/logs目录下的每个目录代表一个主题的分区,目录中的文件名以主题的名称打头,然后是下划线,后面接一个分区的编号
可以看到日志和主题是高度关联的概念,可以说一个主题是一个日志,或者说一个主题代表一个日志。通过主题名可以很好地处理经由生产者发送到Kafka的消息将被存储到哪个日志当中。既然已经讨论了日志的概念,那么我们再来讨论Kafka另一个基本概念——分区。
分区是Kafka设计的一个重要部分,它对性能来说必不可少。分区保证了同一个键的数据将会按序被发送给同一个消费者。图2-7展示了分区的工作原理。
图2-7 Kafka使用分区来实现高吞吐量,并将一个主题的消息在集群的不同服务器中传播
对主题作分区的本质是将发送到主题的数据切分到多个平行流之中,这是Kafka能够实现巨大吞吐量的关键。我们解释过每个主题就是一个分布式日志,每个分区类似于一个它自己的日志,并遵循相同的规则。Kafka将每个传入的消息追加到日志末尾,并且所有的消息都严格按时间顺序排列,每条消息都有一个分配给它的偏移量。Kafka不保证跨分区的消息有序,但是能够保证每个分区内的消息是有序的。
除了增加吞吐量,分区还有另一个目的,它允许主题的消息分散在多台机器上,这样给定主题的容量就不会局限于一台服务器上的可用磁盘空间。
现在让我们看看分区扮演的另一个关键角色:确保具有相同键的消息最终在一起。
Kafka处理键/值对格式的数据,如果键为空,那么生产者将采用轮询(round-robin)方式选择分区写入记录。图2-8展示了用非空键如何分配分区的操作。
如果键不为空,Kafka会使用以下公式(如下伪代码所示)确定将键/值对发送到哪个分区:
HashCode.(key) % number of partitions
通过使用确定性方法来选择分区,使得具有相同键的记录将会按序总是被发送到同一个分区。默认的分区器使用此方法,如果需要使用不同的策略选择分区,则可以提供自定义的分区器。
图2-8 “foo”被发送到分区0,“bar”被发送到分区1。通过键的
字节散列与分区总数取模来获得数据被分配的分区
为什么要编写自定义分区器呢?在几个可能的原因中,下面将举一个简单的例子——组合键的使用。
假设将购买数据写入Kafka,该数据的键包括两个值,即客户ID和交易日期,需要根据客户ID对值进行分组,因此对客户ID和交易日期进行散列是行不通的。在这种情况下,就需要编写一个自定义分区器,该分区器知道组合键的哪一部分决定使用哪个分区。例如,/src/main/java/ bbejeck/model/PurchaseKey.java中的组合键,如代码清单2-1所示。
代码清单2-1 组合键PurchaseKey类
public class PurchaseKey {
private String customerId;
private Date transactionDate;
public PurchaseKey(String customerId, Date transactionDate) {
this.customerId = customerId;
this.transactionDate = transactionDate;
}
public String getCustomerId() {
return customerId;
}
public Date getTransactionDate() {
return transactionDate;
}
}
当提及分区时,需要保证特定用户的所有交易信息都会被发送到同一个分区中。但是整体作为键就无法保证,因为购买行为会在多个日期发生,包括交易日期的记录对一个用户而言就会导致不同的键值,就会将交易数据随机分布到不同的分区中。若需要确保具有相同客户ID的交易信息都发送到同一个分区,唯一的方法就是在确定分区时使用客户ID作为键。
代码清单2-2所示的自定义分区器的例子就满足需求。PurchaseKeyPartitioner
类(源代码见src/ main/java/bbejeck/chapter_2/partitioner/PurchaseKeyPartitioner.java)从键中提取客户ID来确定使用哪个分区。
代码清单2-2 自定义分区器PurchaseKeyPartitioner类
public class PurchaseKeyPartitioner extends DefaultPartitioner {
@Override
public int partition(String topic, Object key,
byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) {
Object newKey = null;
if (key != null) { ←--- 如果键不为空,那么提取客户ID
PurchaseKey purchaseKey = (PurchaseKey) key;
newKey = purchaseKey.getCustomerId();
keyBytes = ((String) newKey).getBytes(); ←--- 将键的字节赋值给新的值
}
return super.partition(topic, newKey, keyBytes, value, ←--- 返回具有已被更新键的分区,并将其委托给超类
valueBytes, cluster);
}
}
该自定义分区器继承自DefaultPartitioner
类,当然也可以直接实现Partitioner
接口,但是在这个例子中,在DefaultPartitioner
类中有一个已存在的逻辑。
请注意,在创建自定义分区器时,不仅局限于使用键,单独使用值或与键组合使用都是有效的。
注意
Kafka API提供了一个可以用来实现自定义分区器的
Partitioner
接口,本书不打算讲解从头开始写一个分区器,但是实现原则与代码清单2-2相同。
已经看到如何构造一个自定义分区器,接下来,将分区器与Kafka结合起来。
既然已编写了一个自定义分区器,那就需要告诉Kafka使用自定义的分区器代替默认的分区器。虽然还没有讨论生产者,但在设置Kafka生产者配置时可以指定一个不同的分区器[2],配置如下:
partitioner.class=bbejeck_2.partitioner.PurchaseKeyPartitioner
通过为每个生产者实例设置分区器的方式,就可以随意地为任何生产者指定任何分区器类。在讨论Kafka生产者时再对生产者的配置做详细介绍。
警告
在决定使用的键以及选择键/值对的部分作为分区依据时,一定要谨慎行事。要确保所选择的键在所有数据中具有合理的分布,否则,由于大多数数据都分布在少数几个分区上,最终导致数据倾斜。
在创建主题时决定要使用的分区数既是一门艺术也是一门科学。其中一个重要的考虑因素是流入该主题的数据量。更多的数据意味着更多的分区以获得更高的吞吐量,但与生活中的任何事物一样,也要有取舍。
增加分区数的同时也增加了TCP连接数和打开的文件句柄数。此外,消费者处理传入记录所花费的时间也会影响吞吐量。如果消费者线程有重量级处理操作,那么增加分区数可能有帮助,但是较慢的处理操作最终将会影响性能。
我们已经讨论了日志和对主题进行分区的概念,现在,花点时间结合这两个概念来阐述分布式日志。
到目前为止,我们讨论日志和对主题进行分区都是基于一台Kafka服务器或者代理,但典型的Kafka生产集群环境包括多台服务器。故意将讨论集中单个节点上,是因为考虑一个节点更容易理解概念。但在实践中,总是使用包括多台服务器的Kafka集群。
当对主题进行分区时,Kafka不会将这些分区分布在一台服务上,而是将分区分散到集群中的多台服务器上。由于Kafka是在日志中追加记录,因此Kafka通过分区将这些记录分发到多台服务器上。图2-9展示了这个过程。
让我们通过使用图2-9作为一个向导来完成一个快速实例。对于这个实例,我们假设有一个主题,并且键为空,因此生产者将通过轮询的方式分配分区。
生产者将第1条消息发送到位于Kafka代理1上的分区0中[3],第2条消息被发送到位于Kafka代理1上的分区1中,第3条消息被发送到位于Kafka代理2上的分区2中。当生产者发送第6条消息时,消息将会被发送到Kafka代理3上的分区5中,从下一条消息开始,又将重复该步骤,消息将被发送到位于Kafka代理1上的分区0中。以这种方式继续分配消息,将消息分配到Kafka集群的所有节点中。
图2-9 生产者将消息写入主题的分区中,如果消息没有关联键,那么生产者就会通过轮询方式选择一个分区,否则通过键的散列值与分区总数取模来决定分区
虽然远程存储数据听起来会有风险,因为服务器有可能会宕机,但Kafka提供了数据冗余。当数据被写入Kafka的一个代理时,数据会被复制到集群中一台或多台机器上(在后面小节会介绍副本)。
到目前为止,我们已经讨论了主题在Kafka中的作用,以及主题如何及为什么要进行分区。可以看到,分区并不都位于同一台服务器上,而是分布在整个集群的各个代理上。现在是时候来看看当服务器故障时Kafka如何提供数据可用性。
Kafka代理有领导者(leader)和追随者(follower)的概念。在Kafka中,对每一个主题分区(topic partition),会选择其中一个代理作为其他代理(追随者)的领导者。领导者的一个主要职责是分配主题分区的副本给追随者代理服务器。就像Kafka在集群中为一个主题分配分区一样,Kafka也会在集群的多台服务器中复制分区数据。在深入探讨领导者、追随者和副本是如何工作之前,先来介绍Kafka为实现这一点所使用的技术。
如果你是个Kafka菜鸟,你可能会问自己:“为什么在Kafka的书中会谈论Apache ZooKeeper?”Apache ZooKeeper是Kafka架构不可或缺的部分,正是由于ZooKeeper才使得Kafka有领导者代理,并使领导者代理做诸如跟踪主题副本的事情,ZooKeeper官网对其介绍如下:
ZooKeeper是一个集中式服务,用于维护配置信息、命名、提供分布式同步和组服务。这些类型的所有服务都是通过分布式应用程序以某种形式使用。
既然Kafka是一个分布式应用程序,那么它一开始就应该知道ZooKeeper在其架构中的作用。在这里的讨论中,我们只考虑两个或多个Kafka服务器的安装问题。
在Kafka集群中,其中一个代理会被选为控制器。在2.3.4节我们介绍了分区以及如何在集群的不同服务器之间分配分区。主题分区有一个领导者分区和一到多个追随者分区(复制的级别决定复制的程度[4]),当生成消息时,Kafka将记录发送到领导者分区对应的代理上。
Kafka使用ZooKeeper来选择代理控制器,对于其中涉及的一致性算法的探讨已超出本书所讲内容的范围,因此我们不做深入探讨,只声明ZooKeeper从集群中选择一个代理作为控制器。
如果代理控制器发生故障或者由于任何原因而不可用时,ZooKeeper从与领导者保持同步的一系列代理(已同步的副本[ISR])中选出一个新的控制器,构成该系列的代理是动态的[5],ZooKeeper只会从这个代理系列中选择一个领导者[6]。
Kafka在代理之间复制记录,以确保当集群中的节点发生故障时数据可用。可以为每个主题(正如前面介绍的消息发布或消费实例中的主题)单独设置复制级别也可以为集群中的所有主题设置复制级别[7]。图2-10演示了代理之间的复制流。
Kafka复制过程非常简单,一个主题分区对应的各代理从该主题分区的领导者分区消费消息,并将消息追加到自己的日志当中。正如2.3.12节所论述的,与领导者代理保持同步的追随者代理被认为是ISR,这些ISR代理在当前领导者发生故障或者不可用时有资格被选举为领导者。[8]
图2-10 代理1和代理3是一个主题分区的领导者,同时也是另外一个分区的追随者,而代理2只是追随者,追随者代理从领导者代理复制数据
代理控制器的职责是为一个主题的所有分区建立领导者分区和追随者分区的关系,如果一个Kafka节点宕机或者没有响应(与ZooKeeper之间的心跳),那么所有已分配的分区(包括领导者和追随者)都将由代理控制器重新进行分配。图2-11演示了一个正在运行的代理控制器。[9]
图2-11展示了一个简单的故障情景。第1步,代理控制器检测到代理3不可用。第2步,代理控制器将代理3上分区的领导权重新分配给代理2。
ZooKeeper也参与了Kafka以下几个方面的操作。
图2-11 代理控制器负责将其他代理分配为某些主题/分区的领导者代理和另一些主题/分区的追随者代理, 当代理不可用时,代理控制器将已分配给不可用代理的重新分配给集群中的其他代理
现在可知Kafka为什么依赖于Apache ZooKeeper了,正是ZooKeeper使得Kafka有了一个带着追随者的领导者代理,领导者代理的关键角色是为追随者分配主题分区,以便进行复制,以及在代理成员出现故障时重新分配主题分区。
对追加日志已进行了介绍,但还没有谈到随着日志持续增长如何对其进行管理。一个集群中旋转磁盘的空间是一个有限的资源,因此对Kafka而言,随着时间的推移,删除消息是很重要的事。在谈到删除Kafka中的旧数据时,有两种方法,即传统的日志删除和日志压缩。
日志删除策略是一个两阶段的方法:首先,将日志分成多个日志段,然后将最旧的日志段删除。为了管理Kafka不断增加的日志,Kafka将日志切分成多个日志段。日志切分的时间基于消息中内置的时间戳。当一条新消息到达时,如果它的时间戳大于日志中第一个消息的时间戳加上log.roll.ms
配置项配置的值时,Kafka就会切分日志。此时,日志被切分,一个新的日志段会被创建并作为一个活跃的日志段,而以前的活跃日志段仍然为消费者提供消息检索[10]。
日志切分是在设置Kafka代理时进行设置的[11]。日志切分有两个可选的配置项。
log.roll.ms
——这个是主配置项,但没有默认值。log.roll.hours
——这是辅助配置项,仅当log.roll.ms
没有被设置时使用,该配置项默认值是168小时。随着时间的推移,日志段的数据也将不断增加,为了为传入的数据腾出空间,需要将较旧的日志段删除。为了删除日志段,可以指定日志段保留的时长。图2-12说明了日志切分的过程。
图2-12 左边是当前日志段,右上角是一个已被删除的日志段,在其下面是最近切分的仍然在使用的日志段
与日志切分一样,日志段的删除也基于消息的时间戳,而不仅是时钟时间或文件最后被修改的时间,日志段的删除基于日志中最大的时间戳。用来设置日志段删除策略的3个配置项按优先级依次列出如下,这里按优先级排列意味着排在前面的配置项会覆盖后面的配置项。
log.retention.ms
——以毫秒(ms)为单位保留日志文件的时长。log.retention.minutes
——以分钟(min)为单位保留日志文件的时长。log.retention.hours
——以小时(h)为单位保留日志文件。提出这些设置的前提是基于大容量主题的假设,这里大容量是指在一个给定的时间段内保证能够达到文件最大值。另一个配置项log.retention.bytes
,可以指定较长的切分时间阈值,以控制I/O操作。最后,为了防止日志切分阈值设置得相对较大而出现日志量显著增加的情况,请使用配置项log.segment.bytes
来控制单个日志段的大小。
对于键为空的记录以及独立的记录[12],删除日志的效果很好。但是,如果消息有键并需要预期的更新操作,那么还有一种方法更适合。
假设日志中已存储的消息都有键,并且还在不停地接收更新的消息,这意味着具有相同键的新记录将会更新先前的值。例如,股票代码可以作为消息的键,每股的价格作为定期更新的值。想象一下,使用这些信息来展示股票的价值,并出现程序崩溃或者重启,这就需要能够让每个键恢复到最新数据[13]。
如果使用删除策略,那么从最后一次更新到应用程序崩溃或重启之间的日志段就可能被去除,启动时就得不到所有的记录[14]。一种较好的方式是保留给定键的最近已知值,用与更新数据库表键一样的方式对待下一条记录[15]。
按键更新记录是实现压缩主题(日志)的表现形式。与基于时间和日志大小直接删除整个日志段的粗粒度方式不同,压缩是一种更加细粒度的方式,该方式是删除日志中每个键的旧数据。从一个很高的层面上来说,一个日志清理器(一个线程池)运行在后台,如果后面的日志中出现了相同的键,则日志清理器就会重新复制日志段文件并将该键对应的旧记录去除。图2-13阐明了日志压缩是如何为每个键保留最新消息的。
这种方式保证了给定键的最后一条记录在日志中。可以为每个主题指定日志保留策略,因此完全有可能某些主题使用基于时间的保留,而其他主题使用压缩。
默认情况下,日志清理功能是开启的。如果要对主题使用压缩,那么需要在创建主题时设置属性log.cleanup.policy=compact
。
在Kafka Streams中使用应用状态存储时就要用到压缩,不过并不需要我们自己来创建相应的日志或主题——框架会处理。然而,理解压缩的原理是很重要的,日志压缩是一个宽泛的话题,我们仅谈论至此。如果想了解压缩方面的更多信息,参见Kafka官方文档。
注意
当使用
cleanup.policy
为压缩时,你可能好奇如何从日志中去除一条记录。对于一个压缩的主题,删除操作会为给定键设置一个null
值,作为一个墓碑标记。任何值为null
的键都确保先前与其键相同的记录被去除,之后墓碑标记自身也会被去除。
图2-13 左边是压缩前的日志,可以看到具有不同值的重复键,这些值是用来更新给定键的。右边是压缩后的日志,保留了每个键的最新值,但日志变小了
本节的关键内容是:如果事件或消息是独立、单独的,那么就使用日志删除,如果要对事件或消息进行更新,那就使用日志压缩。
我们已经花了很多时间介绍Kafka内部是如何处理数据的,现在,让我们转移到Kafka外部,探讨如何通过生产者向Kafka发送消息,以及消费者如何从Kafka读取消息。
[1] Jay Kreps, “The Log: What Every Software Engineer Should Know About Real-time Data’s Unifying Abstraction”(日志:每个软件工程师都应该知道实时数据的统一抽象)。
[2] 这里说的不同的分区器,是指不使用默认分区器,这里指定自定义分区器来覆盖默认分区器。 ——译者注
[3] 代理1是指代理服务器对应的broker.id为1,分区0表示分区编号为0。 ——译者注
[4] 这里的级别是指分区是领导者分区还是追随者分区。——译者注
[5] 代理是动态的是指根据代理的存活情况动态地将代理从ISR集合中移除或将代理加入ISR集合中。——译者注
[6] Kafka官方文档“Replicated Logs: Quorums, ISRs, and State Machines (Oh my!)”。
[7] 复制级别也就是我们通常说的副本数。——译者注
[8] Kafka官方文档“Replication”。
[9] 本节的一些信息来自Gwen Shapira在Qurora上的回答:“What is the actual role of ZooKeeper in Kafka? What benefits will I miss out on if I don’t use ZooKeeper and Kafka together?”。(ZooKeeper在Kafka中的实际角色是什么?如果我们不将ZooKeeper和Kafka一起使用会错失哪些好处?)
[10] Kafka总是将消息追加到活跃日志段的末尾。——译者注
[11] Kafka官方文档“Broker Configs”。
[12] 独立的记录是指若消息有键时,各消息的键都不相同。——译者注
[13] Kafka官方文档“Log Compaction”。
[14] 由于采用删除策略,位于被删除日志段中的数据被删除了,因此在重启后这些数据就丢失了,所以说在启动后就得不到所有的记录。——译者注
[15] 数据库中存在该键对应的记录时就做更新,否则就在数据库中插入一条记录。——译者注
回到ZMart对集中销售交易数据中心的需求,看看如何将购买交易数据发送到Kafka。在Kafka中,生产者是用于发送消息的客户端。图2-14重述ZMart的数据结构,突出显示生产者,以强调它们在数据流中适合的位置。
尽管ZMart有很多的销售交易,但现在我们只考虑购买一个单一物品:一本10.99美元的书。当消费者完成销售交易时,交易信息将被转换为一个键/值对并通过生产者发送到Kafka。
键是客户ID,即123447777
,值是一个JSON格式的值,即"{\"item\":\"book\",\ "price\":10.99}"
(这里已把双引号转义了,这样JSON可以被表示为Java中的字符串)。有了这种格式的数据,就可以使用生产者将数据发送到Kafka集群。代码清单2-3所示的示例代码可以在源代码/src/main/java/bbejeck.chapter_2/producer/SimpleProducer.java类中找到。
图2-14 生产者用于向Kafka发送消息,它们并不知道哪个消费者会读取消息,也不知道消费者在什么时候会读取消息
代码清单2-3 SimpleProducer
示例
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.
➥ StringSerializer");
properties.put("value.serializer",
➥ "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", "1");
properties.put("retries", "3");
properties.put("compression.type", "snappy");
properties.put("partitioner.class",
➥ PurchaseKeyPartitioner.class.getName()); ←--- 生产者属
性配置
PurchaseKey key = new PurchaseKey("12334568", new Date());
try(Producer<PurchaseKey, String> producer =
➥ new KafkaProducer<>(properties)) { ←--- 创建一个KafkaProducer
ProducerRecord<PurchaseKey, String> record =
➥ new ProducerRecord<>("transactions", key, "{\"item\":\"book\",
\"price\":10.99}"); ←--- 实例化ProducerRecord
Callback callback = (metadata, exception) -> {
if (exception != null) {
System.out.println("Encountered exception "
➥ + exception); ←--- 构造一个回调
}
};
Future<RecordMetadata> sendFuture =
➥ producer.send(record, callback); ←--- 发送记录,并将返回的Future赋值给一个变量
}
Kafka生产者是线程安全的。所有消息被异步发送到Kafka——一旦生产者将记录放到内部缓冲区,就立即返回Producer.send
。缓冲区批量发送记录,具体取决于配置,如果在生产者缓冲区满时尝试发送消息,则可能会有阻塞。
这里描述的Producer.send
方法接受一个Callback
实例,一旦领导者代理确认收到记录,生产者就会触发Callback.onComplete
方法,Callback.onComplete
方法中仅有一个参数为非空。在本例中,只关心在发生错误时打印输出异常堆栈信息,因此检验异常
对象是否为空。一旦服务器确认收到记录,返回的Future
就会产生一个RecordMetadata
对象。
定义
在代码清单2-3中,
Producer.send
方法返回一个Future
对象,一个Future
对象代表一个异步操作的结果。更重要的是,Future
可以选择惰性地检索异步结果,而不是等它们完成。更多信息请参考Java文档“Interface Future”(接口Future)。
当创建KafkaProducer
实例时,传递了一个包含生产者配置信息的java.util. Properties
参数。KafkaProducer
的配置并不复杂,但在设置时需要考虑一些关键属性,例如,可以在配置中指定自定义的分区器。这里要介绍的属性太多了,因此我们只看一下代码清单2-3中使用的属性。
bootstrap.servers
是一个用逗号分隔的host:port
值列表。最终,生产者将使用集群中的所有代理。此外,此列表用于初始连接到Kafka集群。key.serializer
和value.serializer
通知Kafka如何将键和值转化为字节数组。在内部,Kafka使用键和值的字节数组,因此在将消息通过网络发送之前需要向Kafka提供正确的序列化器,以将对象转换为字节数组。acks
指定生产者认为在一条记录发送完成之前需要等待的从代理返回的最小确认数。acks
的有效值为all
、0
和1
。当值为all
时,生产者需要等待一个代理接收到所有追随者代理都已提交记录的确认。当值为1时
,代理将记录写入其日志,但不用等待所有的追随者代理来确认提交了记录。当值为0
时,意味着生产者不用等待任何确认——这基本上是“即发即弃”。retries
指定失败后尝试重发的次数。如果记录的顺序很重要,那么应该考虑设置max.in.flight.requests.connection
为1
,以防止失败的记录在重试发送之前第二批记录成功发送的情景。compression.type
配置项用来指定要采用的压缩算法。如果设置了compression.type
,compression.type
会通知生产者在发送数据前对本批次的数据进行压缩。注意,是对整个批次进行压缩,而不是单条记录。partitioner.class
指定实现Partitioner
接口的类的名称。partitioner.class
与我们在2.3.7节中介绍的自定义分区器有关。更多生产者相关的配置信息请参见Kafka官方文档。
当创建一个ProducerRecord
对象时,可以选择指定分区、时间戳或者两者都指定。在代码清单2-3中实例化ProducerRecord
时,使用了4个重载构造方法中的一个。其他构造方法允许设置分区和时间戳,或者只设置分区,代码如下:
ProducerRecord(String topic, Integer partition, String key, String value)
ProducerRecord(String topic, Integer partition,
Long timestamp, String key,
String value)
在2.3.5节中,我们讨论了Kafka分区的重要性。我们也讨论了DefaultPartitioner
的工作原理以及如何提供一个自定义分区器。为什么要显式设置分区?可能有多种业务上的原因,下面是其中一个例子。
假设传入的记录都有键,但是记录被分发到哪个分区并不重要,因为消费者有逻辑来处理该键包含的任何数据。此外,键的分布可能不均匀,但你希望确保所有的分区接收到的数据量大致相同,代码清单2-4给出的是一个粗略的实现方案。
代码清单2-4 手动设置分区
AtomicInteger partitionIndex = new AtomicInteger(0); ←--- 创建一个AtomicInteger实例变量
int currentPartition = Math.abs(partitionIndex.getAndIncrement())%
➥ numberPartitions; ←--- 获取当前分区并将其作为参数
ProducerRecord<String, String> record =
➥ new ProducerRecord<>("topic", currentPartition, "key", "value");
上面的代码调用Math.abs
,因此对于Math.abs
求得的整型值,如果该值超出Integer. MAX_VALUE
,也不必关注。
定义
AtomicInteger
属于java.util.concurrent.atomic包,该包包含支持对单个变量进行无锁、线程安全的操作的类。若需要更多信息,请参考Java官方文档关于java.util.concurrent.atomic包的介绍。
Kafka从0.10版本开始在记录中增加了时间戳,在创建ProducerRecord
对象时调用以下重载的构造函数设置了时间戳。
ProducerRecord(String topic, Integer partition,
➥ Long timestamp, K key, V value)
如果没有设置时间戳,那么生产者在将记录发送到Kafka代理之前将会使用系统当前的时钟时间。时间戳也受代理级别的配置项log.message.timestamp.type
的影响,该配置项可以被设置为CreateTime
(默认类型)和LogAppendTime
中的一种。与许多其他代理级别的配置一样,代理级别的配置将作为所有主题的默认值,但是在创建主题时可以为每个主题指定不同的值[16]。如果时间戳类型设置为LogAppendTime
,并且在创建主题时没有覆盖代理级别对时间戳类型的配置,那么当将记录追加到日志时,代理将使用当前的时间覆盖时间戳,否则,使用来自ProducerRecord
的时间戳。
两种时间戳类型该如何选择呢?LogAppendTime
被认为是“处理时间”,而CreateTime
被认为是“事件时间”,选择哪一种类型取决于具体的业务需求。这就要确定你是否需要知道Kafka什么时候处理记录,或者真实的事件发生在什么时候。在后面的章节,将会看到时间戳对于控制Kafka Streams中的数据流所起的重要作用。
[16] 主题级别的配置将会覆盖代理级别的配置。 ——译者注
我们已经知道了生产者的工作原理,现在是时候来看看Kafka的消费者。假设你正在构建一个原型应用程序用于展示ZMart最近的销售统计数据。对于这个示例,将消费先前生产者示例中发送的消息。因为这个原型处于早期阶段,所以此时要做的就是消费消息并将消息打印到控制台。
注意
因为本书所探讨的Kafka Streams的版本要求Kafka的版本为0.10.2或者更高版本,所以我们仅讨论新的消费者,它是在Kafka 0.9版本中发布的。
KafkaConsumer
是用来从Kafka消费消息的客户端。KafkaConsumer
类很容易使用,但是有一些操作事项需要重视。图2-15展示了ZMart的体系架构,突出了消费者在数据流中所起的作用。
图2-15 这些是从Kafka读取消息的消费者,正如生产者不知道消费者一样,消费者从Kafka读取消息时也不知道是谁生产的消息
KafkaProducer
基本上是无状态的,然而KafkaConsumer
需要周期性地提交从Kafka消费的消息的偏移量来管理一些状态。偏移量唯一标识消息,并表示消息在日志中的起始位置。消费者需要周期性地提交它们已接收到的消息的偏移量。
对一个消费者来说,提交一个偏移量有两个含义。
如果创建了一个新消费者实例或者发生了某些故障,并且最后提交的偏移量不可用,那么消费者从何处开始消费取决于具体的配置。
auto.offset.reset="earliest"
——将从最早可用的偏移量开始检索消息,任何尚未被日志管理进程移除的消息都会被检索到。auto.offset.reset="latest"
——将从最新的偏移量开始检索消息,本质上仅从消费者加入集群的时间点开始消费消息。auto.offset.reset="none"
——不指定重置策略,代理将会向消费者抛出异常。从图2-16可以看到选择不同的auto.offset.reset
设置的影响。如果设置为earliest
,那么收到消息的起始偏移量是0
;如果设置为latest
,那么取得消息的起始偏移量为11。
图2-16 将auto.offset.reset
设置为earliest
与latest的
图形对比表示。设置为earliest
,消费者将会得到所有未被删除的消息;设置为latest
意味着消费者需要等待下一条可用消息到达
接下来,我们需要讨论偏移量提交的选项,你可以自动提交也可以手动提交。
默认情况下,消费者使用的是自动提交偏移量,通过enable.auto.commit
属性进行设置。还有一个与enable.auto.commit
配合使用的配置项auto.commit.interval.ms
,用来指定消费者提交偏移量的频率(默认值是5秒)。调整这个频率值要谨慎,如果设置太小,将会增加网络流量;如果设置太大,可能会导致在发生故障或重启时消费者收到大量重复数据。
手动提交偏移量有两种方式——同步和异步。同步提交方式的代码如下:
consumer.commitSync()
consumer.commitSync(Map<TopicPartition, OffsetAndMetadata>)
无参的commitSync()
方法在上一次检索(轮询)成功返回所有的偏移量之前会一直阻塞,此方法适用于所有订阅的主题和分区。另一个方法需要一个Map<TopicPartiton
,OffsetAndMetadata>
类型的参数,它只会提交Map
中指定的偏移量、分区和主题。
异步提交也有与同步提交类似的方法,consumer.commitAsync()
方法是完全异步的,提交后立即返回。其中一个重载方法是无参的,两个consumer.commitAsync
方法都可选择地提供一个OffsetCommitCallback
回调对象,它在提交成功或者失败时被调用。通过提供回调实例可以实现异步处理或者异常处理。使用手工提交的好处是可以直接控制记录何时被视为已处理。
创建一个消费者与创建一个生产者类似,提供一个以java.util.Properties
形式的Java对象的配置,然后返回一个KafkaConsumer
实例。该实例订阅由主题名称列表提供或者由正则表达式指定的主题。通常,会在一个循环中以指定毫秒级的间隔周期性地运行消费者轮询。
轮询的结果是一个ConsumerRecords<K
,V>
对象,ConsumerRecords
实现了Iterable
接口,每次调用next()
方法返回一个包括消息的元数据以及实际的键和值的ConsumerRecord
对象。
在处理完上一次轮询调用返回的所有ConsumerRecord
对象之后,又会返回到循环的顶部,再次轮询指定的同期。实际上,期望消费者以这种轮询方式无限期地运行,除非发生错误或者应用程序需要关闭和重启(这就是提交的偏移量要发挥作用的地方——在重启时,消费者从停止的地方继续消费)。
通常需要多个消费者实例——主题的每个分区都有一个消费者实例。可以让一个消费者从多个分区中读取数据,但是通常的做法是使用一个线程数与分区数相等的线程池,每个线程运行一个消费者,每个消费者被分配到一个分区。
这种每个分区一个消费者的模式最大限度地提高了吞吐量,但如果将消费者分散在多个应用程序或者服务器上时,那么所有实例的线程总数不要超过主题的分区总数。任何超过分区总数的线程都将是空闲的。如果一个消费者发生故障,领导者代理将会把分配给该故障消费者的分区重新分配给另一个活跃的消费者。
注意
这个例子展示了一个消费者订阅一个主题的情况,但是这种情况仅是为了阐述的目的。大家可以让一个消费者订阅任意数量的主题。
领导者代理将主题的分区分配给具有相同group.id
的所有可用的消费者,group.id
是一个配置项,用来标示消费者属于哪一个消费者组——这样一来,消费者就不需要位于同一台机器上。事实上,最好让消费者分散在几台机器上。这样,当一台服务器发生故障时,领导者代理可以将主题分区重新分配给一台正常运行的机器上的消费者。
在2.5.5节中描述的向消费者添加和移除主题分区(topic-partition)分配的过程被称为再平衡。分配给消费者的主题分区不是静态的,而是动态变化的。当添加一些具有相同消费者组ID的消费者时,将会从活跃的消费者中获取一些当前的主题分区,并将它们分配给新的消费者。这个重新分配的过程持续进行,直到将每个分区都分配给一个正在读取数据的消费者。
在达到这个平衡点之后[17],任何额外的消费者都将处于空闲状态。当消费者不管由于什么原因离开消费者组时,分配给它们的主题分区被重新分配给其他消费者。
在2.5.5节中,我们描述了使用线程池及多个消费者(在同一个消费者组)订阅同一个主题。尽管Kafka会平衡所有消费者的主题分区负载,但是主题和分区的分配并不是确定性的,你并不知道每个消费者将收到哪个主题分区对。
KafkaConsumer
有一个允许订阅特定主题和分区的方法,代码如下:
TopicPartition fooTopicPartition_0 = new TopicPartition("foo", 0);
TopicPartition barTopicPartition_0 = new TopicPartition("bar", 0);
consumer.assign(Arrays.asList(fooTopicPartition_0, barTopicPartition_0));
在手动进行主题分区分配时,需要权衡以下两点。
consumer.assign
方法。代码清单2-5给出的是ZMart原型消费者的代码,该消费者消费交易数据并打印到控制台。完整代码可以在源代码src/main/java/bbejeck.chapter_2/consumer/ThreadedConsumerExample.java类中找到。
代码清单2-5 ThreadedConsumerExample
示例
public void startConsuming() {
executorService = Executors.newFixedThreadPool(numberPartitions);
Properties properties = getConsumerProps();
for (int i = 0; i < numberPartitions; i++) {
Runnable consumerThread = getConsumerThread(properties); ←--- 创建一个消费者线程
executorService.submit(consumerThread);
}
}
private Runnable getConsumerThread(Properties properties) {
return () -> {
Consumer<String, String> consumer = null;
try {
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList( ←--- 订阅主题
➥ "test-topic"));
while (!doneConsuming) {
ConsumerRecords<String, String> records = ←--- 5秒钟轮询一次
➥ consumer.poll(5000);
for (ConsumerRecord<String, String> record : records) {
String message = String.format("Consumed: key =
➥ %s value = %s with offset = %d partition = %d",
record.key(), record.value(),
record.offset(), record.partition());
System.out.println(message); ←--- 打印格式化的消息
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (consumer != null) {
consumer.close(); ←--- 关闭消费者,否则会导致资源泄露
}
}
};
}
这个例子省略了类的其他代码——它不会独立存在。可以在本章的源代码中找到完整的示例。
[17] 这个平衡点是指同一个消费组下的消费者已将主题分区分配完毕。——译者注
当我写本书时,Kafka的最新版本是1.0.0。因为Kafka是一个Scala项目,所以每次发布有两个版本:一个用于Scala 2.11;另一个用于Scala 2.12。本书使用Scala 2.12版本的Kafka。尽管大家可以下载发行版,本书源代码中也包括Kafka的二进制发行版,它将与本书阐述和描述的Kafka Streams一起工作。要安装Kafka,从本书repo管理的源代码中提取.tgz文件,放到自己机器上的某个目录中。
注意
Kafka的二进制版本包括Apache ZooKeeper,因此不需要额外的安装工作。
如果接受Kafka的默认配置,那么本地运行Kafka需要配置的地方就很少。默认情况下,Kafka使用9092端口,ZooKeeper使用2181端口。假设本地没有应用程序使用这些端口,那么一切就绪了。
Kafka将日志写入/tmp/kafka-logs目录下,ZooKeeper使用/tmp/zookeeper目录存储日志。根据自身服务器情况,可能需要更改这些目录的权限或所有权,抑或是需要修改写日志的位置。
为了修改Kafka日志目录,cd
命令进入Kafka安装路径的config目录,打开server. properties文件,找到log.dirs
配置项,修改该配置项的值为任何你想使用的路径。在同一个目录下,打开zookeeper.properties文件,可以修改dataDir
配置项的值。
稍后我们将会在本书中详细介绍Kafka的配置,但现在所需要做的配置仅此而已。需要注意的是,这些说的“日志”是Kafka和ZooKeeper的真实数据,并不是用于跟踪应用行为的应用层面的日志。应用日志位于Kafka安装目录的logs目录下。
Kafka启动很简单,由于ZooKeeper对于Kafka集群正确运行(ZooKeeper决定领导者代理、保存主题信息、对集群中各成员执行健康检查等)是必不可少的,因此在启动Kafka之前需要先启动ZooKeeper。
注意
从现在开始,所有对目录的引用均假设当前工作在Kafka安装目录下。如果使用的是Windows机器,目录是Kafka安装目录下的/bin/windows。
要启动ZooKeeper,打开命令提示符,输入以下命令:
bin/zookeeper-server-start.sh config/zookeeper.properties
该命令执行后,在屏幕上会看到很多信息,但结尾会看到与图2-17所示类似的信息。
图2-17 当ZooKeeper启动时,在控制台可以看到的输出信息
打开另一个命令提示符,输入以下命令,启动Kafka:
bin/Kafka-server-start.sh config/server.properties
同样,会在屏幕上看到滚动的文本。当Kafka完全启动时,会看到与图2-18所示类似的信息。
图2-18 Kafka启动时的输出信息
提示
ZooKeeper对Kafka运行必不可少,因此在关闭时要调换顺序:先关闭Kafka,再关闭ZooKeeper。要关闭Kafka,可以在Kafka运行终端按下Ctrl+C,或在另一个终端执行
kafka-server-stop.sh
脚本。除了关闭脚本是zookeeper-server-stop.sh
,关闭ZooKeeper的操作与关闭Kafka的操作相同。
既然Kafka已启动并开始运行了,现在是时候使用Kafka来发送消息和接收消息了。但是,在发送消息前,需要先为生产者定义一个发送消息的主题。
在Kafka中创建一个主题很简单,仅需要运行一个带有一些配置参数的脚本。配置很简单,但是这些配置的设置有广泛的性能影响。
默认情况下,Kafka被配置为自动创建主题,这意味着如果尝试向一个不存在的主题发送或读取消息,那么Kafka代理就会创建一个主题(使用server.properties文件中的默认配置)。即使在开发中,依靠代理创建主题也不是一个好的做法,因为第一次尝试生产或消费会失败,这是由于需要时间来传播关于主题存在的元数据信息。需要确保总是主动地创建主题。
要创建主题,需要运行kafka-topics.sh脚本。打开一个终端窗口,运行以下命令:
bin/kafka-topics.sh --create --topic first-topic --replication-factor 1
➥ --partitions 1 --zookeeper localhost:2181
当脚本执行后,在终端控制台应该会看到类似如图2-19所示的信息。
图2-19 这是创建主题的结果,事先创建主题很重要,可以提供特定主题的配置。否则,自动创建主题将使用默认配置或者server.properties文件中的配置
前面命令中的大多数配置标记的含义都显而易见,但还是让我们快速了解一下其中的两个配置。
replication-factor
——此标记确定领导者代理在集群中分发消息的副本数。在这种情况下,如果副本因子为1
,那么就不会复制,Kafka中保存的仅是原始消息。副本因子为1
对于快速演示或者原型是可以的,但在实践中,几乎总是希望副本因子为2
或3,
以便在服务器发生故障时保证数据可用性。partitions
——此标记用于指定主题将用到的分区数。同样,这里只有一个分区是可以的,但是如果想要更高的负载,当然就需要更多的分区。确定合适的分区数不是一门精确的科学[18]。在Kafka中发送消息通常需要编写一个生产者客户端,但Kafka也自带了一个名为kafka- console-producer
的方便脚本,允许从终端窗口发送消息。在这个例子中我们将使用控制台生产者,但是在2.4.1节中,我们已经介绍了如何使用KafkaProducer
。
运行以下命令(图2-20中展示的也是)发送第一条消息:
# 假设在bin目录下运行该命令
./kafka-console-producer.sh --topic first-topic --broker-list localhost:9092
配置控制台生产者有几个选项,但这里我们仅使用必需的配置:消息送达的主题以及连接到Kafka的一个Kafka代理列表(对于本例,只是本地一台机器)。
启动控制台生产者是一个“阻塞脚本”,因此在执行前面的命令之后,输入一些文本并按回车键。可以发送你想要发送的任何数量的消息。但本例为了演示,可以输入一条消息“the quick brown fox jumped over the lazy dog.”,并按回车键,然后按Ctrl+C让生产者退出。
图2-20 控制台生产者是用来快速测试配置和确保端到端功能的一个很好工具
Kafka也提供了一个控制台消费者用来从命令行读取消息。控制台消费者类似于控制台生产者:一旦启动,将持续从主题中读取消息直到脚本被终止(通过Ctrl+C)。
运行以下命令,启动控制台消费者:
bin/kafka-console-consumer.sh --topic first-topic
➥ --bootstrap-server localhost:9092 --from-beginning
在启动控制台消费者之后,在终端控制台可以看到与图2-21所示类似的信息。
图2-21 控制台消费者是一个方便的工具,可以快速地感知数据
是否正在流动以及消息是否包含预期的信息
--from-beginning
参数指定将会收到来自那个主题的任何未被删除的消息。控制台消费者还没有提交偏移量,因此若没有设置--from-beginning
,那么只会收到控制台消费者启动之后所发送的消息。
我们已完成了Kafka的旋风之旅,并生产和消费了第一条消息。如果你还没有阅读本章第一部分,现在是时候回到本章起始处去学习Kafka工作原理的细节。
[18] 我们并不能给出一个确切的分区数,这要根据实际应用场景。 ——译者注
下一章,我们将以零售业中的一个具体的例子开始讨论Kafka Streams。尽管Kafka Streams将处理所有生产者和消费者实例的创建,但你能够看到我们在这里介绍的概念所发挥的作用。