个 BigQuery 事务,涉及多个查询,包括会话
BigQuery 支持自去年以来的事务(在 Google Cloud Next'21 上展示):现在可以对一个或多个表执行变异操作,然后通过将脚本包装在
BEGIN TRANSACTION;
进入全屏模式 退出全屏模式
和
COMMIT TRANSACTION;
进入全屏模式 退出全屏模式
或者
ROLLBACK TRANSACTION;
进入全屏模式 退出全屏模式
很容易
然而交易有一个限制:
一个事务不能跨越多个脚本。
虽然这在大多数情况下都不是问题,但当事务中包含的脚本变得过于复杂或查询参数过多或破坏 BigQuery 作业](https://cloud.google.com/bigquery/quotas#jobs)的任何其他[配额时,它可能会成为问题。例如,当从请求负载自动生成查询脚本时,就会发生这种情况。
有一种解决方法,即 BigQuery 会话。让我们看看它是如何工作的
BigQuery 会话
Sessions是一种在它们之间链接作业和持久化临时数据(如临时表)的方法。
会话的一个常见用例正是我们想要的:
在多个查询上创建多语句事务。在会话中,您可以在决定提交或回滚之前开始事务、进行更改并查看临时结果。您可以对会话中的多个查询执行此操作。如果不使用会话,则需要在单个查询中完成多语句事务。
这个想法是将事务查询堆叠在同一个会话中,从BEGIN TRANSACTION;开始,以COMMIT TRANSACTION;结束。
在这两者之间,您可以根据需要调用 put 尽可能多的查询,整个会话将具有原子行为。
会话在 24 小时不活动后自动关闭。但是,当与事务混合时,目标表可能会在会话中“锁定”并在会话结束之前变得不可用。这就是为什么我建议强制会话在脚本末尾结束。它是通过调用以下查询来完成的:
CALL BQ.ABORT_SESSION();
进入全屏模式 退出全屏模式
Python实现
我们正在处理会话的概念,我们需要在处理结束时打开并始终关闭:为此自然会指示上下文管理器。
"""ContextManager wrapping a bigquery session."""
from google.cloud import bigquery
class BigquerySession:
"""ContextManager wrapping a bigquerySession."""
def __init__(self, bqclient: bigquery.Client, bqlocation: str = "EU") -> None:
"""Construct instance."""
self._bigquery_client = bqclient
self._location = bqlocation
self._session_id = None
def __enter__(self) -> str:
"""Initiate a Bigquery session and return the session_id."""
job = self._bigquery_client.query(
"SELECT 1;", # a query can't fail
job_config=bigquery.QueryJobConfig(create_session=True),
location=self._location,
)
self._session_id = job.session_info.session_id
job.result() # wait job completion
return self._session_id
def __exit__(self, exc_type, exc_value, traceback):
"""Abort the opened session."""
if self._session_id:
# abort the session in any case to have a clean state at the end
# (sometimes in case of script failure, the table is locked in
# the session)
job = self._bigquery_client.query(
"CALL BQ.ABORT_SESSION();",
job_config=bigquery.QueryJobConfig(
create_session=False,
connection_properties=[
bigquery.query.ConnectionProperty(
key="session_id", value=self._session_id
)
],
),
location=self._location,
)
job.result()
进入全屏模式 退出全屏模式
然后使用此上下文将作业堆叠到单个会话中变得非常容易,从而创建多语句、多脚本 bigquery 事务:
with BigquerySession(self.bigquery_client, BIGQUERY_LOCATION) as session_id:
# open transaction
job = self.bigquery_client.query(
"BEGIN TRANSACTION;",
job_config=bigquery.QueryJobConfig(
create_session=False,
connection_properties=[
bigquery.query.ConnectionProperty(
key="session_id", value=session_id
)
],
),
location=BIGQUERY_LOCATION,
)
job.result()
# stack queries
for queryscript in scripts:
job = self.bigquery_client.query(
queryscript,
job_config=bigquery.QueryJobConfig(
create_session=False,
connection_properties=[
bigquery.query.ConnectionProperty(
key="session_id", value=session_id
)
],
),
location=BIGQUERY_LOCATION,
)
job.result()
# end transaction
job = self.bigquery_client.query(
"COMMIT TRANSACTION;",
job_config=bigquery.QueryJobConfig(
create_session=False,
connection_properties=[
bigquery.query.ConnectionProperty(
key="session_id", value=session_id
)
],
),
location=BIGQUERY_LOCATION,
)
job.result()
进入全屏模式 退出全屏模式
请注意所有作业如何使用相同的 session\id(即在同一会话中)和在同一位置(这是会话的要求)运行。
希望这可以帮助 !
照片由 Caroline Selfors 在 Unsplash 上拍摄
更多推荐

所有评论(0)