#coding:utf-8importparamikoimportosfromcommon.yaml_info.configure_yamlimport*config_data=get_yaml(file_name="config.yaml") ip=config_data["ip"] port=config_data["port_ssh"] username=config_data["username"] password=config_data["password"] classFileUtil: defupload_file(local_path, remote_path): """ 上传文件到远程目录中 :param local_path:本地文件目录 :param remote_path:远程文件目录 :return: """transport=paramiko.Transport((ip, port)) try: transport.connect(username=username, password=password) sftp_client=paramiko.SFTPClient.from_transport(transport) sftp_client.put(local_path, remote_path) sftp_client.close() transport.close() returnTrueexcept: transport.close() print("ssh连接失败") returnFalsedefwrite_file(content, remote_path): """ 向远程文件内写入内容,该方法为替换 :param content: 文件的字符串行 :param remote_path: 远程文件目录 :return: """transport=paramiko.Transport((ip, port)) try: transport.connect(username=username, password=password) sftp_client=paramiko.SFTPClient.from_transport(transport) fo=sftp_client.open(filename=remote_path, mode='w') fo.writelines(content) fo.flush() sftp_client.close() transport.close() returnTrueexcept: transport.close() print("ssh连接失败") returnFalsedefbatch_upload_files(local_path, remote_path): """ 批量上传文件到远程目录中 :param local_path:本地文件目录 :param remote_path:远程文件目录 :return: """transport=paramiko.Transport((ip, port)) try: file_list= [] transport.connect(username=username, password=password) sftp_client=paramiko.SFTPClient.from_transport(transport) files=os.listdir(local_path) files.remove("__init__.py") print(files) forfinfiles: # if f == "__init__.py":# continuesftp_client.put(os.path.join(local_path, f), os.path.join(remote_path, f)) file_list.append(f.split('.')[0]) sftp_client.close() transport.close() returnfile_listexcept: transport.close() print("ssh连接失败") returnFalse