主页 > 苹果商城可以直接下载imtoken吗 > PyFlink+区块链?揭秘行业龙头BTC.com如何实现实时计算

PyFlink+区块链?揭秘行业龙头BTC.com如何实现实时计算

PyFlink+区块链?揭秘行业龙头BTC.com如何实现实时计算

高正彦@BTC.com Flink 中文社区

大家好,我们是BTC.com团队。 2020年,我们有幸接触到了Flink和PyFlink生态。 我们从团队自身需求出发,完善团队内部实时计算的任务和需求,构建流批一体的计算环境。

在实现实时计算的过程中,我们在实践中获得了一些经验,在此分享一些这方面的心路历程。 主要分享提纲如下:

01 困惑•描述•思考•行动

作为工程师,我们每天都在不断地了解需求和发展业务。

有一天,我们被拉进了一个团队总结会,收到了如下要求:

销售总监A:

我们想知道销售历史、实时转化率和销售量。 能不能统计一下实时的TOP5商品,以及大促期间用户的实时访问量和商品的TOP5实时浏览量? 我们可以根据他的历史访问记录实时推荐相关吗?

营销总监乙:

我们想知道营销的效果和每个活动的实时数据,否则我们无法准确评估我们营销的效果并及时反馈。

研发总监C:

部分用户的BUG无法复现,日志能不能更实时一些? 传统的日志分析需要一定的排序。 你能直接清洗/处理相关数据吗?

采购总监丁:

这些年数字化流行吗? 采购端要预测采购需求,对支出进行实时分类管理,预测未来供应来源,改善成本。 有没有办法做到这一点? 也有一些供应商不稳定。 你能监视他们的情况吗?

运维总监E:

有时候网站访问比较慢,没地方看实时机器情况。 什么样的显示器可以设置大屏幕? 有办法解决这个问题吗?

部门领导F:

能否实现以上人群的需求?

做了上面的了解之后,我发现,就大家对数据的渴望而言btc区块链浏览器,用户需要的不仅仅是历史数据,还需要实时数据。

在电商、金融、制造等行业,数据增长迅猛,很多企业都面临着新的挑战。 数据分析的实时处理框架,比如做一些实时数据分析报表,实时数据处理和计算等。

与大多数企业类似,在此之前,我们在实时计算方面没有任何经验和积累。 这时候我就开始迷茫了,如何更好的满足上述需求,在成本和效果之间取得平衡,相关架构又该如何设计?

btc区块链浏览器_sitejianshu.com 区块链公有链和基础链_siteblockvalue.com btc区块链查

如果你很穷,你就想改变。 有了迷茫之后,我们就开始准备梳理现有的条件和需要的东西。

我们的业务范围主要是区块链浏览器和数据服务、区块链矿池、多币种钱包等。在区块链浏览器业务方面,BTC.com是目前全球领先的区块链数据服务平台,矿池业务在全球排名第一。行业,区块链浏览器也是世界排名前三的浏览器之一。

首先,我们使用解析器对区块链上的数据进行解析,获取各种数据信息,可以分析出各个币种的地址活跃度、地址交易状态、交易流水、参与度等。 目前BTC.com区块链浏览器与业内各大矿池、交易所等公司都有相关合作,可以更好的实现部分数据的统计、排序、归纳和输出。

对于用户来说,不仅有专业的区块链开发者,还有各种各样的B端和C端用户。 C端用户可以标记区块链地址、运行智能合约、查看智能合约相关内容等,以及链上数据的检索和查看。 B端用户有更专业的支持和指导,提供API、区块链节点等一些定制,以及交易加速、链上业务合作、数据定制等。

在数据量方面,截至目前,比特币约有5亿笔交易,超过3000万个地址,22亿个输出(output:每笔交易的输出),并且还在不断增长。 就以太坊而言,更多。 BTC.com的矿池和区块链浏览器都支持多种币种,每种币种的总数据量在几十TB左右。

矿池是矿工购买矿机后连接的服务平台。 矿工可以通过接入矿池获得更稳定的收益。 这是一项需要保证7*24小时稳定的服务。 其中,矿机不断向矿池提交自己计算下发的任务的解,矿池将广播达到网络难度的解。 这个过程也可以被认为是几乎实时的。 矿机提交到服务器,再提交到服务器内部的Kafka消息队列。 同时,一些组件监听这些消息进行消费。 这些提交的方案可以用来分析矿机的工作状态、算力、连接状态等。

