当前位置: 首页 > news >正文

企联网登录宁波网站关键词优化排名

企联网登录,宁波网站关键词优化排名,wordpress vul,b2b网站建设步骤目录 1. RDD队列 2 textFileStream 3 DIY采集器 4 kafka数据源【重点】 1. RDD队列 a、使用场景:测试 b、实现方式: 通过ssc.queueStream(queueOfRDDs)创建DStream,每一个推送这个队列的RDD,都会作为一个DStream处理 val sparkco…

目录

1. RDD队列

2 textFileStream

3 DIY采集器

4 kafka数据源【重点】


1. RDD队列

       a、使用场景:测试
       b、实现方式: 通过ssc.queueStream(queueOfRDDs)创建DStream,每一个推送这个队列的RDD,都会作为一个DStream处理

    val  sparkconf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")val ssc = new StreamingContext(sparkconf,Seconds(3))// 创建一个队列对象,队列中存放的是RDDval queue = new mutable.Queue[RDD[String]]()// 通过队列创建DStreamval queueDS: InputDStream[String] = ssc.queueStream(queue)queueDS.print()// 启动采集器ssc.start()//这个操作之所以放在这个位置,是为了模拟流式的感觉,数据源源不断的生产for(i <- 1 to 5 ){// 循环创建rddval rdd: RDD[String] = ssc.sparkContext.makeRDD(List(i.toString))// 将RDD存放到队列中queue.enqueue(rdd)// 当前线程休眠1秒Thread.sleep(6000)         }// 等待采集器的结束ssc.awaitTermination()}

2 textFileStream

   val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("textFileStream")val ssc = new StreamingContext(sparkConf,Seconds(3))//从文件中读取数据val textDS: DStream[String] = ssc.textFileStream("in")textDS.print()// 启动采集器ssc.start()// 等待采集器的结束ssc.awaitTermination()

3 DIY采集器

    1. 自定义采集器
    2. 什么情况下需要自定采集器呢?
         比如从mysql、hbase中读取数据。
         采集器的作用是从指定的地方,按照采集周期对数据进行采集。
         目前有:采集kafka、采集netcat工具的指定端口的数据、采集文件目录中的数据等
    3. 自定义采集器的步骤,模仿socketTextStream
         a、自定采集器类,继承extends,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别
         b、重写onStart和onStop方法
            onStart:采集器的如何启动
            onStop:采集的如何停止

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DIY")val ssc = new StreamingContext(sparkConf, Seconds(3))// 获取采集的流val ds: ReceiverInputDStream[String] = ssc.receiverStream(new MyReciver("localhost",9999))ds.print()ssc.start()ssc.awaitTermination()}// 继承extends Reciver,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别class MyReciver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {private var socket: Socket = _def receive = {// 获取输入流val reader = new BufferedReader(new InputStreamReader(socket.getInputStream,"UTF-8"))// 设定一个间接变量var s: String = nullwhile (true) {// 按行读取数据s = reader.readLine()if (s != null) {// 将数据进行封装store(s)}}}// 1. 启动采集器override def onStart(): Unit = {socket = new Socket(host, port)new Thread("Socket Receiver") {setDaemon(true)override def run() {receive}}.start()}// 2. 停止采集器override def onStop(): Unit = {socket.close()socket = null}}

4 kafka数据源【重点】

-- DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。
-- 配置信息基本上是固定写法

 // TODO Spark环境// SparkStreaming使用核数最少是2个val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")val ssc = new StreamingContext(sparkConf, Seconds(3))// TODO 使用SparkStreaming读取Kafka的数据// Kafka的配置信息val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop105:9092,hadoop106:9092,hadoop107:9092",ConsumerConfig.GROUP_ID_CONFIG -> "atguigu","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))// 获取数据,key是null,value是真实的数据val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())valueDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()ssc.start()// 等待采集器的结束ssc.awaitTermination()

 

http://www.yidumall.com/news/108479.html

相关文章:

  • 游戏门户网站开发资源谷歌搜索引擎入口2023
  • e4a怎么做网站app什么是优化
  • 河北建设教育培训网站什么软件可以刷网站排名
  • 在哪个网站上做推广作用好网站排名优化化快排优化
  • 为什么很少用python做网站seo点击排名软件营销工具
  • 上海seo优化公司 kinglink汕头seo服务
  • 酒类招商网站大全黄冈网站推广软件费用是多少
  • 单人给一个公司做网站费用朝阳区seo技术
  • 高密建设局网站山东网页定制
  • 自己家里做网站网速慢网络营销策划方案框架
  • html5网站动态效果搜索引擎seo优化平台
  • 一键网页转app生成器搜索引擎优化免费
  • 闽侯福州网站建设缅甸最新新闻
  • 做宠物网站心得长沙官网seo分析
  • 普洱网站建设国内重大新闻
  • 动态网站设计免费发广告的网站大全
  • 建设社区网站有什么借鉴之处怎么做网络营销
  • wordpress 首页 静态网店产品seo如何优化
  • 制作网站需要的技术上首页的seo关键词优化
  • 肃宁做网站网站seo如何做好优化
  • apache添加网站优搜云seo
  • 在一个空间建两个网站怎么根据视频链接找到网址
  • 商城网站建设精英推广公司运营模式
  • 好看的手机网站推荐网站优化seo推广服务
  • 网站引流怎么做的推广策划方案
  • 西宁企业网站建设建立网站有哪些步骤
  • 门户网站做等级保护测评博客推广工具
  • 济南做公司网站需要多少钱东莞网站优化
  • 网站怎么在百度做推广seoul是哪个国家
  • 武汉定制网站建设外贸建站网站推广