# server.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#    @version : 0.0.1
#    @File    : server.py
#    @Time    : 2019/7/11 15:35
#    @Site    : 
#    @Software: PyCharm
#    @Author  : KANGXINWEN
#    @Author_email: singbogo@163.com
#    @description:  Execute commands remotely over a socket and return their output
# https://github.com/henryyuan/MYFTP
# https://segmentfault.com/a/1190000014744919


import socket
import os
import signal
import subprocess
from threading import Timer
import exception
import threading
import struct
import time

from Order import man_order, MAXBUFFER, LOB_order
from network import IP
from MYFTP.ftpbin.myftps import ftp_server_run
from MYFTP.ftpconf.settings import ServerConfig
from MYFTP.ftpbin.create_user import create_user


class AutoMainPlantSocket(object):
    """TCP server that executes orders received from clients and sends back the output.

    Every accepted connection is served on its own thread (``_tcplink``).  A
    received message is either one of the built-in orders listed in
    ``man_order`` (handled by ``dosomething``) or is run through the local
    shell (``_run_cmd``).  Replies are followed by a ``poll:<code>`` message,
    which ``_send_poll0`` describes as the end-of-reply flag.

    SECURITY: anything a client sends that is not a built-in order is executed
    with ``shell=True``.  Only expose this server on a trusted network.
    """

    def __init__(self, ip=None):
        """Create the server and immediately bind and listen via ``_init``.

        :param ip: address to bind; a falsy value selects ``IP.get_host_ip()``.
        """
        self._socket_obj = None  # listening socket, set by _init
        self._code = "utf8"
        self._address_port = ('127.0.0.1', 9999)
        self._backlog = 5  # backlog handed to listen()
        self._conn = None
        self._process = None  # process handed to the timeout timer by _start_timer
        self._timeout = 10  # seconds before _timeout_callback fires
        self._is_timeout = False
        self._timer = None
        self._MAXBUFFER = 1024
        self._threads = []  # worker threads that are still running
        self._connaddress = []  # open client sockets
        self._processs = []  # child processes that are still running

        self._init(ip)

    def set_timeout(self, timeout):
        """Set the delay, in seconds, used by ``_start_timer``."""
        self._timeout = timeout

    def _timeout_callback(self, conn, p):
        """Timer callback: tell the client the order ended and kill process ``p``.

        :param conn: client socket that receives the ``poll:0`` flag.
        :param p: ``subprocess.Popen`` to kill, or None when there is none.
        """
        self._is_timeout = True
        print("exe time out call back")
        self._send(conn, str("poll:%s" % '0'))

        if p is None:
            # _start_timer passes self._process, which may never have been set.
            return
        try:
            os.killpg(p.pid, signal.SIGKILL)
        except Exception as error:
            print("Exception _timeout_callback: %s" % str(error))
            # killpg is POSIX-only and fails when the child does not lead its
            # own process group, so fall back to killing the child directly.
            try:
                p.kill()
            except OSError:
                pass

    def set_addressport(self, address, port):
        """Store the ``(address, port)`` pair that ``_init`` binds to."""
        self._address_port = (address, port)

    def set_localIPPort(self, ip, port=9999):
        """Store the bind address, defaulting to ``IP.get_host_ip()`` when ``ip`` is falsy."""
        if ip:
            self.set_addressport(ip, port)
        else:
            self._address_port = (IP.get_host_ip(), port)

    def set_backlog(self, backlog):
        """Set the backlog used by the next ``listen()`` call made in ``_init``."""
        self._backlog = backlog

    def _start_timer(self, conn):
        """Start a timer that calls ``_timeout_callback`` after ``self._timeout`` seconds."""
        self._timer = Timer(self._timeout, self._timeout_callback, [conn, self._process])
        return self._timer.start()

    @staticmethod
    def _cmdsplit(msg, maxsplit=1):
        """Split a message into its order and its parameter text.

        :param msg: message as ``str`` or ``bytes`` (bytes are decoded as UTF-8).
        :param maxsplit: maximum number of whitespace splits.
        :return: None for an empty message; otherwise a tuple of at least two
            items.  When there is no parameter the second item is
            ``"notFound"``; a whitespace-only message gives ``("", "notFound")``.
        """
        if not msg:
            return None
        if isinstance(msg, (bytes, bytearray)):
            msg = bytes(msg).decode("utf-8", errors="replace")
        parts = msg.split(maxsplit=maxsplit)
        if not parts:
            return "", "notFound"
        if len(parts) == 1:
            # Covers "ls", "ls " and "ls\n": callers always unpack two values.
            return parts[0], "notFound"
        return tuple(parts)

    def _init(self, ip="127.0.0.1", port=9999):
        """Create the listening socket, bind it and start listening.

        :param ip: address to bind; falsy selects ``IP.get_host_ip()``.
        :param port: TCP port to bind.
        :raises OSError: when bind or listen fails; the socket is closed first.
        """
        self.set_localIPPort(ip, port)

        listener = socket.socket()
        try:
            listener.bind(self._address_port)
            print("sevser blind %s port %d successful......" % (self._address_port[0], self._address_port[1]))
            listener.listen(self._backlog)
        except Exception:
            # Do not leak the descriptor when the address is unusable.
            listener.close()
            raise
        self._socket_obj = listener
        print('server listening......')

    def _accept(self):
        """Block until a client connects, greet it and return ``(conn, address)``."""
        conn, address = self._socket_obj.accept()
        print("Accept new connection from %s....." % str(address))
        self._send(conn, b"Welcome!\n")
        return conn, address

    def _mark_disconnected(self, conn, error=None):
        """Log a lost connection, drop it from the bookkeeping and return ``"quit"``."""
        if error is None or isinstance(error, OSError):
            print("%s is be disconnected" % str(conn))
        else:
            print("%s is be disconnected Exception : %s" % (str(conn), str(error)))
        if conn in self._connaddress:
            self._connaddress.remove(conn)
        return "quit"

    def _recv(self, conn, buffersize=None):
        """Receive one chunk from ``conn``.

        :param conn: client socket.
        :param buffersize: maximum bytes to read; None selects ``MAXBUFFER``.
        :return: the received ``bytes``, or the ``str`` ``"quit"`` when the
            connection failed or the peer closed it.
        """
        if buffersize is None:
            buffersize = MAXBUFFER
        try:
            data = conn.recv(buffersize)
        except Exception as e:
            return self._mark_disconnected(conn, e)
        if not data:
            # recv() returns b"" once the peer has closed the connection; without
            # this check the caller's loop would spin on empty reads forever.
            return self._mark_disconnected(conn)
        return data

    @staticmethod
    def __bytes2str(bytes_data):
        """Decode UTF-8 ``bytes`` to ``str``; return None for empty input."""
        if not bytes_data:
            return None
        # Undecodable bytes from a client must not take the worker thread down.
        str_data = str(bytes_data, 'utf-8', errors='replace')
        print(">>>%s" % str_data)
        return str_data

    @staticmethod
    def __str2bytes(str_data):
        """Encode a ``str`` as UTF-8 ``bytes``; return None for empty input."""
        if not str_data:
            return None
        bytes_data = bytes(str_data, 'utf-8')
        print(">>>%s" % bytes_data)
        return bytes_data

    def _send(self, conn, data):
        """Send ``data`` to ``conn``, encoding ``str`` as UTF-8.

        :return: None on success, ``"quit"`` when the connection failed.
        """
        if isinstance(data, str):
            data = data.encode('utf-8')
        print("send data: %s ......" % data)
        try:
            return conn.sendall(data)
        except Exception as e:
            return self._mark_disconnected(conn, e)

    def _run_cmd(self, conn, cmd):
        """Run ``cmd`` in a shell and send its output to ``conn``.

        Orders listed in ``LOB_order`` are collected and sent in one piece
        (``_dispose1``); everything else is streamed line by line (``_dispose``).

        SECURITY: ``cmd`` comes straight from the client and is executed with
        ``shell=True``; this is remote command execution by design.
        """
        process = None
        try:
            parsed = AutoMainPlantSocket._cmdsplit(cmd)
            order = parsed[0] if parsed else ""
            # Nothing ever writes to the child's stdin, so give it EOF instead
            # of an open pipe that would make input-reading commands hang.
            process = subprocess.Popen(cmd, shell=True,
                                       stdin=subprocess.DEVNULL,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.STDOUT)
            self._processs.append(process)

            if order.upper() in LOB_order:
                return self._dispose1(process, conn, process.poll(), cmd)
            return self._dispose(process, conn, process.poll(), cmd)
        except Exception as e:
            print("Exception _run_cmd:%s....." % str(e))
            self._close_process(process)
        finally:
            self._release_process(process)

    def _dispose1(self, process, conn, poll_flag, cmd):
        """Collect the whole output of ``process``, send it, then send ``poll:0``.

        :param process: running ``subprocess.Popen``.
        :param conn: client socket.
        :param poll_flag: kept for interface compatibility; not consulted.
        :param cmd: command text (unused here).
        """
        output = b""
        try:
            data, err = process.communicate()
            output = err if (process.returncode and err) else data
            output = (output or b"").strip()
        except (OSError, ValueError) as e:
            print("%s is Exception : %s" % (str(conn), str(e)))
        if output:
            self._send(conn, output)
        # Always send the end flag, including when the command printed nothing.
        self._send(conn, str("poll:%s" % 0))

    def _dispose(self, process, conn, poll_flag, cmd):
        """Stream the output of ``process`` line by line, then send ``poll:<exit code>``.

        The output is read until end-of-file, so nothing is lost when the
        child exits before its output has been drained.

        :param process: running ``subprocess.Popen`` with ``stdout=PIPE``.
        :param conn: client socket.
        :param poll_flag: kept for interface compatibility; not consulted.
        :param cmd: command text, used only in the log message.
        """
        for raw_line in process.stdout:
            line = raw_line.strip()
            if line and self._send(conn, line) == "quit":
                # Client is gone: stop the child rather than let it block on a
                # pipe nobody reads any more.
                self._close_process(process)
                return
        returncode = process.wait()
        if returncode:
            print("Non zero exit code:%s executing: %s poll_flag: %s" % (returncode, cmd, returncode))
        self._send(conn, str("poll:%s" % returncode))

    def _close_connect(self, conn):
        """Close a client socket and drop it from the bookkeeping."""
        if conn:
            conn.close()
            if conn in self._connaddress:
                self._connaddress.remove(conn)

    def _close_socket(self):
        """Close the listening socket, if one was created."""
        if self._socket_obj:
            self._socket_obj.close()

    def _release_process(self, process):
        """Close the output pipe of ``process`` and drop it from the bookkeeping."""
        if process is None:
            return
        if process.stdout is not None:
            process.stdout.close()
        if process in self._processs:
            self._processs.remove(process)

    def _close_process(self, process):
        """Kill ``process``, wait for it and release its resources."""
        if process is None:
            return
        try:
            process.kill()
        except OSError:
            pass  # already exited
        process.wait()
        self._release_process(process)

    def _close_thread(self, thread):
        """Drop ``thread`` from the bookkeeping.

        Python threads cannot be killed from outside; a worker ends when its
        connection is closed.
        """
        if thread in self._threads:
            self._threads.remove(thread)

    def dosomething(self, conn, msg):
        """Handle one built-in order.

        :param conn: client socket.
        :param msg: full message text received from the client.
        :return: ``"quit"`` when the connection was closed, otherwise
            ``"continuewhile"``.
        """
        parsed = AutoMainPlantSocket._cmdsplit(msg)
        order = parsed[0] if parsed else ""
        command = order.upper()

        if command in ("QUIT", "Q", "EXIT", "BYE", "BYEBYE"):
            # Echo the order back, then drop the connection.
            self._send(conn, msg)
            self._close_connect(conn)
            return "quit"
        if order in ("shutdown", ):
            pass
        elif command == "CMD":
            self._send_poll0(conn)
        elif command == "FTP":
            self._send_poll0(conn)
            self._run_ftp_server(conn)
        elif command == "CREATEFTPUSER":
            return self._create_ftp_user(conn)
        return "continuewhile"

    def _create_ftp_user(self, conn):
        """Ask the client for ``account#password#homedir#quota`` and create the FTP user."""
        self._send(conn, "Account:test   Password:test    Home Dir:test   Quota:10 ")
        self._send_poll0(conn)
        account_info = self._recv(conn)
        if account_info == "quit":
            # The client went away while we were waiting for the account data.
            self._close_connect(conn)
            return "quit"
        fields = (self.__bytes2str(account_info) or "").split("#", maxsplit=3)
        if len(fields) != 4:
            self._send(conn, "create ftp user failed: expected account#password#homedir#quota")
            self._send_poll0(conn)
            return "continuewhile"
        account, password, homedir, quota = fields
        try:
            create_user(account, password, homedir, quota)
        except Exception as e:
            # Report the failure so the client is not left waiting for the end flag.
            print("create_user Exception %s" % str(e))
            self._send(conn, "create ftp user failed: %s" % str(e))
        else:
            self._send(conn, "create ftp user successful")
        self._send_poll0(conn)
        return "continuewhile"

    def _send_poll0(self, conn):
        """Send the ``poll:0`` end-of-reply flag to ``conn``."""
        self._send(conn, b"poll:0")

    def _tcplink(self, conn):
        """Serve one client until it quits or disconnects (worker-thread target)."""
        try:
            while True:
                cmd = self._recv(conn)
                if cmd == "quit":
                    break
                cmd = self.__bytes2str(cmd)
                if not cmd or not cmd.strip():
                    continue
                order, paramer = AutoMainPlantSocket._cmdsplit(cmd)
                if order in man_order:
                    try:
                        ret = self.dosomething(conn, cmd)
                    except Exception as e:
                        print("dosomething Exception %s" % str(e))
                        continue
                    if ret == "quit":
                        break
                    if ret == 'continuewhile':
                        continue
                # Anything that is not a built-in order is executed locally.
                self._run_cmd(conn, cmd)
        except Exception as e:
            print("Exception tcplink:%s....." % str(e))
        finally:
            # Release the socket and this thread's bookkeeping entry on every exit path.
            self._close_connect(conn)
            self._close_thread(threading.current_thread())

    def run(self):
        """Accept clients forever, serving each one on its own thread.

        Returns after an error on the listening socket; client connections and
        the listening socket are closed on every exit path.
        """
        try:
            while True:
                conn, address = self._accept()
                self._connaddress.append(conn)
                thread = threading.Thread(target=self._tcplink, args=(conn,))
                # Register before start() so the worker can always remove itself.
                self._threads.append(thread)
                thread.start()
        except Exception as e:
            print("Exception run: %s....." % str(e))
        finally:
            for open_conn in list(self._connaddress):
                self._close_connect(open_conn)
            self._close_socket()

    @staticmethod
    def _getinput(format="-->>>"):
        """Prompt on the console and return the stripped input as UTF-8 bytes.

        :param format: prompt text.
        :return: the encoded input, or None when nothing was entered.
        """
        inpt = input(format).strip()
        if not inpt:
            return None
        return bytes(inpt, 'utf8')

    def _run_ftp_server(self, conn):
        """Start the FTP server on a new thread (``_ftplink``)."""
        thread = threading.Thread(target=self._ftplink, args=(conn, ))
        self._threads.append(thread)
        try:
            thread.start()
        except RuntimeError as e:
            # Retrying here would fail the same way, so only report it.
            print("Exception _run_ftp_server:%s....." % str(e))
            self._close_thread(thread)

    def _ftplink(self, conn):
        """Thread target: configure and run the FTP server on this server's address."""
        try:
            if conn:
                sever_obj = ServerConfig()
                sever_obj.setConfig(self._address_port[0])
                ftp_server_run(sever_obj)
        finally:
            self._close_thread(threading.current_thread())

    # def ftplink(self, conn, filepath):
    #     """
    #     1、正常的情况下 ftp server Thread destory itself  - dosomething is while continue
    #     2、unpack Exception
    #     3、open Exception recv data
    #     4、(2、3)need get last message
    #     :param conn:
    #     :param filepath:
    #     :return:
    #     """
    #     successful_flag = True
    #     try:
    #         self._send(conn, "ftp server thread is start.......")
    #         self._send_poll0(conn)
    #         while True:
    #             file_info = struct.calcsize('128sl')
    #             # file_head_length = int(self._recv(conn, self._MAXBUFFER), 10)
    #             # if file_head_length == "quit":
    #             #     return
    #             file_head = self._recv(conn, file_info)
    #             # if file_head == b"quit":  # connect is colsed  or put finish
    #             #     return
    #             print("file head: %s" % file_head)
    #             if file_head:
    #                 # TypeError: string argument without an encoding unpack(fmt, string)
    #                 file_name, file_size = None, None
    #                 try:
    #                     file_name, file_size = struct.unpack('128sl', file_head)
    #                 except Exception as e:
    #                     print("Exception struct.unpack: %s" % (str(e)))
    #                     successful_flag = False
    #                     while True:
    #                         if "EOF" == self._recv(conn):
    #                             break
    #                     return successful_flag
    #                 # 使用strip()删除打包时附加的多余空字符
    #                 file_name = file_name.decode().strip('\00')
    #                 print(file_name)
    #                 # where you want to save
    #                 # TypeError: Can't mix strings and bytes in path components
    #                 file_new_name = os.path.join(str(filepath, "utf-8"), file_name)
    #                 print('start receiving file name %s size %s...' % (file_name, file_size))
    #                 filew = None
    #                 print("write %s start....." % file_new_name)
    #                 try:
    #                     filew = open(file_new_name, 'wb')
    #                     received_size = 0  # 接收文件的大小
    #                     while not received_size == file_size:
    #                         if file_size - received_size > self._MAXBUFFER:
    #                             r_data = self._recv(conn, self._MAXBUFFER)
    #                             received_size += len(r_data)
    #                         else:
    #                             r_data = conn.recv(file_size - received_size)
    #                             received_size = file_size
    #                         filew.write(r_data)
    #                 except Exception as e:
    #                     except_err = "Exception recv %s data Failed: %s" % (file_name, str(e))
    #                     self._send(conn, except_err)
    #                     successful_flag = False
    #                     self._run_ftp_server(conn)
    #                 finally:
    #                     if filew:
    #                         filew.close()
    #                     msg = None
    #                     if successful_flag:
    #                         msg = ("%s put successful" % file_new_name)
    #                     else:
    #                         msg = ("%s put failed" % file_new_name)
    #                     self._send(conn, msg)
    #                     self._send_poll0(conn)
    #                     while True:
    #                         if "EOF" == self._recv(conn):
    #                             break
    #                     return successful_flag
    #     except Exception as e:
    #         print("Exception ftplink: %s" % str(e))
    #         successful_flag = False
    #     finally:
    #         while True:
    #             if "EOF" == self._recv(conn):
    #                 break
    #         return successful_flag


def server_run():
    """Build a server bound to the default address and run its accept loop."""
    server = AutoMainPlantSocket(None)
    server.run()

# Start the server only when this file is executed as a script, not on import.
if __name__ == '__main__':
    server_run()

 

# Project source download: https://download.csdn.net/download/singebogo/11374821

# Logo
#
# CSDN, together with Geek Time, builds a curated learning community for developers.
#
# More recommendations