在业务中,我们需要计算历史数据和实时数据。

历史数据需要关联一些货币价格和历史交易信息,并且需要一直保存这些交易信息,这是一个典型的批处理任务。

每当确认一个新块时,就可以处理和分析一些数据。 比如某个地址在这个区块中有一笔交易,那么你可以从交易流向中分析出这是一笔什么样的交易,挖掘出交易相关的信息。 性别。 或者这个区块里面有一些特殊的交易,比如segwit交易,比如闪电网络交易,也就是这个币有一些特有的东西可以分析统计。 并且确认新块时的难度预测也会发生变化。

还有大额交易的监控。 通过新区块的确认和未确认交易,锁定一些大额交易,结合地址的一些标签,锁定交易流向,更好的进行数据分析。

一些区块链也有 OLAP 要求。

btc区块链浏览器_sitejianshu.com 区块链公有链和基础链_siteblockvalue.com btc区块链查

在总结了数据统计中的需求和问题之后,我们开始思考:什么样的架构才是最合适的,如何让人员参与少、成本低?

解决问题无非就是提出假设,通过测量,刷新认知。

siteblockvalue.com btc区块链查_btc区块链浏览器_sitejianshu.com 区块链公有链和基础链

浏览了一些资料后,我们认为大多数计算框架都是传递输入,处理它,然后得到输出。 首先我们要获取数据,这里的数据可以是MySQL或者Kafka,然后计算,这里的计算可以是聚合,也可以是TOP 5类型,实时的,可能有窗口类型。 计算完成后将结果发送到消息通道和存储,发送到微信或钉钉,再发送到MySQL。

团队一开始尝试了Spark,搭建了Yarn,使用Airflow作为调度框架。 团队通过集成导入MySQL开发了一些批处理任务,具有离线任务、数据固定、体量大、计算周期长等特点。 一些复杂的操作。

在一些批处理任务中,这种架构是稳定的,但是随着业务的发展,实时性要求越来越高,实时数据不能保证按顺序到达,按时间戳排序,消息时间字段是允许的前后有差距。 在数据模型方面,需求驱动开发的成本比较高,而且当时Spark的方式比较高,对状态的处理不是很好,影响了一部分效率。

其实2019年的时候在调研一些实时计算的事情,关注了Flink框架。 那时候还是基于Java的。 整体框架在概念上与 Spark 不同。 本来以为批处理是一个特殊的流,但是因为团队没有Java的基因和沉淀,使用Flink作为实时计算架构在当时算是暂时告一段落了。

2020年初,无论是Flink社区,InfoQ,还是B站,都在宣传PyFlink,而那个时候,尤其是程鹤群[1]和孙金城[2]的视频,还有孙金城先生的博客[3] 深刻。 于是就想试试PyFlink,它的优势是流批一体,而且还支持Python、pandas,甚至未来Tensorflow、Keras的一些功能,对我们很有吸引力。 之后就是在PyFlink上构思我们的流批一体化架构。

02 流批一体化架构

建筑学

首先我们要对数据进行梳理,知道数据是从哪里来的。 在Spark主导的时期,数据周期性地(增量地)从数据源加载,通过一定的转换逻辑,然后写入目的地。 由于数据量和业务需求,延迟通常在小时级别,而在实时中,要求尽可能短的延迟,因此将数据源进行分类,整体分为几个部分。 一部分是我们存储在 MySQL 中用于持久化的传统数据。 这部分可以直接作为批量计算,也可以导入到Hive中,做进一步的计算。 对于实时部分,其实有很多想法。 一种方式是通过MySQL的Binlog来分析,另一种是通过MySQL的CDC功能。 经过多方考虑,我们最终选择了Kafka,不仅因为它是一个优秀的分布式Streaming平台,而且团队在上面也有技术沉淀。

而且其实在本地开发的时候安装Kafka更方便。 只需要brew install Kafka,就可以通过Conduktor客户端方便的看到各个topic的状态。 因此,修改现有的Parser以支持Kafka。 当收到新的块时,它会立即发送消息给 Kafka 进行处理。

2018年前后,团队将整体业务迁移到了Kubernetes。 在业务不断发展的过程中,减轻了很多开发和运维的负担。 因此,建议有一定的业务规模。 最好是迁移到 Kubernetes。 它的成本优化、DevOps 和高可用性支持是其他平台和传统方法无法比拟的。

