官方网址:https://docs.streamlit.io/
官方网址:https://discuss.streamlit.io/t/streamlit-components-community-tracker/4634
官方网址:https://github.com/streamlit/streamlit
第三方插件网址:https://github.com/arnaudmiribel/streamlit-extras
第三方插件网址:https://github.com/joy13975/streamlit-nested-layout
第三方插件网址:https://github.com/Schluca/streamlit_tree_select
第三方插件网址:https://github.com/blackary/st_pages
第三方插件网址:https://github.com/andfanilo/streamlit-echarts
第三方插件网址:https://github.com/victoryhb/streamlit-option-menu
系统环境:win10
py版本:3.7
安装指令pip install streamlit
安装后测试运行,提示ImportError: cannot import name 'builder' from 'google.protobuf.internal' (D:\Program Files\Python\Python37\lib\site-packages\google\protobuf\internal\__init__.py)
经查说需要更新protobuf来解决。或者将streamlit退到1.13。
所以我最终使用的版本是:
streamlit : 1.13
protobuf : 3.15.6
20230109 由于需要清除缓存功能,所以更改了使用版本
streamlit : 1.15
protobuf : 3.20.0

初次运行

然后第一次运行时候,会提示需要输入一个邮箱来确定使用。
在这里插入图片描述
启动程序后
在这里插入图片描述
可以通过pycharm修改代码,然后刷新页面来查看新的页面效果。
在这里插入图片描述
下面按照需求,在官方文档中找到对应的功能模块,开发调试。

最终效果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

样例代码

主页

主要技术采用st_pages作为分页导航设置

import streamlit as st
from pkg.st_pages.src.st_pages import Page, show_pages, add_page_title
# Optional -- adds the title and icon to the current page
add_page_title()

body = '''
这是主页,暂时没有想好写什么,所以先随便写点东西来填充一下
'''
st.markdown(body, unsafe_allow_html=False)

# Specify what pages should be shown in the sidebar, and what their titles and icons
# should be
# https://raw.githubusercontent.com/omnidan/node-emoji/master/lib/emoji.json
show_pages(
    [
        Page("main_app.py", "主页", "🏠"),
        Page("public_pages/page_1.py", "微观数据", ":rabbit2:"),
        Page("public_pages/main_page.py", "宏观数据", ":cow2:"),
        # Page("public_pages/page_2.py", "数据覆盖度", ":water_buffalo:"),
    ]
)
st_pages使用注意事项

请求json问题
在使用https://github.com/blackary/st_pages这个包时候,不建议直接安装。因为这个包的stc/st_pages/__init__.py中有一个功能如下:

@st.experimental_singleton
def get_icons() -> dict[str, str]:
    url = "https://raw.githubusercontent.com/omnidan/node-emoji/master/lib/emoji.json"
    return requests.get(url).json()

这里是请求网页上的json,但是在离线环境或者国内的环境就经常请求不到。这里是建议将那个json下载下来,然后放到项目中,然后将代码改成如下:

@st.experimental_singleton
def get_icons() -> dict[str, str]:
    # url = "https://raw.githubusercontent.com/omnidan/node-emoji/master/lib/emoji.json"
    # return requests.get(url).json()
    with open('./pkg/templates/emoji.json', 'r', encoding="utf8") as f:
        js_data = json.load(f)
    return js_data

这样回避访问网络问题。

菜单刷新问题
在使用2级菜单的时候,由于本身该功能包是采用重新写css的方式(li:nth-child)来实现菜单功能的,所以需要所有的次级页面都添加add_page_title()或者add_indentation()来实现菜单层级功能。

主页修改-20230118

原st_pages不太满足需求,故参考st_pages重新写了份类似的功能包streamlit_pages。
main_app.py:导航页面

from pkg.streamlit_pages import add_indentation, Page, Menu, overwrite_pages

from streamlit.source_util import _on_pages_changed, get_pages

pages = [
    Page("main_app.py", "主页", "🏠"),
    Menu(name="微观数据", icon=":horse:"),
    # Menu(name="二级菜单", icon=":horse:", father="微观数据"),
    Page("public_pages/page_1.py", "微观数据", ":rabbit2:", father="微观数据"),
    Menu(name="宏观数据", icon=":horse:"),
    Page("public_pages/page_2.py", "重点指标地区覆盖度", ":rabbit2:", father="宏观数据"),
    Page("public_pages/main_page.py", "重点指标数据查询", ":cow2:", father="宏观数据"),
    Page("public_pages/page_test.py", "测试页面", ":cow2:"),
]

overwrite_pages(pages)
add_indentation()

body = '''
这是主页,暂时没有想好写什么,所以先随便写点东西来填充一下
'''
st.markdown(body, unsafe_allow_html=False)

with st.container():
    with st.expander("检查页面信息-测试开发用"):
        for page in pages:
            st.write(page.to_dict())

streamlit_pages.init.py:page程序包

# -*- coding:utf-8 -*-
# author: cyz
# time: 2023/1/12 14:02
import os, sys

sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))
# os.chdir(os.path.dirname(os.path.abspath(__file__)))

from dataclasses import dataclass
from pathlib import Path
import json
from typing import Union, Dict, Tuple, List

from streamlit.source_util import _on_pages_changed, get_pages
from streamlit.commands.page_config import get_random_emoji
from streamlit.util import calc_md5
import streamlit as st

try:
    from streamlit import _gather_metrics  # type: ignore
except ImportError:
    def _gather_metrics(name, func, *args, **kwargs):
        return func

try:
    from streamlit.source_util import page_icon_and_name
except ImportError:
    from streamlit.source_util import page_name_and_icon  # type: ignore

    def page_icon_and_name(script_path: Path) -> Tuple[str, str]:
        icon, name = page_name_and_icon(script_path)
        return name, icon

