scala> val lines = sc.parallelize(Array("home Tom","hadoop Jack","look Kim"))
lines: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> lines.foreach(println)
home Tom
hadoop Jack
look Kim
// Note: a transformation does not change the RDD it is called on -- RDDs are immutable (and lines is a val), so flatMap returns a new RDD instead
scala> lines.flatMap(t=>t.split(" "))
res20: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at flatMap at <console>:27

scala> lines.foreach(println)
home Tom
hadoop Jack
look Kim
// The result must be captured in a new val
scala> val newrdd = lines.flatMap(t=>t.split(" "))
newrdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at flatMap at <console>:26

scala> newrdd.foreach(println)
home
Tom
hadoop
Jack
look
Kim
scala> val rdd1 = sc.parallelize(Array("one","two","three"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(Array("two","three","three"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> rdd2.distinct().foreach(println)
two
three

scala> rdd2.union(rdd1).foreach(println)
two
three
three
one
two
three

scala> rdd2.intersection(rdd1).foreach(println)
two
three
scala> rdd2.subtract(rdd1).foreach(println)
// no output: every element of rdd2 also appears in rdd1

scala> rdd1.subtract(rdd2).foreach(println)
one
Basic Operations on RDDs: Actions
An action computes a result from an RDD and returns it to the driver program, or saves it to external storage (save operations, etc.).
reduce()
Takes a function that combines two elements of the same type from the RDD and returns a new element of the same type.
Can be used to implement sums, counts, and other aggregations over the elements of an RDD.
collect()
Traverses the entire RDD and returns its contents to the driver program.
The contents must fit in the memory of a single machine (because they are copied to the driver).
For large datasets, use saveAsTextFile() instead.
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
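Continuing the transcript, a minimal sketch of the actions described above applied to this RDD (not in the original session; the saveAsTextFile path is purely illustrative):

scala> rdd.reduce((a, b) => a + b)   // sum of all elements
res0: Int = 10

scala> rdd.collect()                 // copies the whole RDD back to the driver
res1: Array[Int] = Array(1, 2, 3, 4)

scala> rdd.saveAsTextFile("file:///tmp/rdd-out")   // hypothetical path; writes one part file per partition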
scala> val lines = sc.textFile("/home/hadoop/look.sh")   // note: this is wrong here -- with no scheme the path defaults to HDFS
scala> val lines = sc.textFile("file:///home/hadoop/look.sh")   // use file:// to read from the local filesystem
lines: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/look.sh MapPartitionsRDD[5] at textFile at <console>:24
scala> lines.foreach(println)
#!/bin/bash
Jarinfo=$(ps -ef|grep java)
echo "$Jarinfo" | while read Line
do
  #echo $Line;
  #echo ${Line##*:}
  Jarstr=${Line##*:}
  Ishere=$(echo $Jarstr | grep $1 )
  if [[ "$Ishere" != "" ]]
  then
    echo YES
    exit 1
  fi
done
scala> val pairs = lines.map(line=>(line.split(" ")(0),line))
pairs: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[6] at map at <console>:26
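This keys each line of the file by its first word. As a small sketch of what such a pair RDD is good for (not in the original session; the length threshold is arbitrary), the values can be filtered while the keys ride along:

scala> pairs.filter { case (key, line) => line.length < 20 }.foreach(println)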
scala> var rdd = sc.parallelize(Array((1,2),(3,4),(3,6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
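A minimal per-key aggregation sketch on this pair RDD (not in the original session): reduceByKey merges the values of each key, here by summing them.

scala> rdd.reduceByKey((x, y) => x + y).foreach(println)
(1,2)
(3,10)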
scala> var rdd = sc.parallelize(Array(("Tom",82),("Tom",78),("Mike",76),("Mike",74))) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24
// Computes each person's total score and course count at once (i.e. several metrics in a single pass)
scala> val rst = rdd.combineByKey(
     |   x => (1, x),                                                  // createCombiner: first value for a key -> (count, sum)
     |   (item: (Int, Int), v: Int) => (item._1 + 1, item._2 + v),     // mergeValue: fold another score into the accumulator
     |   (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)) // mergeCombiners: merge accumulators across partitions
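With the scores above this yields a (course count, total score) pair per person; a short sketch of a continuation (output order may vary across runs):

scala> rst.foreach(println)
(Tom,(2,160))
(Mike,(2,150))

scala> rst.map { case (name, (n, total)) => (name, total.toDouble / n) }.foreach(println)   // per-person average
(Tom,80.0)
(Mike,75.0)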