使用python 链接 ADCS FTP 服务批量下载和上传文件
Published in:2023-07-31 |
Words: 1.9k | Reading time: 9min | reading:

使用python 链接 ADCS FTP 服务批量下载和上传文件

基础环境

1
2
3
PyCharm IDE
PDCU or ADCU hardware
Python >= 3.8

准备工作

  • ADCS开启FTP服务,端口20或21
  • python编写程序操作

测试环境

1
2
3
ubuntu >= 22.04
python >= 3.8
PyCharm IDE

代码编写

FTP服务开启

基于ubuntu 22.04 系统开启ftp服务,使用python代码测试是否连接成功。

FTP服务开启

  • ssh服务开启

    1
    2
    3
    sudo apt update
    sudo apt install openssh-server
    lsof -i:21
  • ftp服务开启

    1
    2
    3
    4
    5
    6
    7
    sudo apt-get install vsftpd
    # 更改配置
    sudo vi /etc/vsftpd.conf
    local_enable=YES
    write_enable=YES
    # 重启服务生效
    sudo service vsftpd restart
  • ftp与ssh测试

    1
    2
    3
    4
    5
    # ftp 
    ftp ip

    # ssh
    ssh root@ip

sfpt链接服务器

  • 代码如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @logger.catch
    def ftp_connect(host, port, username, password):
    cnopts = pysftp.CnOpts()
    cnopts.hostkeys = None
    myHostname = host
    myUsername = username
    myPassword = password

    sftp = pysftp.Connection(host=myHostname, username=myUsername, password=myPassword, cnopts=cnopts)
    logger.debug("Connection succesfully stablished ... ")
    directory_structure = sftp.listdir_attr()
    for attr in directory_structure:
    logger.debug(attr.filename, attr)
    return sftp, directory_structure

sftp批量下载文件

  • 使用sftp.get()方法下载文件,构造指定函数下载
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15

    @logger.catch
    def download_file(sftp, LocalFile, RemoteFile): # 下载当个文件
    """
    download point file from remote PDCU
    :param sftp:
    :param LocalFile: local file dir
    :param RemoteFile: remote pdcu file
    :return:
    """
    file_handler = open(LocalFile, 'wb')
    logger.debug(" download file dir : " + LocalFile)
    sftp.get(RemoteFile, LocalFile) # 下载目录中文件
    file_handler.close()
    return True
  • 抑或目录:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25

    @logger.catch
    def download_filetree(sftp, LocalDir, RemoteDir): # 下载整个目录下的文件
    """
    download point file and dir from remote PDCU
    :param sftp:
    :param LocalDir: local file dir
    :param RemoteDir: remote pdcu file
    :return:
    """
    if not os.path.exists(LocalDir):
    os.makedirs(LocalDir)
    logger.debug(" download file dir : " + LocalDir)
    for file in sftp.listdir(RemoteDir):
    Local = os.path.join(LocalDir, file)
    Remote = RemoteDir + "/" + file
    if sftp.isdir(Remote): # 判断是否是文件
    if not os.path.exists(Local):
    os.makedirs(Local)
    download_filetree(sftp, Local, Remote)
    elif sftp.isfile(Remote): # 文件
    download_file(sftp, Local, Remote)
    else:
    logger.error(" you input path or file is incorrect!, detail path or file :" + Remote)
    return "complete"