@dataclass
class Page:
    """
    Utility class for working with pages

    Parameters
    ----------
    path: str
        The path to the page
    name: str (optional)
        The name of the page. If not provided, the name will be inferred from
        the path
    icon: str (optional)
        The icon of the page. If not provided, the icon will be inferred from
        the path
    """

    path: str
    name: Union[str, None] = None
    icon: Union[str, None] = None
    is_menu: bool = False
    father: Union[str, None] = None

    @property
    def page_path(self) -> Path:
        return Path(str(self.path))

    @property
    def page_name(self) -> str:
        standard_name = page_icon_and_name(self.page_path)[1]
        standard_name = standard_name.replace("_", " ").title()
        if self.name is None:
            return standard_name
        return self.name

    @property
    def page_icon(self) -> str:
        standard_icon = page_icon_and_name(self.page_path)[0]
        icon = self.icon or standard_icon or ""
        return translate_icon(icon)

    @property
    def page_hash(self) -> str:
        if self.is_menu:
            return calc_md5(f"{self.page_path}_{self.page_name}")
        return calc_md5(str(self.page_path))

    def to_dict(self) -> Dict[str, Union[str , bool]]:
        return {
            "page_script_hash": self.page_hash,
            "page_name": self.page_name,
            "icon": self.page_icon,
            "script_path": str(self.page_path),
            "is_menu": self.is_menu,
            "father": self.father,
        }

    @classmethod
    def from_dict(cls, page_dict: Dict[str, Union[str , bool]]):
        return cls(
            path=str(page_dict["script_path"]),
            name=str(page_dict["page_name"]),
            icon=str(page_dict["icon"]),
            is_menu=bool(page_dict["is_menu"]),
            father=str(page_dict["father"]),
        )


class Menu(Page):
    def __init__(self, name: str, icon: Union[str, None] = None, father=None):
        super().__init__(path="", name=name, icon=icon, is_menu=True, father=father)

# 原本的shou_pages,感觉这个名字更加切合实际
def _overwrite_pages(pages: List[Page]):
    """
    Given a list of Page objects, overwrite whatever pages are currently being
    shown in the sidebar, and overwrite them with this new set of pages.

    NOTE: This changes the list of pages globally, not just for the current user, so
    it is not appropriate for dymaically changing the list of pages.
    """
    current_pages: Dict[str, Dict[str, Union[str , bool]]] = get_pages("")  # type: ignore
    if set(current_pages.keys()) == set(p.page_hash for p in pages):
        return

    try:
        default_page = [p.path for p in pages if p.path][0]
    except IndexError:
        raise ValueError("Must pass at least one page to show_pages")

    for page in pages:
        if page.is_menu:
            page.path = default_page

    current_pages.clear()
    for page in pages:
        current_pages[page.page_hash] = page.to_dict()

    _on_pages_changed.send()

overwrite_pages = _gather_metrics("streamlit_pages.overwrite_pages", _overwrite_pages)

@st.experimental_singleton
def get_icons() -> Dict[str, str]:
    # url = "https://raw.githubusercontent.com/omnidan/node-emoji/master/lib/emoji.json"
    # return requests.get(url).json()
    emoji_path = os.path.dirname(os.path.abspath(__file__)) +'/emoji.json'
    with open(emoji_path, 'r', encoding="utf8") as f:
        js_data = json.load(f)
    return js_data

# @st.experimental_singleton
def get_css() -> str:
    # url = "https://github.com/Socvest/streamlit-on-Hover-tabs/blob/main/st_on_hover_tabs/style.css"
    emoji_path = os.path.dirname(os.path.abspath(__file__)) +'/style.css'
    with open(emoji_path, 'r', encoding="utf8") as f:
        css_file = f.read()
    return css_file

def translate_icon(icon: str) -> str:
    """
    If you pass a name of an icon, like :dog:, translate it into the
    corresponding unicode character
    """
    icons = get_icons()
    if icon == "random":
        icon = get_random_emoji()
    elif icon.startswith(":") and icon.endswith(":"):
        icon = icon[1:-1]
        if icon in icons:
            return icons[icon]
    return icon

def _get_indentation_code(style:str="st-page") -> str:
    if style == "st-page":
        styling = '''
        div[data-testid="stSidebarNav"] {
        /* background-color:rgb(0, 0, 0); */
        /*color:rgb(255, 255, 255);*/
        /*padding-top: 20px;*/
        }
        '''
        current_pages = get_pages("")

        menu_info = {}

        for idx, val in enumerate(current_pages.values()):
            if val.get("is_menu"):
                if val.get("father") and (val.get("father") in menu_info.keys()):
                    menu_margin_left = menu_info[val.get("father")] + 1.5
                else:
                    menu_margin_left = 1

                menu_info[val.get("page_name")] = menu_margin_left
                styling += f"""
                        li:nth-child({idx + 1}) a {{
                            pointer-events: none; /* Disable clicking on section header */
                            margin-left: {menu_margin_left}rem;
                        }}
                    """

            elif val.get("father") and (val.get("father") in menu_info.keys()):
                # Unless specifically unnested, indent all pages that aren't section headers
                menu_margin_left = menu_info[val.get("father")] + 0.5
                styling += f"""
                        li:nth-child({idx + 1}) span:nth-child(1) {{
                            margin-left: {menu_margin_left}rem;
                        }}
                    """

        styling += get_css()
    else:
        styling = '''
        div[data-testid="stSidebarNav"] {
        display: None;
        }
        '''


    styling = f"""
        <style>
            {styling}
        </style>
    """

    return styling

def _add_indentation(style:str="st-page"):
    """
    For an app that has set one or more "sections", this will add indentation
    to the files "within" a section, and make the sections itself
    unclickable. Makes the sidebar look like something like this:

    - page 1
    - section 1
        - page 2
        - page 3
    - section 2
        - page 4
    """

    styling = _get_indentation_code(style)

    st.write(
        styling,
        unsafe_allow_html=True,
    )
    # st.markdown('<style>' + get_css() + '</style>', unsafe_allow_html=True)
