import io
import zlib
from pathlib import Path

from bson import ObjectId
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure


def byte_to_hex(byte_array: bytes) -> str:
    return byte_array.hex().upper()
    # return ''.join([format(x, '02X') for x in byte_array])


def hex_to_bytes(hex_str: str) -> bytearray:
    if hex_str is None or len(hex_str) % 2 != 0:
        return bytearray()

    byte_array = bytearray(len(hex_str) // 2)
    for i in range(0, len(hex_str), 2):
        val = int(hex_str[i:i + 2], 16).to_bytes(1, byteorder='big')
        byte_array[i // 2] = val[0]
    return byte_array


def compress(text: str) -> bytes:
    return zlib.compress(text.encode('utf-8'), level=zlib.Z_BEST_COMPRESSION)
    # compressor = zlib.compressobj(level=zlib.Z_BEST_COMPRESSION, strategy=zlib.Z_DEFAULT_STRATEGY)
    # compressed_data = compressor.compress(text.encode('utf-8')) + compressor.flush()  # 压缩数据并结束压缩流
    # return compressed_data


def decompress(compressed_data: bytearray) -> str:
    input_stream = io.BytesIO(compressed_data)
    output = zlib.decompress(input_stream.read())
    # chunk_size = 8192  # 设置一个合适的块大小
    # decompressor = zlib.decompressobj()
    # output = b''
    # while True:
    #     chunk = input_stream.read(chunk_size)
    #     if not chunk:
    #         break
    #     output += decompressor.decompress(chunk)
    # output += decompressor.flush()
    return str(output, 'utf-8')


def compress_str(text: str) -> str:
    return byte_to_hex(compress(text))


def decompress_str(compressed_str: str) -> str:
    return decompress(hex_to_bytes(compressed_str))


if __name__ == '__main__':
    client = None
    try:
        client = MongoClient('mongodb://x:x@x:27017/ota?authSource=ota&directConnection=true')
        # 尝试执行一个简单命令来验证连接
        client.admin.command('ping')
        print("成功连接到 MongoDB 服务器!")
    except ConnectionFailure as e:
        print(f"无法连接到 MongoDB 服务器: {e}")
        print("请确保 MongoDB 服务正在运行。")
        exit()
    except Exception as e:
        print(f"连接过程中发生未知错误: {e}")
        exit()

    # 获取数据库对象
    # 如果数据库不存在,MongoDB 会在第一次写入数据时隐式创建它。
    db_name = 'ota'
    db = client[db_name]
    print(f"已连接到数据库 '{db_name}'。")
    # collection_names = db.list_collection_names()
    # print(f"当前数据库 '{db_name}' 中的集合名称: {collection_names}")
    transaction_tbl = db.get_collection('Transaction')
    transaction_data = transaction_tbl.find_one({'_id': ObjectId('6a0c28e12d400c06be1d9440')})
    # print("transaction_data: ", transaction_data)
    session_tbl = db.get_collection('SessionAbroadTS')
    # session_data_cnt = session_tbl.count_documents({'mSvrsessionid': transaction_data['svrsessionid']})
    # print("session_data_cnt: ", session_data_cnt)
    session_data_list = session_tbl.find({'mMofVin': transaction_data['vin'],'mSvrsessionid': transaction_data['svrsessionid']}).sort('mCreatedAt', 1)
    i = 0
    for session_data in session_data_list:
        # print(session_data)
        session_msg_tbl = db.get_collection('SessionAbroadMsgCP')
        session_msg_data = session_msg_tbl.find_one({'_id': ObjectId(session_data["mSessionMsgId"])})
        print(session_msg_data)
        # print(f"789C{compress_str(decompress_str(session_msg_data["mSendMsg"]))[4:]}")
        # print(compress_str(decompress_str(session_msg_data["mSendMsg"])))
        # print(session_msg_data["mSendMsg"])
        # print("session_msg_data:\n", decompress_str(session_msg_data["mSendMsg"]))
        # print(f"789C{compress_str(decompress_str(session_msg_data["mSendMsg"]))[4:]}" == session_msg_data["mSendMsg"])

        i = i + 1
        send_save_path = Path("C:/Users/Admin/Downloads/DMMessage",
                         "send_message_{}_{}.bin".format(transaction_data['svrsessionid'], i))
        receive_msg = decompress_str(session_msg_data["mReceiveMsg"])
        new_receive_msg = receive_msg.replace("\r\n", "\n")
        send_save_path.write_bytes(new_receive_msg.encode("utf-8"))
        receive_save_path = Path("C:/Users/Admin/Downloads/DMMessage",
                              "receive_message_{}_{}.bin".format(transaction_data['svrsessionid'], i))
        send_msg = decompress_str(session_msg_data["mSendMsg"])
        new_send_msg = send_msg.replace("\r\n", "\n")
        receive_save_path.write_bytes(new_send_msg.encode("utf-8"))
import io
import zlib
from pathlib import Path

from bson import ObjectId
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure


def byte_to_hex(byte_array: bytes) -> str:
    return byte_array.hex().upper()
    # return ''.join([format(x, '02X') for x in byte_array])


def hex_to_bytes(hex_str: str) -> bytearray:
    if hex_str is None or len(hex_str) % 2 != 0:
        return bytearray()

    byte_array = bytearray(len(hex_str) // 2)
    for i in range(0, len(hex_str), 2):
        val = int(hex_str[i:i + 2], 16).to_bytes(1, byteorder='big')
        byte_array[i // 2] = val[0]
    return byte_array


def compress(text: str) -> bytes:
    return zlib.compress(text.encode('utf-8'), level=zlib.Z_BEST_COMPRESSION)
    # compressor = zlib.compressobj(level=zlib.Z_BEST_COMPRESSION, strategy=zlib.Z_DEFAULT_STRATEGY)
    # compressed_data = compressor.compress(text.encode('utf-8')) + compressor.flush()  # 压缩数据并结束压缩流
    # return compressed_data


def decompress(compressed_data: bytearray) -> str:
    input_stream = io.BytesIO(compressed_data)
    output = zlib.decompress(input_stream.read())
    # chunk_size = 8192  # 设置一个合适的块大小
    # decompressor = zlib.decompressobj()
    # output = b''
    # while True:
    #     chunk = input_stream.read(chunk_size)
    #     if not chunk:
    #         break
    #     output += decompressor.decompress(chunk)
    # output += decompressor.flush()
    return str(output, 'utf-8')


def compress_str(text: str) -> str:
    return byte_to_hex(compress(text))


def decompress_str(compressed_str: str) -> str:
    return decompress(hex_to_bytes(compressed_str))


def query_log(transaction_id: str, cur_step: str):
    client = None
    try:
        client = MongoClient('mongodb://user:password@host:27017/ota?authSource=ota&directConnection=true')
        # 尝试执行一个简单命令来验证连接
        client.admin.command('ping')
        print("成功连接到 MongoDB 服务器!")
    except ConnectionFailure as e:
        print(f"无法连接到 MongoDB 服务器: {e}")
        print("请确保 MongoDB 服务正在运行。")
        exit()
    except Exception as e:
        print(f"连接过程中发生未知错误: {e}")
        exit()

    # 获取数据库对象
    # 如果数据库不存在,MongoDB 会在第一次写入数据时隐式创建它。
    db_name = 'ota'
    db = client[db_name]
    print(f"已连接到数据库 '{db_name}'。")

    transaction_tbl = db['Transaction']
    transaction_data = transaction_tbl.find_one({'_id': ObjectId(transaction_id)})
    if transaction_data:
        session_tbl = db['SessionAbroadTS']
        query = {'mMofVin': transaction_data['vin'], 'mSvrsessionid': transaction_data['svrsessionid'], 'mPartNo': 'HU',
                 'mCurStep': cur_step}
        session_data = session_tbl.find_one(query)
        if session_data:
            session_msg_tbl = db['SessionAbroadMsgCP']
            session_msg_data = session_msg_tbl.find_one({'_id': ObjectId(session_data["mSessionMsgId"])})
            if session_msg_data:
                data_dir = Path("D:\\Mongo")
                if not data_dir.exists():
                    data_dir.mkdir(parents=True, exist_ok=True)
                file_name = f"{transaction_id}_step{cur_step}.bin"
                save_path = Path(data_dir, file_name)
                receive_msg = decompress_str(session_msg_data["mReceiveMsg"])
                text = f"POST /dm/message HTTP/1.1\nContent-Type:application/vnd.syncml.dm+xml\nContent-Length:{len(receive_msg)}\n{receive_msg}"
                save_path.write_text(text)
                print(f"写入{file_name}成功!")
    client.close()


if __name__ == '__main__':
    query_log('6a2d518d674a8b19aefea0f5', "5")

更多推荐