sftp上传指定文件

  • 参考如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    # 判断sftp服务端中文件路径是否存在,若不存在则创建
    @logger.catch
    def create_dir(sftp, remoteDir):
    try:
    if stat.S_ISDIR(sftp.stat(remoteDir).st_mode): # 如果remoteDir存在且为目录,则返回True
    pass
    except Exception as e:
    sftp.mkdir(remoteDir)
    print("在远程sftp上创建目录:{}".format(remoteDir))


    # 上传
    @logger.catch
    def sftp_upload(sftp, localDir, remoteDir):
    if os.path.isdir(localDir): # 判断本地localDir是否为目录
    for file in os.listdir(localDir):
    remoteDirTmp = os.path.join(remoteDir, file)
    localDirTmp = os.path.join(localDir, file)
    if os.path.isdir(localDirTmp): # 如果本地localDirTmp为目录,则对远程sftp服务器进行判断
    create_dir(sftp, remoteDirTmp) # 判断sftp服务端文件目录是否存在,若不存在则创建
    sftp_upload(sftp, localDirTmp, remoteDirTmp)
    else:
    print("upload file:", localDir)
    try:
    sftp.put(localDir, remoteDir)
    except Exception as e:
    print('upload error:', e)


    if __name__ == '__main__':
    host = '192.168.149.153' # sftp主机
    port = 22 # 端口
    username = 'sftpuser' # sftp用户名
    password = 'passwd' # 密码
    localDir = '/test/sftp' # 本地文件或目录
    remoteDir = '/sftp' # 远程文件或目录(注意远程路径要存在)
    sf = paramiko.Transport((host, port))
    sf.connect(username=username, password=password)
    sftp = paramiko.SFTPClient.from_transport(sf)
    sftp_upload(sftp, localDir, remoteDir)
    sf.close()

sftp获取文件状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95

def clear_dir(path):
"""
清空文件夹:如果文件夹不存在就创建,如果文件存在就清空!
:param path: 文件夹路径
:return:
"""
import os
import shutil
try:
if not os.path.exists(path):
os.makedirs(path)
else:
shutil.rmtree(path)
os.makedirs(path)
return True
except:
return False


@logger.catch
def cmd(self, sftp, command):
"""
执行命令
:param command:
:return:
"""
# with SSHConnectionManager(**self.ssh_args) as ssh:
return sftp.cmd(f"{command}")


@logger.catch
def exists(self, sftp, path):
"""
判断路径是否存在
:param path:
:return:
"""
is_exists = False
# with SSHConnectionManager(**self.ssh_args) as ssh:
result = sftp.cmd(f"find {path}")
if result:
is_exists = True

return is_exists


@logger.catch
def is_file(self, sftp, path):
"""
判断路径是否是文件
:param path:
:return:
"""
if self.exists(path):
# with SSHConnectionManager(**self.ssh_args) as ssh:
prefix = sftp.cmd(f"ls -ld {path}")[0]
if prefix == '-':
return True
else:
return False
else:
return False


@logger.catch
def is_dir(self, sftp, path):
"""
判断路径是否是目录
:param path:
:return:
"""
if self.exists(path):
# with SSHConnectionManager(**self.ssh_args) as ssh:
prefix = sftp.cmd(f"ls -ld {path}")[0]
if prefix == 'd':
return True
else:
return False
else:
return False


@logger.catch
def makedirs(self, sftp, path):
"""
创建目录
:param path: 远程文件夹路径
:return:
"""
# with SSHConnectionManager(**self.ssh_args) as ssh:
cmd_str = f"mkdir -p {path}"
print(cmd_str)
sftp.cmd(cmd_str)

UI编写

UI绘制采用tkinter 第三方库绘制 ,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97

class FileDownload:
def __init__(self, master):
self.file_path = StringVar()
self.file_path.set('请输入下载文件地址:')
self.src_dir = ''
self.urls = []
# tkinter刚开始不熟悉,布局麻烦,这里使用的网格布局,row,column 的index,定位cell
open_btn = Button(master, text="链接ftp服务", width=15, height=3, borderwidth=2,
command=self.connect_ftp_service)
download_btn = Button(master, text="开始下载文件", width=15, height=3, borderwidth=2,
command=self.file_download)
# self.file_path = Text(master, width=200, height=3)
# self.file_path.grid(row=2, column=1, columnspan=2, ipadx=10, ipady=10)
open_btn.grid(row=0, column=0, ipadx=5, ipady=3, padx=10, pady=3)
download_btn.grid(row=1, column=0, ipadx=5, ipady=3, padx=10, pady=3)
# 文件输入路径获取
self.file_label = Entry(master, textvariable=self.file_path, fg='green', bg='white')
self.file_label.grid(row=0, column=1, rowspan=2, ipadx=200, ipady=15, padx=10, pady=2) # ,padx=20,pady=10,
# self.console_text = StringVar()
self.text = Text(master, width=200, height=50, bg='white', fg='blue')
self.text.grid(row=2, columnspan=2, ipadx=10, ipady=10)
# self.file_path.pack()
# self.text.setvar("aaaaa")# self.file_name = StringVar()