add_indentation = _gather_metrics("streamlit_pages.add_indentation", _add_indentation)

style.css:我有部分注释掉,跟原版的有些不太相同。

section[data-testid='stSidebar'] {
    /*background-color: #111;*/
    min-width:unset !important;
    width: unset !important;
    flex-shrink: unset !important;
}

button[kind="header"] {
    background-color: transparent;
    color:rgb(180, 167, 141)
}

@media(hover){
    /* header element to be removed */
    header[data-testid="stHeader"] {
        display:none;
    }

    /* The navigation menu specs and size */
    section[data-testid='stSidebar'] > div {
        /*height: 100%;*/
        width: 95px;
        position: relative;
        z-index: 1;
        top: 0;
        left: 0;
        /*background-color: #111;*/
        overflow-x: hidden;
        transition: 0.5s ease;
        /*padding-top: 60px;*/
        white-space: nowrap;
        /*color:rgb(255, 255, 255)*/
    }

    /* The navigation menu open and close on hover and size */
    /* section[data-testid='stSidebar'] > div {
    height: 100%;
    width: 75px; /* Put some width to hover on. */
    /* }
    /* ON HOVER */
    section[data-testid='stSidebar'] > div:hover{
    width: 300px;
    }

    /* The button on the streamlit navigation menu - hidden */
    button[kind="header"] {
        display: none;
    }
}

@media(max-width: 272px){

    section[data-testid='stSidebar'] > div {
        width:15rem;
    }
}
微观数据页面

这个页面主要是采用直接连接外页面使用的。所以本身代码量不高。

import streamlit as st
import streamlit.components.v1 as components

st.set_page_config(layout="wide", page_title="微观数据")
# https://docs.streamlit.io/library/components/components-api
components.iframe("http://192.168.1.1:5000/", width=1680, height=760, scrolling=True)
宏观数据页面

基础设置

import streamlit as st
import streamlit_nested_layout  # 布局必须导入
from streamlit_tree_select import tree_select  # 树状选择
from streamlit_echarts import st_pyecharts  # 画图

from datetime import datetime
import pandas as pd
from pkg.streamlit_pages import add_indentation  # 刷新菜单栏使用

# https://docs.streamlit.io/library/api-reference/utilities/st.set_page_config
st.set_page_config(layout="wide", page_title="宏观数据")
# https://docs.streamlit.io/library/advanced-features/session-state
if 'macro_index_data' not in st.session_state:  # 数据状态,就是共享的内存,不会随着代码重运行而导致数据重置
    st.session_state.macro_index_data = pd.DataFrame()
add_indentation()

# 页面布局
# https://docs.streamlit.io/library/api-reference/layout/st.columns
col1, col2 = st.columns((4, 1))  # 将页面按照4:1宽度分割成2个区域
with col1:
    select_expander=st.expander("详细选项")
    expander_col1, expander_col2, expander_col3, expander_col4 = select_expander.columns((1, 1, 1, 1))
数据缓存
# 缓存功能区
# https://docs.streamlit.io/library/api-reference/performance/st.cache
@st.experimental_singleton()
def aaaaaa():
    # 请求df数据api
    md = MacroData()
    return md
md = aaaaaa()
dp = DrawPic()

@st.experimental_memo
def get_index_label():
    return md.get_index_label()
按键触发查询数据

查询数据功能

def get_macro_index_data(data):
    source = md.get_macro_index_data(data)
    source = source.astype(object).where(pd.notnull(source), None)
    return source

def selectData(index_code, area_code, dim_cal, dim_dur, frequency, occur_period_star, occur_period_end, model):
    data = {
        "index_code": index_code,
        "area_code": area_code,
        "dim_cal": dim_cal,
        "dim_dur": dim_dur,
        "frequency": frequency,
        "model": model
    }
    if occur_period_star != "":
        data["occur_period_star"] = occur_period_star
    if occur_period_end != "":
        data["occur_period_end"] = occur_period_end
    else:
        data["occur_period_end"] = datetime.now().strftime("%Y%m")

    # st.write('You selected:', data)
    macro_index_data = get_macro_index_data(data)
    st.session_state.macro_index_data = macro_index_data

按键触发

select_data_flag = col1_1_1.button(label="查询", on_click=selectData,
                                       args=(index_code["index_code"].tolist(),
                                             area_code,
                                             dim_cal["cur_code"].tolist(),
                                             index_code["dim_dur"].tolist(),
                                             index_code["frequency"].tolist(),
                                             occur_period_star,
                                             occur_period_end,
                                             model
                                             )
                                       )
下载数据

下载功能

with open('./pkg/templates/jquery-3.2.1.min.js', 'r') as f:
    jquery = f.read()

def downloadFile(df, path, **kwargs):
    # https://discuss.streamlit.io/t/automatic-download-select-and-download-file-with-single-button-click/15141/4
    index = kwargs.get("index", False)
    df.to_excel(path, index=index)
    with open(path, "rb") as file:
        bytes = file.read()
    os.remove(path)
    b64 = base64.b64encode(bytes).decode()
    href = f"""
    <html>
    <head>
    <title>Start Auto Download file</title>
    <script language="javascript">
    {jquery}
    </script>
    <script>
    $('<a href="data:text/xlsx;base64,{b64}" download="{path.split("/")[1]}">')[0].click()
    </script>
    </head>
    </html>"""
    components.html(href, height=0)

这里的js是网页连接上的js,由于网络问题导致经常访问不了,所以这里也是改成读取本地js文件的写法。

按键触发

    download_data_flag = col1_1_2.button(label="下载", on_click=downloadFile, args=(st.session_state.macro_index_data, "download/数据表.xlsx",))
选择按钮

单项选择

