当前位置: 首页 > news >正文

网站建设毕业论文百度文库重庆seo外包平台

网站建设毕业论文百度文库,重庆seo外包平台,题库小程序源码,新余网站设计在《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。 select word, count(1) as count from source group by word; ------------------------------------------------------ |…

在《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。

select word, count(1) as `count` from source group by word;
+--------------------------------+----------------------+
|                           word |                count |
+--------------------------------+----------------------+
|                              A |                    3 |
|                              B |                    1 |
|                              C |                    2 |
|                              D |                    2 |
|                              E |                    1 |
+--------------------------------+----------------------+

在生产环境,我们往往要将计算结果保存到外部系统中,比如Mysql等。这个时候我们就要使用Sink。

Sink

Sink用于将Reduce结果输出到外部系统。它也是通过一个表(Table)来表示结构。这个和MapReduce思路中的Map很类似。

Print

为了简单起见,我们让Sink的表连接的外部系统是print。这样我们就可以在控制台上看到数据。

    # define the sinkmy_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT) WITH ('connector' = 'print');"""t_env.execute_sql(my_sink_ddl).print()

需要强调的是,我们没有给sink的表创建主键。这个会在后面文章中作为一个对比案例进行分析。
这一步只能创建表和连接器,具体执行还要执行下一步。

Execute

因为source和WordsCountTableSink是两张表,分别表示数据的输入和输出结构。如果要打通输入和输出,则需要将source表中的数据通过某些计算,插入到WordsCountTableSink表中。于是我们主要使用的是insert into指令。

    # execute insertmy_select_ddl = """insert into WordsCountTableSinkselect word, count(1) as `count`from sourcegroup by word"""t_env.execute_sql(my_select_ddl).wait()

完整代码如下

import argparse
import logging
import sysfrom pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment)def word_count(input_path):config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)# define the sourcemy_source_ddl = """create table source (word STRING) with ('connector' = 'filesystem','format' = 'csv','path' = '{}')""".format(input_path)t_env.execute_sql(my_source_ddl).print()tab = t_env.from_path('source')# define the sinkmy_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT) WITH ('connector' = 'print');"""t_env.execute_sql(my_sink_ddl).print()# execute insertmy_select_ddl = """insert into WordsCountTableSinkselect word, count(1) as `count`from sourcegroup by word"""t_env.execute_sql(my_select_ddl).wait()if __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input)

执行命令如下

python sql_print.py --input input1.csv

输出结果如下

Using Any for unsupported type: typing.Sequence[~T]
No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform CANNOT be used with method=DIRECT_READ.
OK
OK
+I[A, 3]
+I[B, 1]
+I[C, 2]
+I[D, 2]
+I[E, 1]

因为使用的是批处理模式(in_batch_mode),我们看到Flink将所有数据计算完整成,成批的执行了新增操作(+代表新增)。这块对比我们将在后续将流处理时介绍区别。
附上input1.csv内容

"A",
"B",
"C",
"D",
"A",
"E",
"C",
"D",
"A",
http://www.yidumall.com/news/39377.html

相关文章:

  • 做特卖的网站爱库存金蝶进销存免费版
  • 为什么做图书管理网站海外网站seo优化
  • 做微信公众号海报的网站seo在线培训机构
  • 白之家低成本做网站网络营销seo是什么意思
  • 哪些网站是php做的如何看待百度竞价排名
  • 章丘市建设局网站站长之家权重
  • 免费logo设计网站推荐广告收益平台
  • 网站布局结构有哪些网络营销策略内容
  • java做网站和asp做网站网站友情链接有什么用
  • html5电影网站建设找推网
  • 码云可以做博客网站吗搜索引擎优化seo公司
  • 模板网站建设公司哪个好电子商务seo实训总结
  • 昆明网站建设是什么百度怎么投放自己的广告
  • 西城网站建设seo人员的相关薪资
  • 做网站建设一年能赚多少在线代理浏览网址
  • 贵州省住房和城乡建设厅门户网站免费建站系统哪个好用吗
  • 网站开发和c语言一个完整的策划案范文
  • 衢州网站建设批发销售清单软件永久免费版
  • 做外围网站代理违法吗制造企业网站建设
  • 查房价的官方网站教师遭网课入侵直播录屏曝光广场舞
  • 做两个一摸一样的网站宁波网站推广
  • 网络会议系统有哪些苏州seo排名优化课程
  • 福建省建设干部培训中心网站深圳头条新闻
  • 网友天津seo推广
  • 为什么做网站必须用服务器网站建设技术外包
  • 做网站管理员需要哪些知识网络广告投放渠道有哪些
  • 网站一般做几个关键词在线视频观看免费视频22
  • 网站建设文化渠道自助建站系统开发
  • 网站建设专家工作内容福州seo顾问
  • 专业做企业网站头条搜索站长平台