def server_msg_input(self):
"""

:return: ip port username psd
"""
# 初始化弹出输入对话框
child = tkinter.Tk()
# 弹出对话框的title
child.title('服务器账户信息输入')
child['height'] = 260
child['width'] = 320
child.resizable(0, 0)
# 设置弹出对话的框的标签值
ip_label = tkinter.Label(child, text='IP:', font=('黑体', 12))
ip_label.place(x=25, y=20)
# 弹出输入对话框请获取相应的值
ip_str = tkinter.StringVar()
ip_entry = tkinter.Entry(child, font=('黑体', 12), textvariable=ip_str)
ip_entry.place(x=100, y=20)

port_label = tkinter.Label(child, text='port:', font=('黑体', 12))
port_label.place(x=25, y=60)
# 弹出输入对话框请获取相应的值
port_str = tkinter.StringVar()
port_entry = tkinter.Entry(child, font=('黑体', 12), textvariable=port_str)
port_entry.place(x=100, y=60)

user_name_label = tkinter.Label(child, text='UserName:', font=('黑体', 12))
user_name_label.place(x=25, y=100)
# 弹出输入对话框请获取相应的值
user_name_str = tkinter.StringVar()
user_name_entry = tkinter.Entry(child, font=('黑体', 12), textvariable=user_name_str)
user_name_entry.place(x=100, y=100)

psd_label = tkinter.Label(child, text='PassWord:', font=('黑体', 12))
psd_label.place(x=25, y=140)
# 弹出输入对话框请获取相应的值
psd_str = tkinter.StringVar()
psd_entry = tkinter.Entry(child, font=('黑体', 12), textvariable=psd_str)
psd_entry.place(x=100, y=140)

# 绑定回车键
def Get_key():
global key, edt_ip, edt_port, edt_user_name, edt_pass_word
key = ip_entry.get()
edt_ip = ip_entry.get()
edt_port = port_entry.get()
edt_user_name = user_name_entry.get()
edt_pass_word = psd_entry.get()
if edt_ip != "" and ip_identify(edt_ip) and edt_port != "" and edt_user_name != "" and edt_pass_word != "":
logger.debug(
" user input server msg , ip : " + edt_ip + ",port:" + edt_port + ".user name :"
+ edt_user_name + ", psd : " + edt_pass_word)
connect_ftp_server(self, edt_ip, edt_port, edt_user_name, edt_pass_word)
else:
logger.error(" you input msg is error , please re input!")
showerror(title='错误!', message='您输入信息有误,请重新输入!')
child.destroy()

# 按下回车键返回秘钥
child.bind('<Return>', Get_key)
# 点击OK按钮返回秘钥
btn_OK = tkinter.Button(child, text='确认', font=('黑体', 12), height=2, command=Get_key)
btn_OK.place(x=150, y=200)
# edt_ip, edt_port, edt_user_name, edt_pass_word = Get_key()

# if edt_ip != "" and edt_port != "" and edt_user_name \
# != "" and edt_pass_word != "":
# return edt_ip, edt_port, edt_user_name, edt_pass_word
# return child

其他部分

linux系统目录无法找到解决办法

  • 在代码首部添加如下代码:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    # encoding:utf-8
    # coding=utf-8
    # 导入OS模块
    import os
    import sys

    # 把当前文件所在文件夹的父文件夹路径加入到PYTHONPATH
    sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

Prev:
基于Nginx的http_sssl_cert双向认证配置
Next:
uvicorn 框架软件编写后发布至ubuntu服务器