data_model_select = st.selectbox("查询的数据类型:", ("数据表", "判断表", "图表"))

在这里插入图片描述

带全选的多项选择

def multiselectContainer(label:str, item:list):
    # https://discuss.streamlit.io/t/select-all-on-a-streamlit-multiselect/9799/2
    container = st.container()
    key = "全选" + label
    select_all = st.checkbox("全选", key=key)
    if select_all:
        select = container.multiselect(label + ":", (item), (item))
    else:
        select = container.multiselect(label + ":", (item))

    return select
multiselectContainer("指标名称", base_dim_index["index_name"].tolist())

在这里插入图片描述
树状结构选择

def treeSelectNodes(source):
    cur_level = sorted(source["cur_level"].unique().tolist(), reverse=True)

    nodes = []
    detail_col = ["country_code", "province_code", "city_code", "area_code", "street_code", "community_code"]
    for index, row in source.iterrows():
        key = set(row[detail_col].fillna("").tolist())
        node = {"label": row.cur_name, "value": row.cur_code, "children": []}
        nodes.append([key, node, row["cur_level"]])
    nodes_df = pd.DataFrame(nodes,columns=["key", "node", "cur_level"])
    nodes_df["is_use"] = False

    for i in range(len(cur_level) - 1):
        tmp = nodes_df[(nodes_df["cur_level"] >= cur_level[i]) &
                       (nodes_df["is_use"] == False)
                       ]  # 小于等于当前层级
        tmp_1 = nodes_df[nodes_df["cur_level"] == cur_level[i + 1]]  # 上一层级
        for index, row in tmp.iterrows():
            for tem_1_key in tmp_1["key"].tolist():

                if tem_1_key.issubset(row["key"]) :  # 判断是否包含关系
                    node = nodes_df[nodes_df["key"] == row["key"]]["node"].tolist()[0]
                    node_1 = nodes_df[nodes_df["key"] == tem_1_key]["node"].tolist()[0]
                    node_1["children"].append(node)
                    nodes_df.loc[nodes_df["key"] == row["key"], "is_use"] = True
                    nodes_df.loc[nodes_df["key"] == tem_1_key, "node"] \
                        = nodes_df.loc[nodes_df["key"] == tem_1_key, "node"].apply(lambda x: node_1)

    return nodes_df[nodes_df["is_use"] == False]["node"].tolist()

@st.experimental_memo
def tree_select_nodes(area_code_list:list):
    source = md.get_dim_admin_area(area_code_list)
    nodes = treeSelectNodes(source)
    return nodes

def tree_select_container(label ,area_code_list, value_node_dict):
    container = st.container()
    nodes = tree_select_nodes(area_code_list)
    select_all = st.checkbox("全选", key="tree_select_all")
    if select_all:
        return_select = tree_select(nodes, checked=area_code_list, check_model="all", no_cascade=True, show_expand_all=True)
    else:
        return_select = tree_select(nodes, check_model="all", no_cascade=True, show_expand_all=True)
    show_select = [value_node_dict[i] for i in return_select["checked"]]
    container.text_area(label, ",".join(show_select), disabled=True, height=130)

    # st.write(return_select)
    return return_select["checked"]
    
area_code = tree_select_container("已选择行政区划",
                                  base_dim_area["area_code"].tolist(),
                                  base_dim_area.set_index("area_code").to_dict()["area_name"])

在这里插入图片描述
勾选后文字划掉

# https://github.com/arnaudmiribel/streamlit-extras/blob/main/src/streamlit_extras/stodo/__init__.py
def to_do(st_commands, checkbox_id:str, value:bool=False):
    """Create a to_do item

    Args:
        st_commands (_type_): _description_
        checkbox_id (_type_): _description_

    Returns:
        _type_: _description_
    """
    container = st.container()
    cols = container.columns((1, 100))
    done = cols[0].checkbox("", key=checkbox_id, value=value)

    if done:
        for (cmd, *args) in st_commands:
            with cols[1]:
                if cmd == st.write:
                    text = args[0]
                    cols[1].write(
                        "<s style='color: rgba(49, 51, 63, 0.4)'>" f" {text} </s>",
                        unsafe_allow_html=True,
                    )
                else:
                    if cmd in (
                        st.slider,
                        st.button,
                        st.checkbox,
                        st.time_input,
                        st.color_picker,
                        st.selectbox,
                        st.camera_input,
                        st.radio,
                        st.date_input,
                        st.multiselect,
                        st.text_area,
                        st.text_input,
                    ):
                        cmd(*args, disabled=True)
                    else:
                        cmd(*args)

    else:
        for (cmd, *args) in st_commands:
            with cols[1]:
                if cmd == st.write:
                    st.write(*args, unsafe_allow_html=True)
                else:
                    cmd(*args)
    st.write("")
    return done

to_do(
    [(st.write, "☕ Take my coffee"), (st.write, "🥞 Have a nice breakfast")],
    "coffee",
)
to_do(
    [(st.write, "☕ Take my coffee")],
    "coffee1",
)

在这里插入图片描述
页面跳转

# ===============================页面跳转=========================
def switch_page(page_name: str):
    # https://github.com/arnaudmiribel/streamlit-extras/blob/main/src/streamlit_extras/switch_page_button/__init__.py
    from streamlit.runtime.scriptrunner import RerunData, RerunException
    from streamlit.source_util import get_pages

    def standardize_name(name: str) -> str:
        return name.lower().replace("_", " ")

    page_name = standardize_name(page_name)

    pages = get_pages("")

    for page_hash, config in pages.items():
        if standardize_name(config["page_name"]) == page_name:
            raise RerunException(
                RerunData(
                    page_script_hash=page_hash,
                    page_name=page_name,
                )
            )

    page_names = [standardize_name(config["page_name"]) for config in pages.values()]

    raise ValueError(f"Could not find page {page_name}. Must be one of {page_names}")