在开发作业的过程中,我们尽量使用Flink SQL,结合一些Java和Python的UDF、UDAF、UDTF。 每个作业通过初始化类似于以下的语句来形成特定的模式:


self.source_ddl = '''
    CREATE TABLE SourceTable (xxx int) WITH 
'''
self.sink_ddl = '''
    CREATE TABLE SinkTable (xxx int) WITH 
'''
self.transform_ddl = '''
    INSERT INTO SinkTable
    SELECT udf(xxx)
    FROM SourceTable
    GROUP BY FROM_UNIXTIME(`timestamp`, 'yyyyMMdd')
'''

未来将对数据进行针对性分层,按照业界常见的ODS、DWD、DWS、ADS,将原始层、明细层、汇总层分离,进一步完善数据治理。

影响

最终,我们团队基于PyFlink的开发,快速完成了既有任务。 有的是batch job,处理这几天的数据,有的是real-time job,根据kafka的消息消费。 目前比较稳定。

部署时选择了Kubernetes,下面分享。 在K8S中部署Jobmanager和Taskmanager,并利用Kubernetes的job功能作为批量作业的部署,考虑对接一些监控平台,比如Prometheus。

在成本上,由于使用了Kubernetes集群,所以只有在机器上扩展宿主机的成本。 这样一来,成本比传统的Yarn部署方式要低,而且后期Kubernetes会支持原生部署,扩展Jobmanager和Taskmanager会更方便。

03 Zeppelin、PyFlink on K8S等实践

Zeppelin 就是我们用来进行数据探索和逻辑验证的。 有些数据在本地并不是真正的数据。 使用 Zeppelin 连接到实际的链上数据,进行计算的逻辑验证。 验证完成后,可以转换成生产需要的代码进行部署。

siteblockvalue.com btc区块链查_sitejianshu.com 区块链公有链和基础链_btc区块链浏览器

1. 在 Kubernetes 上构建 PyFlink 和 Zeppelin。 整理后的Deployment Demo在github上。 可以参考相关链接[4]。 关于配置文件,修改如下配置的效果。

(1). 在configmap的flink-conf.yaml文件中修改taskmanager配置。


taskmanager.numberOfTaskSlots: 10

调整Taskmanager可以调整运行的作业数量。

(2). 修改 Zeppelin 的 dockerfile 中的 zeppelin-site.xml 文件。


cp conf/zeppelin-site.xml.template conf/zeppelin-site.xml; \
sed -i 's#127.0.0.1#0.0.0.0#g' conf/zeppelin-site.xml; \
sed -i 's#auto#local#g' conf/zeppelin-site.xml

3. 将websocket 配置添加到Zeppelin 的ingress。


nginx.ingress.kubernetes.io/configuration-snippet: |
    proxy_set_header Upgrade "websocket";
    proxy_set_header Connection "Upgrade";

Zeppelin需要在浏览器中与服务器建立socket连接,需要在ingress中添加websocket配置。

4、Flink和Zeppelin数据持久化的作用。


volumeMounts:
- mountPath: /zeppelin/notebook/
  name: data
volumes:
- name: data
  persistentVolumeClaim:
    claimName: zeppelin-pvc
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: zeppelin-pvc
spec:
  storageClassName: efs-sc
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi

5、Flink命令提交作业的方式。

(1). 本地安装PyFlink,需要Python 3.5及以上版本。


$ pip3 install apache-flink==1.11.1

(2). 测试演示


def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
    )
    sink_ddl = """
        create table Results (word VARCHAR, `count` BIGINT) with ( 'connector' = 'print')
        """
    t_env.sql_update(sink_ddl)
    elements = [(word, 1) for word in content.split(" ")]
    # 这里也可以通过 Flink SQL
    t_env.from_elements(elements, ["word", "count"]) \
        .group_by("word") \
        .select("word, count(1) as count") \
        .insert_into("Results")
    t_env.execute("word_count")
if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    word_count()

或实时处理的演示:


