python 查询MongoDB数据库
·
import io
import zlib
from pathlib import Path
from bson import ObjectId
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure
def byte_to_hex(byte_array: bytes) -> str:
return byte_array.hex().upper()
# return ''.join([format(x, '02X') for x in byte_array])
def hex_to_bytes(hex_str: str) -> bytearray:
if hex_str is None or len(hex_str) % 2 != 0:
return bytearray()
byte_array = bytearray(len(hex_str) // 2)
for i in range(0, len(hex_str), 2):
val = int(hex_str[i:i + 2], 16).to_bytes(1, byteorder='big')
byte_array[i // 2] = val[0]
return byte_array
def compress(text: str) -> bytes:
return zlib.compress(text.encode('utf-8'), level=zlib.Z_BEST_COMPRESSION)
# compressor = zlib.compressobj(level=zlib.Z_BEST_COMPRESSION, strategy=zlib.Z_DEFAULT_STRATEGY)
# compressed_data = compressor.compress(text.encode('utf-8')) + compressor.flush() # 压缩数据并结束压缩流
# return compressed_data
def decompress(compressed_data: bytearray) -> str:
input_stream = io.BytesIO(compressed_data)
output = zlib.decompress(input_stream.read())
# chunk_size = 8192 # 设置一个合适的块大小
# decompressor = zlib.decompressobj()
# output = b''
# while True:
# chunk = input_stream.read(chunk_size)
# if not chunk:
# break
# output += decompressor.decompress(chunk)
# output += decompressor.flush()
return str(output, 'utf-8')
def compress_str(text: str) -> str:
return byte_to_hex(compress(text))
def decompress_str(compressed_str: str) -> str:
return decompress(hex_to_bytes(compressed_str))
if __name__ == '__main__':
client = None
try:
client = MongoClient('mongodb://x:x@x:27017/ota?authSource=ota&directConnection=true')
# 尝试执行一个简单命令来验证连接
client.admin.command('ping')
print("成功连接到 MongoDB 服务器!")
except ConnectionFailure as e:
print(f"无法连接到 MongoDB 服务器: {e}")
print("请确保 MongoDB 服务正在运行。")
exit()
except Exception as e:
print(f"连接过程中发生未知错误: {e}")
exit()
# 获取数据库对象
# 如果数据库不存在,MongoDB 会在第一次写入数据时隐式创建它。
db_name = 'ota'
db = client[db_name]
print(f"已连接到数据库 '{db_name}'。")
# collection_names = db.list_collection_names()
# print(f"当前数据库 '{db_name}' 中的集合名称: {collection_names}")
transaction_tbl = db.get_collection('Transaction')
transaction_data = transaction_tbl.find_one({'_id': ObjectId('6a0c28e12d400c06be1d9440')})
# print("transaction_data: ", transaction_data)
session_tbl = db.get_collection('SessionAbroadTS')
# session_data_cnt = session_tbl.count_documents({'mSvrsessionid': transaction_data['svrsessionid']})
# print("session_data_cnt: ", session_data_cnt)
session_data_list = session_tbl.find({'mMofVin': transaction_data['vin'],'mSvrsessionid': transaction_data['svrsessionid']}).sort('mCreatedAt', 1)
i = 0
for session_data in session_data_list:
# print(session_data)
session_msg_tbl = db.get_collection('SessionAbroadMsgCP')
session_msg_data = session_msg_tbl.find_one({'_id': ObjectId(session_data["mSessionMsgId"])})
print(session_msg_data)
# print(f"789C{compress_str(decompress_str(session_msg_data["mSendMsg"]))[4:]}")
# print(compress_str(decompress_str(session_msg_data["mSendMsg"])))
# print(session_msg_data["mSendMsg"])
# print("session_msg_data:\n", decompress_str(session_msg_data["mSendMsg"]))
# print(f"789C{compress_str(decompress_str(session_msg_data["mSendMsg"]))[4:]}" == session_msg_data["mSendMsg"])
i = i + 1
send_save_path = Path("C:/Users/Admin/Downloads/DMMessage",
"send_message_{}_{}.bin".format(transaction_data['svrsessionid'], i))
receive_msg = decompress_str(session_msg_data["mReceiveMsg"])
new_receive_msg = receive_msg.replace("\r\n", "\n")
send_save_path.write_bytes(new_receive_msg.encode("utf-8"))
receive_save_path = Path("C:/Users/Admin/Downloads/DMMessage",
"receive_message_{}_{}.bin".format(transaction_data['svrsessionid'], i))
send_msg = decompress_str(session_msg_data["mSendMsg"])
new_send_msg = send_msg.replace("\r\n", "\n")
receive_save_path.write_bytes(new_send_msg.encode("utf-8"))
import io
import zlib
from pathlib import Path
from bson import ObjectId
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure
def byte_to_hex(byte_array: bytes) -> str:
return byte_array.hex().upper()
# return ''.join([format(x, '02X') for x in byte_array])
def hex_to_bytes(hex_str: str) -> bytearray:
if hex_str is None or len(hex_str) % 2 != 0:
return bytearray()
byte_array = bytearray(len(hex_str) // 2)
for i in range(0, len(hex_str), 2):
val = int(hex_str[i:i + 2], 16).to_bytes(1, byteorder='big')
byte_array[i // 2] = val[0]
return byte_array
def compress(text: str) -> bytes:
return zlib.compress(text.encode('utf-8'), level=zlib.Z_BEST_COMPRESSION)
# compressor = zlib.compressobj(level=zlib.Z_BEST_COMPRESSION, strategy=zlib.Z_DEFAULT_STRATEGY)
# compressed_data = compressor.compress(text.encode('utf-8')) + compressor.flush() # 压缩数据并结束压缩流
# return compressed_data
def decompress(compressed_data: bytearray) -> str:
input_stream = io.BytesIO(compressed_data)
output = zlib.decompress(input_stream.read())
# chunk_size = 8192 # 设置一个合适的块大小
# decompressor = zlib.decompressobj()
# output = b''
# while True:
# chunk = input_stream.read(chunk_size)
# if not chunk:
# break
# output += decompressor.decompress(chunk)
# output += decompressor.flush()
return str(output, 'utf-8')
def compress_str(text: str) -> str:
return byte_to_hex(compress(text))
def decompress_str(compressed_str: str) -> str:
return decompress(hex_to_bytes(compressed_str))
def query_log(transaction_id: str, cur_step: str):
client = None
try:
client = MongoClient('mongodb://user:password@host:27017/ota?authSource=ota&directConnection=true')
# 尝试执行一个简单命令来验证连接
client.admin.command('ping')
print("成功连接到 MongoDB 服务器!")
except ConnectionFailure as e:
print(f"无法连接到 MongoDB 服务器: {e}")
print("请确保 MongoDB 服务正在运行。")
exit()
except Exception as e:
print(f"连接过程中发生未知错误: {e}")
exit()
# 获取数据库对象
# 如果数据库不存在,MongoDB 会在第一次写入数据时隐式创建它。
db_name = 'ota'
db = client[db_name]
print(f"已连接到数据库 '{db_name}'。")
transaction_tbl = db['Transaction']
transaction_data = transaction_tbl.find_one({'_id': ObjectId(transaction_id)})
if transaction_data:
session_tbl = db['SessionAbroadTS']
query = {'mMofVin': transaction_data['vin'], 'mSvrsessionid': transaction_data['svrsessionid'], 'mPartNo': 'HU',
'mCurStep': cur_step}
session_data = session_tbl.find_one(query)
if session_data:
session_msg_tbl = db['SessionAbroadMsgCP']
session_msg_data = session_msg_tbl.find_one({'_id': ObjectId(session_data["mSessionMsgId"])})
if session_msg_data:
data_dir = Path("D:\\Mongo")
if not data_dir.exists():
data_dir.mkdir(parents=True, exist_ok=True)
file_name = f"{transaction_id}_step{cur_step}.bin"
save_path = Path(data_dir, file_name)
receive_msg = decompress_str(session_msg_data["mReceiveMsg"])
text = f"POST /dm/message HTTP/1.1\nContent-Type:application/vnd.syncml.dm+xml\nContent-Length:{len(receive_msg)}\n{receive_msg}"
save_path.write_text(text)
print(f"写入{file_name}成功!")
client.close()
if __name__ == '__main__':
query_log('6a2d518d674a8b19aefea0f5', "5")
更多推荐

所有评论(0)