want_to_contribute = st.button("重点指标地区覆盖度!")
if want_to_contribute:
    switch_page("重点指标地区覆盖度")
# ===============================页面跳转=========================

# ===============================页面跳转1=========================
from streamlit.components.v1 import html
import urllib.parse
def nav_page(page_name, timeout_secs=3):
    # https://github.com/streamlit/streamlit/issues/4832
    page_name = urllib.parse.quote(page_name.encode('utf8'))
    nav_script = """
        <script type="text/javascript">
            function attempt_nav_page(page_name, start_time, timeout_secs) {
                var links = window.parent.document.getElementsByTagName("a");
                for (var i = 0; i < links.length; i++) {
                    if (links[i].href.toLowerCase().endsWith("/" + page_name.toLowerCase())) {
                        links[i].click();
                        return;
                    }
                }
                var elasped = new Date() - start_time;
                if (elasped < timeout_secs * 1000) {
                    setTimeout(attempt_nav_page, 100, page_name, start_time, timeout_secs);
                } else {
                    alert("Unable to navigate to page '" + page_name + "' after " + timeout_secs + " second(s).");
                }
            }
            window.addEventListener("load", function() {
                attempt_nav_page("%s", new Date(), %d);
            });
        </script>
    """ % (page_name, timeout_secs)
    html(nav_script)

if st.button("重点指标数据查询"):
    nav_page("重点指标数据查询")
# ===============================页面跳转1=========================

在这里插入图片描述

数据展示

无数据时提醒

st.warning('warning: 没有查询到数据', icon="⚠️")

在这里插入图片描述

表格展示-带筛选器

def dataframe_explorer(df: pd.DataFrame) -> pd.DataFrame:
    # https://github.com/arnaudmiribel/streamlit-extras/blob/main/src/streamlit_extras/dataframe_explorer/__init__.py
    """
    Adds a UI on top of a dataframe to let viewers filter columns
    Args:
        df (pd.DataFrame): Original dataframe
    Returns:
        pd.DataFrame: Filtered dataframe
    """

    random_key_base = pd.util.hash_pandas_object(df)

    df = df.copy()

    # Try to convert datetimes into standard format (datetime, no timezone)
    # for col in df.columns:
    #     if is_object_dtype(df[col]):
    #         try:
    #             df[col] = pd.to_datetime(df[col])
    #         except Exception:
    #             pass
    #
    #     if is_datetime64_any_dtype(df[col]):
    #         df[col] = df[col].dt.tz_localize(None)

    modification_container = st.container()

    with modification_container:
        to_filter_columns = st.multiselect(
            "数据表筛选列", # "Filter dataframe on",
            df.columns,
            key=f"{random_key_base}_multiselect",
        )
        filters: Dict[str, Any] = dict()
        for column in to_filter_columns:
            left, right = st.columns((1, 20))
            # Treat columns with < 10 unique values as categorical
            if is_categorical_dtype(df[column]) or df[column].nunique() < 10:
                left.write("↳")
                filters[column] = right.multiselect(
                    f"{column}的数据值", # f"Values for {column}",
                    df[column].unique(),
                    default=df[column].unique().tolist(),
                    key=f"{random_key_base}_{column}",
                )
                df = df[df[column].isin(filters[column])]
            elif is_numeric_dtype(df[column]):
                left.write("↳")
                _min = float(df[column].min())
                _max = float(df[column].max())
                step = (_max - _min) / 100
                filters[column] = right.slider(
                    f"{column}的数据值", # f"Values for {column}",
                    _min,
                    _max,
                    (_min, _max),
                    step=step,
                    key=f"{random_key_base}_{column}",
                )
                df = df[df[column].between(*filters[column])]
            elif is_datetime64_any_dtype(df[column]):
                left.write("↳")
                filters[column] = right.date_input(
                    f"{column}的数据值", # f"Values for {column}",
                    value=(
                        df[column].min(),
                        df[column].max(),
                    ),
                    key=f"{random_key_base}_{column}",
                )
                if len(filters[column]) == 2:
                    filters[column] = tuple(map(pd.to_datetime, filters[column]))
                    start_date, end_date = filters[column]
                    df = df.loc[df[column].between(start_date, end_date)]
            else:
                left.write("↳")
                filters[column] = right.text_input(
                    f"{column}的样例值",# f"Pattern in {column}",
                    key=f"{random_key_base}_{column}",
                )
                if filters[column]:
                    df = df[df[column].str.contains(filters[column])]

    return df
# https://pandas.pydata.org/docs/reference/api/pandas.io.formats.style.Styler.format.html
try:
    source = dataframe_explorer(st.session_state.macro_index_data)
except:
    source = st.session_state.macro_index_data.copy()
func = lambda s: "%.2f" % float(s) if isNumber(s) else s
source = source.style.format(func)
st.dataframe(source)  # 表格展示

在这里插入图片描述
表格展示

            source = st.session_state.macro_index_data.copy().applymap(lambda x: x[0] if isinstance(x, list) and len(x) == 1 else "✔️" if isinstance(x, list) and len(x) == 2 else x)
            st.dataframe(source)  # 表格展示

在这里插入图片描述
图表展示