def handle_kafka_message():
    s_env = StreamExecutionEnvironment.get_execution_environment()
    # s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    s_env.set_parallelism(1)
    st_env = StreamTableEnvironment \
        .create(s_env, environment_settings=EnvironmentSettings
                .new_instance()
                .in_streaming_mode()
                .use_blink_planner().build())
    source_ddl = '''
      CREATE TABLE SourceTable (
        word string
      ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'Topic',
        'connector.properties.bootstrap.servers' = 'localhost:9092',
        'connector.properties.zookeeper.connect' = 'localhost:2121',
        'format.type' = 'json',
        'format.derive-schema' = 'true'
      )
    '''
    sink_ddl = """
        create table Results (word VARCHAR) with ('connector' = 'print')
        """
    st_env.sql_update(sink_ddl)
    st_env.sql_update(source_ddl)
    st_env.from_path("source").insert_into("sink")
    st_env.execute("KafkaTest")
if __name__ == '__main__':
    handle_kafka_message()

(3). 本地测试 Flink 命令提交作业。


$ flink run -m localhost:8081 -py word_count.py
python/table/batch/word_count.py
Job has been submitted with JobID 0a31b61c2f974bcc3f344f57829fc5d5
Program execution finished
Job with JobID 0a31b61c2f974bcc3f344f57829fc5d5 has finished.
Job Runtime: 741 ms

(4). 如果有多个Python文件,可以先压缩再提交作业。


$ zip -r flinkdemo.zip ./*
$ flink run -m localhost:8081 -pyfs flinkdemo.zip -pym main

(5). Kubernetes通过集群的CronJob定时调度提交作业,然后会做一些自研的UI后台界面,用于作业的管理和监控。

04 区块链领域实践

随着区块链技术日趋成熟,应用越来越多,行业标准化、规范化的趋势也开始显现,对云计算和大数据的依赖程度越来越高。 毕竟是数字经济的产物。 BTC.com也植根于区块链技术基础设施,为各个公司的各种应用提供数据和业务支持。

近几年,IT界流行一个词,中台,不管是大公司还是创业公司,都喜欢涉足这个概念,自称是自己的中台自己的业务、数据中台等。在我们的理解中,中台是一种整合各种资源的能力btc区块链浏览器,从传统的单兵作战,到完善武器装备的后勤保障,提升作战能力。 打破数据上的数据孤岛,在瞬息万变的前台和日趋稳定的后台之间取得平衡。 中台更重要的是服务,最终要回馈给客户和合作伙伴。

在区块链领域,BTC.com拥有深厚的行业技术积累,可以提供各种基于数据的能力。 比如用机器学习估计链上的数据,估计eth的gas价格,以及最好的手续费等,用keras的深度学习能力做一些回归计算,然后Flink,机器学习和结合区块链,为预测类和标准化分类提供更多的数据样本。 之前,模型是通过定时任务不断训练的。 与Flink结合后,会更加实时。 对此,未来会提供更多的话题,比如币价与Defi、舆论、市场等的关系,以及区块链地址和交易的标注和分类。 甚至机器学习训练出来的模型,都放在IPFS网络中,通过去中心化的代币进行训练,提供了方便调用样本和模型的能力。

目前,BTC.com已经通过数据挖掘推出了一些能力,包括交易推送、OLAP链分析报告等,以改善和提升相关行业和开发者的实际体验。 我们在各个链上都有监控节点来监控每个区块链网络的可用性和分散性,并监控智能合约。 在接入一些联盟链和隐私加密货币时,可以为联盟链和隐私加密货币提供数据能力。

BTC.com将为区块链行业的生态发展做出更多努力。 立足科技企业本质,以技术开发为第一动力,以市场和客户为导向,开发创新集成应用,抓好基础设施建设。

05 展望与总结

从实时计算的趋势到流批一体化的架构,通过对PyFlink和Flink的学习,各种作业任务已经稳定在线上运行,满足实际业务需求。 并搭建了Zeppelin平台,让业务开发更加便捷。 计算上尽量依赖SQL,方便各方面的集成和调试。

在社区方面,PyFlink 也没有让我们失望,其快速的响应能力和不断完善的文档。 在 Confluence 上也可以看到一些 Flink Improvement Proposals[5],其中有一些与 PyFlink 有关。 在不久的将来,它还将支持 Pandas UDAF、DataStream API 和 ML API。 也有望在未来支持Joblistener。 总之,在这里也非常感谢相关的团队。

未来的展望,总结起来就是通过业务实现数据的价值。 数据中心的最终结果是实现数据。

更多关于 Pylink 的详细信息,请参考 PyFlink 社区支持计划。

btc区块链浏览器_sitejianshu.com 区块链公有链和基础链_siteblockvalue.com btc区块链查

参考链接:

[1][2][3][4][5]+改进+建议