First, the code:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import datetime
import logging
import os

import pyspark.sql.functions as fun
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

time_format = "%Y%m%d"

python_path = "/opt/anaconda2/bin/python2"

tag_pv = 'app_search_pv'
tag_click = 'app_search_click'

sql_template = """
                SELECT vender_id,store_id,search_keyword,
                      has_result,event_code,user_id,
                      search_source,action_type,page_no,dt
                    from db1.events
                    where dt>='{}' and dt<'{}'
                    AND (event_code='{}' OR event_code='{}')
                    AND length(vender_id)>0
                    and length (store_id)>0
                    and length (search_keyword)>0
               """

# In a cluster environment, point PySpark at an explicit python interpreter.
os.environ["PYSPARK_PYTHON"] = python_path

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__file__)

conf = SparkConf().setAppName('search_rec:stat_events')
sc = SparkContext(conf=conf)

if __name__ == '__main__':
    logger.info("application id:%s", sc._jsc.sc().applicationId())

    now_time = datetime.datetime.now()
    now_str = now_time.strftime(time_format)
    yesterday = (now_time - datetime.timedelta(days=1)).strftime(time_format)

    hive_context = HiveContext(sc)

    df_hive = hive_context.sql(sql_template.format(yesterday, now_str, tag_pv, tag_click))

    df_pv = df_hive.filter(df_hive['event_code'] == tag_pv)
    logger.info("pv count:%s", df_pv.count())
    logger.info("uv count:%s", df_pv.select("user_id").distinct().count())

    logger.info("task1")
    # task1, stat pv by store.
    tmp_pv_by_store = df_pv.groupBy("vender_id", "store_id").agg(fun.count("*").alias("pv"),
                                                                 fun.countDistinct("user_id").alias("uv"))

    logger.info("rdd_pv_by_store size:%s", tmp_pv_by_store.count())

    rdd_pv_by_store = tmp_pv_by_store.filter("pv>=1000").orderBy('pv', ascending=False)

    for s in rdd_pv_by_store.collect():
        logger.info("pv_by_store:%s %s %d %d", s[0], s[1], s[2], s[3])

    logger.info("task2")
    # task2, stat pv by keyword.
    tmp_pv_by_keyword = df_pv.groupBy("vender_id", "search_keyword").agg(fun.count("*").alias("pv"),
                                                                         fun.countDistinct("user_id").alias("uv"))
    logger.info("rdd_pv_by_keyword count:%s", tmp_pv_by_keyword.count())

    rdd_pv_by_keyword = tmp_pv_by_keyword.filter("pv>=100").orderBy('pv', ascending=False)
    for keyword in rdd_pv_by_keyword.collect():
        logger.info("pv_by_kw: %s %s %d %d", keyword[0], keyword[1], keyword[2], keyword[3])

    logger.info("task3")
    # task3, stat pv by has_result.
    tmp_result_status = df_pv.filter("has_result != 1").groupBy("search_keyword", "vender_id").agg(
        fun.count("*").alias("pv"),
        fun.countDistinct("user_id").alias("uv"))
    logger.info("rdd_result_status count:%s", tmp_result_status.count())

    rdd_result_status = tmp_result_status.filter("pv>=30").orderBy('pv', ascending=False)
    for x in rdd_result_status.collect():
        logger.info("no_result:%s %s %d %d", x[0], x[1], x[2], x[3])

Run command

spark-submit --master yarn-client  --driver-memory 2g --executor-memory 3g test_events.py
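For yarn-cluster mode, a variant along the following lines should work; this is a sketch rather than a command verified here, and the Python path passed via spark.yarn.appMasterEnv.PYSPARK_PYTHON is simply the Anaconda path from the script above:

spark-submit --master yarn-cluster --driver-memory 2g --executor-memory 3g \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=/opt/anaconda2/bin/python2 \
    test_events.py

In cluster mode the driver runs inside YARN, so the interpreter path also needs to reach the application master; spark.yarn.appMasterEnv.[Name] is the documented way to set an environment variable there.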

Summary

1) This is a PySpark test script; in its current form it runs under both yarn-client and yarn-cluster modes.

2) A later article will cover SparkConf settings and the different run modes.

3) It will also cover the Python environment issue in a cluster environment; a small conf-based sketch follows below.
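As a preview of that Python environment point, one alternative to the os.environ trick is to carry the interpreter path in the SparkConf itself. This is only a sketch, assuming /opt/anaconda2/bin/python2 exists on every node; it is not the exact configuration used above:

conf = (SparkConf()
        .setAppName('search_rec:stat_events')
        # Environment variable for executors launched on the worker nodes.
        .set("spark.executorEnv.PYSPARK_PYTHON", python_path)
        # Environment variable for the YARN application master (matters in cluster mode).
        .set("spark.yarn.appMasterEnv.PYSPARK_PYTHON", python_path))
sc = SparkContext(conf=conf)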