class DrawPic():

    def line(self, data:pd.DataFrame, xaxis_col:str, yaxis_col:List[str], **kwargs):
        title = kwargs.get("title", "")
        subtitle = kwargs.get("subtitle", "")
        yaxis_name = kwargs.get("yaxis_name", "")
        yaxis_ext_col = kwargs.get("yaxis_ext_col", [])
        yaxis_ext_name = kwargs.get("yaxis_ext_name", "")
        chart = Line(init_opts=opts.InitOpts())
        xaxis_data = data[xaxis_col].tolist()
        chart.add_xaxis(xaxis_data)
        for i in yaxis_col:
            series_name = i
            yaxis_data = data[i].tolist()
            chart.add_yaxis(
                series_name,
                yaxis_data,
                # markpoint_opts=opts.MarkPointOpts(data=[opts.MarkPointItem()]),
                label_opts=opts.LabelOpts(is_show=False, formatter="{value}"),
            )
        if yaxis_ext_col != []:
            for i in yaxis_ext_col:
                series_name = i
                yaxis_data = data[i].tolist()
                chart.add_yaxis(
                    series_name,
                    yaxis_data,
                    yaxis_index=1,
                    label_opts=opts.LabelOpts(is_show=False, formatter="{value}"),
                )
            chart.extend_axis(
                yaxis=opts.AxisOpts(
                    name=yaxis_ext_name,
                    name_location="end",
                    type_="value",
                    is_inverse=False,
                    axistick_opts=opts.AxisTickOpts(is_show=True),
                    splitline_opts=opts.SplitLineOpts(is_show=True),
                )
            )
        chart.set_global_opts(title_opts=opts.TitleOpts(title=title, subtitle=subtitle),
                              tooltip_opts=opts.TooltipOpts(trigger="axis", axis_pointer_type="cross"),
                              yaxis_opts=opts.AxisOpts(
                                    name=yaxis_name,
                                    type_="value",
                                    axistick_opts=opts.AxisTickOpts(is_show=True),
                                    splitline_opts=opts.SplitLineOpts(is_show=True),
                              ),
                              # xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
                              datazoom_opts=opts.DataZoomOpts(is_show=True, range_start=0, range_end=100,),
                              # legend_opts=opts.LegendOpts(orient="vertical", pos_left="right")
                              legend_opts=opts.LegendOpts(orient="horizontal", pos_top="top", type_="scroll")
                              )



        return chart

    def bar(self, data:pd.DataFrame, xaxis_col:str, yaxis_col:List[str], **kwargs):
        title = kwargs.get("title", "")
        subtitle = kwargs.get("subtitle", "")
        yaxis_name = kwargs.get("yaxis_name", "")
        yaxis_ext_col = kwargs.get("yaxis_ext_col", [])
        yaxis_ext_name = kwargs.get("yaxis_ext_name", "")
        chart = Bar(init_opts=opts.InitOpts())
        xaxis_data = data[xaxis_col].tolist()
        chart.add_xaxis(xaxis_data)
        for i in yaxis_col:
            series_name = i
            yaxis_data = data[i].tolist()
            chart.add_yaxis(
                series_name,
                yaxis_data,
                # markpoint_opts=opts.MarkPointOpts(data=[opts.MarkPointItem()]),
                label_opts=opts.LabelOpts(is_show=False, formatter="{value}"),
            )
        if yaxis_ext_col != []:
            for i in yaxis_ext_col:
                series_name = i
                yaxis_data = data[i].tolist()
                chart.add_yaxis(
                    series_name,
                    yaxis_data,
                    yaxis_index=1,
                    label_opts=opts.LabelOpts(is_show=False, formatter="{value}"),
                )
            chart.extend_axis(
                yaxis=opts.AxisOpts(
                    name=yaxis_ext_name,
                    name_location="end",
                    type_="value",
                    is_inverse=False,
                    axistick_opts=opts.AxisTickOpts(is_show=True),
                    splitline_opts=opts.SplitLineOpts(is_show=True),
                )
            )
        chart.set_global_opts(title_opts=opts.TitleOpts(title=title, subtitle=subtitle),
                              tooltip_opts=opts.TooltipOpts(trigger="axis", axis_pointer_type="cross"),
                              yaxis_opts=opts.AxisOpts(
                                    name=yaxis_name,
                                    type_="value",
                                    axistick_opts=opts.AxisTickOpts(is_show=True),
                                    splitline_opts=opts.SplitLineOpts(is_show=True),
                              ),
                              # xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
                              datazoom_opts=opts.DataZoomOpts(is_show=True, range_start=0, range_end=100,),
                              # legend_opts=opts.LegendOpts(orient="vertical", pos_left="right")
                              legend_opts=opts.LegendOpts(orient="horizontal", pos_top="top", type_="scroll")
                              )



        return chart

    def mixBarLine(self, data:pd.DataFrame, xaxis_col:str,
                        yaxis_col:List[str], yaxis_ext_col:List[str],
                        **kwargs):
        title = kwargs.get("title", "")
        subtitle = kwargs.get("subtitle", "")
        yaxis_name = kwargs.get("yaxis_name", "")
        yaxis_ext_name = kwargs.get("yaxis_ext_name", "")
        chart = Bar(init_opts=opts.InitOpts())
        chart1 = Line(init_opts=opts.InitOpts())
        xaxis_data = data[xaxis_col].tolist()
        chart.add_xaxis(xaxis_data)
        chart1.add_xaxis(xaxis_data)
        for i in yaxis_col:
            series_name = i
            yaxis_data = data[i].tolist()
            chart.add_yaxis(
                series_name,
                yaxis_data,
                # markpoint_opts=opts.MarkPointOpts(data=[opts.MarkPointItem()]),
                label_opts=opts.LabelOpts(is_show=False, formatter="{value}"),
            )
        chart.extend_axis(
            yaxis=opts.AxisOpts(
                name=yaxis_ext_name,
                name_location="end",
                type_="value",
                is_inverse=False,
                position="right",
                axistick_opts=opts.AxisTickOpts(is_show=True),
                splitline_opts=opts.SplitLineOpts(is_show=True),
            )
        )
        chart.set_global_opts(title_opts=opts.TitleOpts(title=title, subtitle=subtitle),
                              tooltip_opts=opts.TooltipOpts(trigger="axis", axis_pointer_type="cross"),
                              yaxis_opts=opts.AxisOpts(
                                    name=yaxis_name,
                                    type_="value",
                                    axistick_opts=opts.AxisTickOpts(is_show=True),
                                    splitline_opts=opts.SplitLineOpts(is_show=True),
                              ),
                              xaxis_opts=opts.AxisOpts(type_="category",
                                                       axispointer_opts=opts.AxisPointerOpts(is_show=True, type_="shadow"),
                                                       ),
                              datazoom_opts=opts.DataZoomOpts(is_show=True, range_start=0, range_end=100,),
                              # legend_opts=opts.LegendOpts(orient="vertical", pos_left="right")
                              legend_opts=opts.LegendOpts(orient="horizontal", pos_top="top", type_="scroll"),
                              )
        for i in yaxis_ext_col:
            series_name = i
            yaxis_data = data[i].tolist()
            chart1.add_yaxis(
                series_name,
                yaxis_data,
                yaxis_index=1,
                label_opts=opts.LabelOpts(is_show=False, formatter="{value}"),
                z_level=1, # 调整折现图级别,高级别显示在低级别的图形上
            )

        return chart.overlap(chart1)

    def scatter(self, x_data, y_data, xaxis_name, yaxis_name, **kwargs):
        title = kwargs.get("title", "")
        subtitle = kwargs.get("subtitle", "")
        chart = Scatter(init_opts=opts.InitOpts())
        chart.add_xaxis(xaxis_data=x_data)
        chart.add_yaxis(
        series_name="",
        y_axis=y_data,
        # symbol_size=20,
        label_opts=opts.LabelOpts(is_show=False),
        )
        chart.set_global_opts(title_opts=opts.TitleOpts(title=title, subtitle=subtitle),
                              tooltip_opts=opts.TooltipOpts(trigger="item", axis_pointer_type="cross"),
                              xaxis_opts=opts.AxisOpts(
                                    name=xaxis_name,
                                    type_="value",
                                    splitline_opts=opts.SplitLineOpts(is_show=True)
                              ),
                              yaxis_opts=opts.AxisOpts(
                                    name=yaxis_name,
                                    type_="value",
                                    axistick_opts=opts.AxisTickOpts(is_show=True),
                                    splitline_opts=opts.SplitLineOpts(is_show=True),
                              ),
                              # xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
                              datazoom_opts=opts.DataZoomOpts(is_show=True, range_start=0, range_end=100),
                              )
        return chart

    def heatmap(self, data:pd.DataFrame, **kwargs):
        visualmap_opts_min = kwargs.get("visualmap_opts_min", 0)
        visualmap_opts_max = kwargs.get("visualmap_opts_max", 1)
        xaxis_data = data.columns.tolist()
        yaxis_data = data.index.tolist()
        corr_rows = data.shape[0]
        corr_cols = data.shape[1]
        value = [[i, j, round(data.iloc[i, j], 4)] for i in range(corr_rows) for j in range(corr_cols)]
        width = max(100*len(xaxis_data), 900)
        height = max(50*len(xaxis_data), 600)
        chart = HeatMap(init_opts=opts.InitOpts(width=f"{width}px", height=f"{height}px"))
        chart.add_xaxis(xaxis_data=xaxis_data)
        chart.add_yaxis(
                series_name="Punch Card",
                yaxis_data=yaxis_data,
                value=value,
                # label_opts=opts.LabelOpts(
                #     is_show=True, color="#fff", position="bottom", horizontal_align="50%"
                # ),
             )
        chart.set_series_opts()
        chart.set_global_opts(
                legend_opts=opts.LegendOpts(is_show=False),
                xaxis_opts=opts.AxisOpts(
                    type_="category",
                    splitarea_opts=opts.SplitAreaOpts(
                        is_show=True, areastyle_opts=opts.AreaStyleOpts(opacity=1)
                    ),
                ),
                yaxis_opts=opts.AxisOpts(
                    type_="category",
                    splitarea_opts=opts.SplitAreaOpts(
                        is_show=True, areastyle_opts=opts.AreaStyleOpts(opacity=1)
                    ),
                ),
                visualmap_opts=opts.VisualMapOpts(
                    min_=visualmap_opts_min, max_=visualmap_opts_max, is_calculable=True, orient="vertical", pos_left="right"
                ),
                tooltip_opts=opts.TooltipOpts(trigger="item", axis_pointer_type="cross"),
                toolbox_opts=opts.ToolboxOpts(is_show=True),
                # datazoom_opts=opts.DataZoomOpts(is_show=True, range_start=0, range_end=100),
            )
        return chart

    def barWithListenClick(self, data:pd.DataFrame, xaxis_col:str, yaxis_col:List[str], **kwargs):
        # 点击打开一个新窗口展示指定数据
        title = kwargs.get("title", "")
        subtitle = kwargs.get("subtitle", "")
        click_page = kwargs.get("click_page", "")
        yaxis_name = kwargs.get("yaxis_name", "")
        yaxis_ext_col = kwargs.get("yaxis_ext_col", [])
        yaxis_ext_name = kwargs.get("yaxis_ext_name", "")
        reversal_axis = kwargs.get("reversal_axis", False)
        tooltip_show_col = kwargs.get("tooltip_show_col", [])
        tooltip_show_formatter = kwargs.get("tooltip_show_formatter", None)
        tooltip_show_formatter_is_js_code = kwargs.get("tooltip_show_formatter_is_js_code", False)
        xaxis_data = data[xaxis_col].tolist()
        chart = Bar(init_opts=opts.InitOpts(width=f"1200px", height=f"600px"))
        chart_id = chart.chart_id
        chart.add_xaxis(xaxis_data)
        for i in yaxis_col:
            series_name = i
            if click_page != "":
                yaxis_data = data[[i, click_page] + tooltip_show_col].copy()
                yaxis_data.rename(columns={i:"value"}, inplace=True)
                yaxis_data = yaxis_data.to_dict(orient="records")
            elif tooltip_show_col != []:
                yaxis_data = data[[i] + tooltip_show_col].copy()
                yaxis_data.rename(columns={i: "value"}, inplace=True)
                yaxis_data = yaxis_data.to_dict(orient="records")
            else:
                yaxis_data = data[i].tolist()
            chart.add_yaxis(
                series_name,
                yaxis_data,
                # markpoint_opts=opts.MarkPointOpts(data=[opts.MarkPointItem()]),
                # label_opts=opts.LabelOpts(is_show=False, formatter="{value}"),
            )
        # https://www.jianshu.com/p/10167a735d3a
        # https://stackoverflow.com/questions/2109205/open-window-in-javascript-with-html-inserted
        if click_page != "":
            chart.add_js_funcs('''
            chart_''' + chart_id + '''
            .on('click', function(params){
    
            var newWin = open('url','windowName','width=1100, height=700, top=100, left=100, scrollbars=yes');
            newWin.document.write(params.data.''' + click_page + ''');
            console.log(params.name);//此处写点击事件内容
    
            });//点击事件,此事件还可以用到柱状图等其他地图
    
            ''')
        if tooltip_show_formatter_is_js_code:
            tooltip_show_formatter = JsCode(tooltip_show_formatter)

        chart.set_series_opts(label_opts=opts.LabelOpts(is_show=False), )
        chart.set_global_opts(title_opts=opts.TitleOpts(title=title, subtitle=subtitle),
                              tooltip_opts=opts.TooltipOpts(trigger="axis", axis_pointer_type="shadow",
                                                            formatter=tooltip_show_formatter,
                                                            # trigger_on="click",
                                                            ),
                              toolbox_opts=opts.ToolboxOpts(),
                              datazoom_opts=opts.DataZoomOpts(is_show=True, range_start=0, range_end=100,),
                              legend_opts=opts.LegendOpts(orient="horizontal", pos_top="top", type_="scroll", is_show=False)
                              )
        if reversal_axis:
            chart.reversal_axis()
        return chart
        
    def searchTable(self, data:pd.DataFrame, **kwargs):
        title = kwargs.get("title", "")
        subtitle = kwargs.get("subtitle", "")
        table = Table(page_title=title)
        table_id = table.chart_id
        headers = data.columns.tolist()
        rows = data.values.tolist()
        table.add(headers, rows)
        table.set_global_opts(title_opts=opts.ComponentTitleOpts(title=title, subtitle=subtitle),
                              )

        return table


