spark on yarn模式安装和配置carbondata
本节将介绍如何在 Spark on YARN 模式的集群上安装和配置 CarbonData。carbondata1.5.1的编译可以看上一篇版本:spark2.3.1,carbondata1.5.1前置条件Hadoop HDFS 和 Yarn 需要安装和运行。Spark 需要在所有的集群节点上安装并且运行。CarbonData 用户需要有权限访问 HDFS.以下步骤仅针对于 ...
本节将介绍如何在 Spark on YARN 模式的集群上安装和配置 CarbonData。carbondata1.5.1的编译可以看上一篇
版本:spark2.3.1,carbondata1.5.1
前置条件
- Hadoop HDFS 和 Yarn 需要安装和运行。
- Spark 需要在所有的集群节点上安装并且运行。
- CarbonData 用户需要有权限访问 HDFS.
以下步骤仅针对于 Driver 程序所在的节点. (Driver 节点就是启动 SparkContext 的节点)
-
编译carbondata工程,并且从
./assembly/target/scala-2.1x/carbondata_xxx.jar
路径获取 assembly jar。最后将这个 jar 复制到$SPARK_HOME/carbonlib
文件夹。注意: 如果
$SPARK_HOME
路径下不存在 carbonlib 文件夹,请事先创建它。 -
从 CarbonData repository 复制
./conf/carbon.properties.template
文件到$SPARK_HOME/conf/
文件夹下面,并将它重命名为carbon.properties
。 -
压缩 carbonlib 文件夹的内容到
tar.gz
文件中,并将这个压缩文件移到 carbonlib 文件夹下面。
cd $SPARK_HOME
tar -zcvf carbondata.tar.gz carbonlib/
mv carbondata.tar.gz carbonlib/
4.在 $SPARK_HOME/conf/spark-defaults.conf
文件中配置下表提到的属性。
spark.master yarn-client
spark.yarn.dist.files /home/jason/bigdata/spark2.3/spark-2.3.1-bin-hadoop2.7/conf/carbon.properties
spark.yarn.dist.archives /home/jason/bigdata/spark2.3/spark-2.3.1-bin-hadoop2.7/carbonlib/carbondata.tar.gz
spark.executor.extraJavaOptions -Dcarbon.properties.filepath=carbon.properties -XX:+OmitStackTraceInFastThrow -XX:+UseGCOverheadLimit
spark.executor.extraClassPath carbondata.tar.gz/carbonlib/*
spark.driver.extraClassPath /home/jason/bigdata/spark2.3/spark-2.3.1-bin-hadoop2.7/carbonlib/*
spark.driver.extraJavaOptions -Dcarbon.properties.filepath=/home/jason/bigdata/spark2.3/spark-2.3.1-bin-hadoop2.7/conf/carbon.properties -Dhdp.version=current
spark.yarn.executor.memoryOverhead 1024
spark.yarn.driver.memoryOverhead 1024
spark.yarn.am.extraJavaOptions -Dhdp.version=current
spark.yarn.scheduler.heartbeat.interval-ms 120000
spark.executor.heartbeatInterval 120000
spark.network.timeout 720000
5.将下面的配置添加到 $SPARK_HOME/conf/carbon.properties
文件中:
carbon.storelocation=hdfs://master:9000/Carbon/CarbonStore
#Base directory for Data files
carbon.ddl.base.hdfs.url=hdfs://master:9000/Carbon/data
#Path where the bad records are stored
carbon.badRecords.location=hdfs://master:9000/Carbon/badrecords
6,把hive-site.xml放到spark的conf下面,(这个一定要放)
7.测试
spark-shell --master yarn-client --driver-memory 1g
--executor-cores 2 --executor-memory 2G
下面给一个完整的代码demo:
package carbondata
import java.io.File
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
object carbondataSpark {
def main(args: Array[String]): Unit = {
/* //hive store location
val warehouse = new File("hdfs://master:9000/jason/carbondata_warehouse").getCanonicalPath
//metastore location
*/
//val metastore = new File("hdfs://master:9000/hive").getCanonicalPath
val storeLocation = new File("hdfs://master:9000/jason/carbondata").getCanonicalPath
val spark = SparkSession
.builder()
.appName("carbondata streaming")
//.config("spark.driver.host","master")
.getOrCreateCarbonSession("hdfs://master:9000/jason/carbondata")
spark.sql(s"DROP TABLE IF EXISTS carbon_table")
spark.sql(
s"""
| CREATE TABLE carbon_table (
| col1 STRING,
| col2 STRING
| )
| STORED BY 'carbondata'
| TBLPROPERTIES('streaming'='true')""".stripMargin)
val carbonTable = CarbonEnv.getCarbonTable(Some("default"), "carbon_table")(spark)
val tablePath = carbonTable.getTablePath
// batch load
var qry: StreamingQuery = null
val readSocketDF = spark.readStream
.format("socket")
.option("host", "192.168.17.142")
.option("port", 9999)
.load()
// Write data from socket stream to carbondata file
qry = readSocketDF.writeStream
.format("carbondata")
.trigger(ProcessingTime("1 seconds"))
.option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
.option("dbName", "default")
.option("tableName", "carbon_table")
.start()
// start new thread to show data
new Thread() {
override def run(): Unit = {
do {
spark.sql("select * from carbon_table").show(false)
Thread.sleep(10000)
} while (true)
}
}.start()
qry.awaitTermination()
}
}
如果有写的不对的地方,欢迎大家指正,如果有什么疑问,可以加QQ群:340297350,谢谢
更多推荐
所有评论(0)