forked from AZMDDY/xshapi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutil.py
150 lines (123 loc) · 4.06 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import xsh.Session
import xsh.Screen
import xsh.Dialog
import re
def sleep(ms: int):
xsh.Session.Sleep(ms)
def get_row_info(row: int, chr_num: int):
"""
获取第row行的前chr_num个字符
row=0表示终端中最新的一行
"""
n_row = xsh.Screen.CurrentRow - row
return xsh.Screen.Get(n_row, 1, n_row, chr_num)
def wait(msg: str, timeout=0):
"""
等待终端当前行出现期望的内容(不区分大小写),直到超时或者找到
timeout: 超时时间,单位秒,当为0时,表示一直等待
"""
ret = False
count = 0
while not ret:
ret = re.search(msg, get_row_info(0, 200), re.I)
if ret:
return True
if timeout > 0 and count >= timeout * 10:
return False
count += 1
sleep(100)
def wait_no(msg: str, timeout=0):
"""
等待终端当前行不出现期望的内容(不区分大小写),直到超时或者不出现
timeout: 超时时间,单位秒,当为0时,表示一直等待
"""
ret = False
count = 0
while ret:
ret = re.search(msg, get_row_info(0, 200), re.I)
if not ret:
return True
if timeout > 0 and count >= timeout * 10:
return False
count += 1
sleep(100)
def send(msg: str, wait_time=200):
"""
向终端发送字符串,然后等待一段时间
wait_time: 命令执行后,等待的时间,单位毫秒
"""
xsh.Screen.Send(msg + "\r")
sleep(wait_time)
def input_passwd(passwd: str):
"""
输入密码
根据终端当前是否有输入密码的提示选择性的输入
"""
if wait("yes/no", 1):
send("yes")
if wait("password", 1):
send(passwd)
def su_root(passwd: str):
"""
切换到root用户
"""
send("su root")
input_passwd(passwd)
def ssh(user: str, passwd: str, host: str, port=22):
"""
ssh自动登录(不能在本地shell中执行)
"""
send("ssh {}@{} -p {}".format(user, host, port), 1000)
input_passwd(passwd)
def ssh_in_local_shell(user: str, passwd: str, host: str, port=22):
"""
在本地shell中ssh自动登录
"""
send("ssh {}:{}@{}:{}".format(user, passwd, host, port), 1000)
if wait("yes/no", 1):
send("yes")
def open_session(user: str, passwd: str, host: str, port=22):
"""
打开一个会话,但不会切换到这个会话,后续命令不会在这个新的会话中执行;
如果当前会话是本地shell,那么将切换到打开的会话中,后续命令也会在这个当前会话中执行;
"""
xsh.Session.Open("ssh://{}:{}@{}:{}".format(user, passwd, host, port))
xsh.Screen.Synchronous = True
sleep(1000)
def scp_in_local_shell(src: str, dst: str, user: str, passwd: str, host: str, port=22, mode=0):
"""
在本地shell中执行scp自动拷贝文件
mode=0: 从远端主机拷贝文件到本地
mode=1: 拷贝本地文件到远端主机
"""
src = src.replace("\\", "/")
dst = dst.replace("\\", "/")
if mode == 0:
send("scp -P {} {}:{}@{}:{} {}".format(port, user, passwd, host, src, dst), 1000)
else:
send("scp -P {} {} {}:{}@{}:{}".format(port, src, user, passwd, host, dst), 1000)
if wait("yes/no", 1):
send("yes")
if wait(src.split("/")[-1], 1):
wait_no(src.split("/")[-1])
def scp(src: str, dst: str, user: str, passwd: str, host: str, port=22, mode=0):
"""
使用scp复制文件(不能在本地shell中执行)
mode=0: 从远端主机拷贝文件到当前设备
mode=1: 拷贝当前设备的文件到远端主机
"""
src = src.replace("\\", "/")
dst = dst.replace("\\", "/")
if mode == 0:
send("scp -P {} {}@{}:{} {}".format(port, user, host, src, dst), 1000)
else:
send("scp -P {} {} {}@{}:{}".format(port, src, user, host, dst), 1000)
input_passwd(passwd)
if wait(src.split("/")[-1], 1):
wait_no(src.split("/")[-1])
def set_screen_sync(sync=True):
"""
设置屏幕是否同步
"""
xsh.Screen.Synchronous = sync
sleep(1000)