source = st.session_state.macro_index_data.copy()
if ("tag" not in source.columns) or (len(cal_label_select) == 0) or (len(base_dim_index_select) == 0):
    st.dataframe(source)
else:
    chart_data = source[source["tag"] == "index_value"]
    unit_data = source[source["tag"] == "unit_use"]
    tab1, tab2, tab3 = col1.tabs(["📈 图表", "🗃 数据值", "🎀 分析"])
    with tab1:
        item = [i for i in chart_data.columns if i not in ["occur_period", "tag"]]
        tab1_select = tab1.selectbox("选择数据" + ":", (base_dim_index_select))

        # 正常的单轴构造
        yaxis_col = [i for i in chart_data.columns if (tab1_select in i) and (cal_label_select[0] in i)]
        for i in yaxis_col:
            if unit_data[i].dropna().unique().tolist() != []:
                unit_use = unit_data[i].dropna().unique().tolist()[0]
                break
        if yaxis_col == []:
            yaxis_name = ""
        else:
            yaxis_name = cal_label_select[0] + "(" + unit_use + ")"
        # 额外的第二轴构造
        if len(cal_label_select) == 2:
            yaxis_ext_col = [i for i in chart_data.columns if (tab1_select in i) and (cal_label_select[1] in i)]
            for i in yaxis_ext_col:
                if unit_data[i].dropna().unique().tolist() != []:
                    unit_ext_use = unit_data[i].dropna().unique().tolist()[0]
                    break
            if yaxis_ext_col != []:
                yaxis_ext_name = cal_label_select[1] + "(" + unit_ext_use + ")"
            else:
                yaxis_ext_name = ""
        else:
            yaxis_ext_col = []
            yaxis_ext_name = ""
        if len(cal_label_select) == 2:
            c = dp.mixBarLine(chart_data,
                        "occur_period",
                        yaxis_col,
                        yaxis_ext_col,
                        yaxis_name=yaxis_name,
                        yaxis_ext_name=yaxis_ext_name
                        )
        else:
            c = dp.line(chart_data,
                        "occur_period",
                        yaxis_col,
                        yaxis_name=yaxis_name)
        st_pyecharts(c, height="500px")
    with tab2:
        st.dataframe(source)


source = st.session_state[cur_page_df + "__02"].copy()
source["new_window_html"] = source["area_detail_list"].apply(lambda x:createNewWindowHtml(x))
source["area_num"] = source["area_detail_list"].apply(lambda x: str(collectAreaNum(x)) + '/1156')
source["area_code_percent(%)"] = source["area_code_percent"].apply(lambda x: round(100*x, 2))
chart = dp.barWithListenClick(source,
            "occur_period",
            ["area_code_percent(%)"],
            yaxis_name="yaxis_name",
            click_page="new_window_html",
            tooltip_show_col=["area_num"],
            tooltip_show_formatter="function(x){return  x[0].name + ': '  + x[0].data.value + '%' + ' (' + x[0].data.area_num + ')地区' + '';}",
            tooltip_show_formatter_is_js_code=True,
            title=source["index_name"].tolist()[0]
清除缓存
def clearCache():
    # https://docs.streamlit.io/library/advanced-features/experimental-cache-primitives
    st.experimental_memo.clear()
    st.experimental_singleton.clear()

查询配置信息

st.write("theme base: ",st.get_option("theme.base"))
st.write("server port: ",st.get_option("server.port"))

启动项

streamlit run script.py --theme.base light --server.port 80
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