spark安装测试
由于本地已经安装好hadoop相关组件,所以本文是在yarn的基础上对spark进行安装及测试
- 确保hdfs及yarn成功启动,hadoop版本为2.7.3
- 安装scala,由于本人安装的spark是2.4.0,对应的scala版本需要是2.11.X。
- 下载spark,本人下载的是spark-2.4.0-bin-hadoop2.7这个版本。
- 解压,软链及修改配置文件
配置文件主要是spark-env.sh
export JAVA_HOME=/home/connect/software/java #Java环境变量export SCALA_HOME=/home/spark/software/scala #SCALA环境变量export SPARK_WORKING_MEMORY=1g #每一个worker节点上可用的最大内存export SPARK_MASTER_IP=XXXXXX #驱动器节点IPexport HADOOP_HOME=/home/spark/software/hadoop #Hadoop路径#export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop #Hadoop配置目录export SPARK_WORKER_CORES=2 #每个节点多少核export SPARK_WORKER_INSTANCES=1
然后修改slaves,将worker节点ip写入其中。 修改bashrc文件。
- 在master上启动start-all.sh 6,然后在浏览器中输入: 有spark管理界面即可。
测试spark是否可以工作
- 测试功能为worldCount。手动制作一个txt文件,输入一些单词,并上传到hdfs中。
- 利用spark-shell的方式进行测试
val file=sc.textFile("hdfs://hdfs:9000/user/spark/logs/words"val count=file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)count.collect()
其后就会在客户端上显示测试结果
自己写scala的jar包上传并测试(worldCount)
- maven项目,引入maven库
org.scala-lang scala-library 2.11.4 org.apache.spark spark-core_2.12 2.4.0 provided org.apache.spark spark-streaming_2.12 2.4.0 provided
- 建一个WordCount
import org.apache.spark.{SparkConf, SparkContext}object WordCount { def main(args: Array[String]): Unit = { if (args.length < 1) { System.err.print("Usage:") System.exit(1) } val conf = new SparkConf().setMaster("spark://10.37.167.204:7077") val sc = new SparkContext(conf) val line = sc.textFile(args(0)) line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println) sc.stop() }}
3.打成jar上传到服务器,然后进入spark/bin目录下执行
spark-submit --master spark://masterIp:7077 --name WordCount --class WordCount --executor-memory 1G --total-executor-cores 2 ../Scala.jar hdfs://hdfs:9000/user/spark/logs/words
这里的结果是输出到客户端,也可以输入到hdfs文件中,代码中自己设置。
saprk Mlib学习函数
- 密集和稀疏向量
一个向量(1.0,0.0,3.0)它有2中表示的方法密集:[1.0,0.0,3.0] 其和一般的数组无异稀疏:(3,[0,2],[1.0,3.0]) 其表示的含义(向量大小,序号,值) 序号从0开始import org.apache.spark.mllib.linalg.Vectorsobject Test { def main(args: Array[String]) { val vd = Vectors.dense(2, 5, 8) println(vd(1)) println(vd) //向量个数,序号,value val vs = Vectors.sparse(4, Array(0, 1, 2, 3), Array(9, 3, 5, 7)) println(vs(0)) //序号访问 println(vs) val vs2 = Vectors.sparse(4, Array(0, 2, 1, 3), Array(9, 3, 5, 7)) println(vs2(2)) println(vs2) }}--- result ---5.0[2.0,5.0,8.0]9.0(4,[0,1,2,3],[9.0,3.0,5.0,7.0])3.0(4,[0,2,1,3],[9.0,3.0,5.0,7.0])
spark算子
- zip
def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常------------val rdd1 = sc.parallelize(Array(1,2,3,4,5,6),2) val rdd2 = sc.parallelize(Array(2,3,3,4,5,7),2) rdd1.zip(rdd2).collect().foreach(println)//result (1,2)(2,3)(3,3)(4,4)(5,5)(6,7)
- zipPartitions 以分区为单位进行zip操作,要求分区数目相等。否则异常。
val func = (x:Iterator[Int],y:Iterator[Int])=>{ var result = List[String]() while(x.hasNext&&y.hasNext){ result::=x.next()+"_"+y.next() } result.iterator } rdd1.zipPartitions(rdd2)(func).collect().foreach(println)##result3_32_31_26_75_54_4
下面是一个简单的例子
spark Mllib学习
pca :
Spark MLlib简介
使用SGD算法逻辑回归的垃圾邮件分类器
Spark 部署及示例代码讲解 于Spark的Als算法+自迭代+Spark2.0新写法 矩阵奇异值分解spark源码阅读
1