Commit f499ec6e by ziho

交换了提出连接请求的主机

parent 2615eb14
import os
import os
import socket
import threading
import time
dst='recv'
tempdir='recvTempFile'
port=43215
delete_command='del'+' ' #'del ' for windows, 'rm ' for linux
def handle_file(sock):
data = b''
while True:
part = sock.recv(1024)
data += part
if len(part) < 1024:
break
filename = data.decode('utf-8')
sock.sendall('ACK'.encode('utf-8'))
with open(os.path.join(tempdir,filename), 'wb') as F:
part = sock.recv(4096)
while part:
F.write(part)
part = sock.recv(4096)
comm='tar -xf '+os.path.join(tempdir,filename)+' -C '+dst
os.system(comm)
comm=delete_command+os.path.join(tempdir,filename)
os.system(comm)
print(filename+'...received!')
if not os.path.exists(dst):
os.system('mkdir '+dst)
print('[info] create dir:'+dst)
if not os.path.exists(tempdir):
os.system('mkdir '+tempdir)
print('[info] create dir:'+tempdir)
runMainLoop=True
def listenforkeyboard():
while True:
aaa=input('[info] input <quit/exit> can quit:\n')
print('you input:'+aaa)
if aaa=='quit' or aaa=='exit':
global runMainLoop
runMainLoop=False
break
a=''
while not (a=='Y' or a=='y'):
a=input('start receiving?<Y or y> :')
t = threading.Thread(target=listenforkeyboard)
t.start()
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('0.0.0.0', port))
server.listen(10)
server.settimeout(1)
while runMainLoop:
try:
sock, _ = server.accept()
except:
pass
else:
n=threading.Thread(target=handle_file, args=(sock,))
n.start()
print('[info] stop and exit! will delete temp file (or not)')
print('[info] and all the thread will end (dont know what will happen)')
\ No newline at end of file
import re
import re
import os
import socket
import threading
import time
ip='218.104.194.130'
port=43215
source='selfplay_data'
packsize=64 #the number of files that 1 '*.tar' file contain
maxthread=5
sleeptime=3 #I will try to change it dynamically
alpha=0.8 #maybe moving average?
#if there are many threads sending at the same time,they may be slow together
delete_command='rm'+' ' #'del ' for windows, 'rm ' for linux
packlist=[]
packindex=0
tempdir='sendTempFile'
if not os.path.exists(tempdir):
os.system('mkdir '+tempdir)
print('[info] create dir:'+tempdir)
def getPrefix(path):
'''
get prefix
'''
filelist=os.listdir(source)
prefix={}
for f in filelist:
matchResult=re.match(r'(agz9_selfplay_zhuque_([-A-Za-z1-9]+)_mlu\d_)(\d+)\.moves',f)
if not matchResult:
continue
pre=matchResult.group(1)
index=int(matchResult.group(3))
if pre in prefix:
prefix[pre]=min(prefix[pre],index)
else:
prefix[pre]=index
return prefix
def sendFileThread(ip, port,packlist,packindex):
packname='tempPack'+str(packindex)+'.tar'
out='sending '+packname+'...'
comm='tar -C '+source+' -cf '+os.path.join(tempdir,packname)
for f in packlist:
comm+=' '+f
os.system(comm)
while True:
time_start=time.time()
try:
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((ip, port))
client.send(packname.encode())
respond = client.recv(10)
if respond == b'ACK':
with open(os.path.join(tempdir,packname), 'rb') as f:
while True:
part = f.read(2048)
if not part:
break
client.sendall(part)
except Exception as e:
print(e)
print(out+'retry')
else:
client.close()
time_end=time.time()
global sleeptime
print(out+'done!total time={:.3f} sec,sleep time={:.3f} sec'.format(time_end-time_start,sleeptime))
# maybe I can use conn time to change sleeptime dynamically?
sleeptime=min(5,alpha*sleeptime+(1-alpha)*(time_end-time_start)*1.375)
os.system(delete_command+os.path.join(tempdir,packname))
break
runMainLoop=True
def listenforkeyboard():
while True:
aaa=input('[info] input <quit/exit> can quit:\n')
print('you input:'+aaa)
if aaa=='quit' or aaa=='exit':
global runMainLoop
runMainLoop=False
break
a=''
while not (a=='Y' or a=='y'):
prefix=getPrefix(source)
print('{} prefixes had found'.format(len(prefix)))
a=input('start sending?<Y or y> :')
t = threading.Thread(target=listenforkeyboard)
t.start()
while runMainLoop:
for key in prefix:
if len(threading.enumerate()) >= maxthread:
continue
path=os.path.join(source,key+str(prefix[key])+'.moves.finish')
if os.path.exists(path):
filename=key+str(prefix[key])+'.moves'
packlist.append(filename)
if len(packlist)==packsize:
if packindex>1000:
packindex=0
t = threading.Thread(target=sendFileThread, args=(ip, port,packlist,packindex))
t.setDaemon(True) #thread will end when main thread end
t.start()
packlist=[]
packindex+=1
time.sleep(sleeptime) #wait for the socket connected
prefix[key]+=1
print('[info] stop and exit! will delete temp file (or not)!')
print('[info] and all the thread will end (dont know what will happen)')
# 修改中,勿使用 # 【修改中】127.0.0.1已完成,外网未测试
## tcp传输 ## tcp传输
...@@ -7,40 +7,37 @@ ...@@ -7,40 +7,37 @@
**1、接收文件的一端,**需打开文件**tcprecv.py**修改参数 **1、接收文件的一端,**需打开文件**tcprecv.py**修改参数
``` python ``` python
ip='127.0.0.1' #目标主机IP
dst='recv' #文件存放地点,会把.moves文件送进去 dst='recv' #文件存放地点,会把.moves文件送进去
tempdir='recvTempFile' #临时文件 tempdir='recvTempFile' #临时文件
port=43215 #随意指定的端口号,没有冲突就不管 port=43215 #随意指定的端口号,没有冲突就不管
```
**2、发送文件的一端**,需打开**tcpsend.py**文件修改参数,必须要改的是IP
``` python
ip='127.0.0.1' #目标主机IP
port=43215 #随意指定的端口号,没有冲突就不管
source='selfplay_data' #存放有.moves和.moves.finish的地方
packsize=64 #打包时多少个文件一组,小批次的话文件到达得就均匀些,文件产生慢的话似乎也没必要
maxthread=5 #最大开启的线程数量 maxthread=5 #最大开启的线程数量
sleeptime=3 #每两个打包文件发送的最少间隔,因为同一时刻有多个线程大家会一起变慢 sleeptime=3 #请求文件的最少间隔,因为同一时刻有多个线程大家会一起变慢
alpha=0.8 #尝试用滑动平均法来控制sleeptime,网络慢的时候就发慢些 alpha=0.8 #尝试用滑动平均法来控制sleeptime,网络慢的时候就发慢些
#alpha是上一时刻sleeptime的权重 #alpha是上一时刻sleeptime的权重
``` ```
**2、发送文件的一端**,需打开**tcpsend.py**文件修改参数,必须要改的是IP
**3、开启接受端** ``` python
port=43215 #随意指定的端口号,没有冲突就不管
source='selfplay_data' #存放有.moves和.moves.finish的地方
packsize=64 #打包时多少个文件一组,小批次的话文件到达得就均匀些,文件产生慢的话似乎也没必要
``` tempdir='sendTempFile' #临时位置
python tcprecv.py waitinglist=[] #这个是还未发送的文件队列,接收到请求时,取出一个进入发送文件的线程
maxExistTempFile=25 #专门有线程负责打包文件,这个是最大打包文件个数,如果打包速度跟不上传输速度,就可能需要把这个数稍微调大
``` ```
需要输入y确认启动,(不启动就ctrl-c),运行中输入quit或exit可以退出
**4、开启发送端**
**3、(在有.moves和.moves.finish的,而且有公网IP的服务器上)开启发送端**
```bash ```bash
...@@ -57,6 +54,17 @@ python tcpsend.py ...@@ -57,6 +54,17 @@ python tcpsend.py
**4、(在没有外网那台服务器)开启接受端**
```
python tcprecv.py
```
需要输入y确认启动,(不启动就ctrl-c),运行中输入quit或exit可以退出
#### 实现 #### 实现
...@@ -75,4 +83,8 @@ $$ ...@@ -75,4 +83,8 @@ $$
​ 网络波动大的时候情况还没测试,因为只测试过127.0.0.1的情况 ​ 网络波动大的时候情况还没测试,因为只测试过127.0.0.1的情况
​ 也许排队论有一些确定方法 ​ 也许排队论有一些确定方法
\ No newline at end of file
2、变成请求文件的模式多了一些步骤,可能还有没考虑的情况
\ No newline at end of file
...@@ -3,34 +3,17 @@ import socket ...@@ -3,34 +3,17 @@ import socket
import threading import threading
import time import time
ip='120.236.247.203'
dst='recv' dst='recv'
tempdir='recvTempFile' tempdir='recvTempFile'
port=43215 port=43215
maxthread=5
sleeptime=1
alpha=0.8
delete_command='rm'+' ' #'del ' for windows, 'rm ' for linux delete_command='rm'+' ' #'del ' for windows, 'rm ' for linux
rbuf=512*1024
def handle_file(sock):
data = b''
while True:
part = sock.recv(1024)
data += part
if len(part) < 1024:
break
filename = data.decode('utf-8')
sock.sendall('ACK'.encode('utf-8'))
with open(os.path.join(tempdir,filename), 'wb') as F:
part = sock.recv(4096)
while part:
F.write(part)
part = sock.recv(4096)
comm='tar -xf '+os.path.join(tempdir,filename)+' -C '+dst
os.system(comm)
comm=delete_command+os.path.join(tempdir,filename)
os.system(comm)
print(filename+'...received!')
if not os.path.exists(dst): if not os.path.exists(dst):
os.system('mkdir '+dst) os.system('mkdir '+dst)
...@@ -39,6 +22,45 @@ if not os.path.exists(tempdir): ...@@ -39,6 +22,45 @@ if not os.path.exists(tempdir):
os.system('mkdir '+tempdir) os.system('mkdir '+tempdir)
print('[info] create dir:'+tempdir) print('[info] create dir:'+tempdir)
def unzip(packname):
comm='tar -xf '+os.path.join(tempdir,packname)+' -C '+dst
os.system(comm)
comm=delete_command+os.path.join(tempdir,packname)
os.system(comm)
def requestThread():
global sleeptime
while True:
time_start=time.time()
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET,socket.SO_RCVBUF,rbuf)
sock.connect((ip, port))
data = sock.recv(50)
filename = data.decode()
if filename=='wait':
sleeptime=5
break
sock.sendall('ACK'.encode())
with open(os.path.join(tempdir,filename), 'wb') as F:
part = sock.recv(65536)
while part:
F.write(part)
part = sock.recv(65536)
except Exception as e:
print(e)
print('request failed...retry')
else:
#sock.close()
time_end=time.time()
print('request '+filename+'...done!total time={:.3f} sec,sleep time={:.3f} sec'.format(time_end-time_start,sleeptime))
sleeptime=min(5,alpha*sleeptime+(1-alpha)*(time_end-time_start)*1.375)
unzip(filename)
break
runMainLoop=True runMainLoop=True
def listenforkeyboard(): def listenforkeyboard():
while True: while True:
...@@ -56,19 +78,16 @@ while not (a=='Y' or a=='y'): ...@@ -56,19 +78,16 @@ while not (a=='Y' or a=='y'):
t = threading.Thread(target=listenforkeyboard) t = threading.Thread(target=listenforkeyboard)
t.start() t.start()
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('0.0.0.0', port))
server.listen(10)
server.settimeout(1)
while runMainLoop: while runMainLoop:
try: if len(threading.enumerate())<maxthread:
sock, _ = server.accept() t=threading.Thread(target=requestThread)
except: t.setDaemon(True)
pass t.start()
else: time.sleep(sleeptime)
n=threading.Thread(target=handle_file, args=(sock,))
n.start()
print('[info] stop and exit! will delete temp file (or not)') print('[info] stop and exit! will delete temp file (or not)')
print('[info] and all the thread will end (dont know what will happen)') print('[info] and all the thread will end (dont know what will happen)')
\ No newline at end of file
...@@ -4,20 +4,18 @@ import socket ...@@ -4,20 +4,18 @@ import socket
import threading import threading
import time import time
ip='218.104.194.130'
port=43215 port=43215
source='selfplay_data' source='selfplay_data'
packsize=64 #the number of files that 1 '*.tar' file contain packsize=64 #the number of files that 1 '*.tar' file contain
maxthread=5
sleeptime=3 #I will try to change it dynamically
alpha=0.8 #maybe moving average?
#if there are many threads sending at the same time,they may be slow together
delete_command='rm'+' ' #'del ' for windows, 'rm ' for linux packindex=0 #index for tar
packlist=[] maxindex=10000
packindex=0
tempdir='sendTempFile' tempdir='sendTempFile'
waitinglist=[]
maxExistTempFile=25
delete_command='rm'+' ' #'del ' for windows, 'rm ' for linux
if not os.path.exists(tempdir): if not os.path.exists(tempdir):
os.system('mkdir '+tempdir) os.system('mkdir '+tempdir)
...@@ -41,40 +39,63 @@ def getPrefix(path): ...@@ -41,40 +39,63 @@ def getPrefix(path):
else: else:
prefix[pre]=index prefix[pre]=index
return prefix return prefix
def tarFileThread():
global packindex
global prefix
global waitinglist
packlist=[]
while True:
for key in prefix:
if len(waitinglist)>maxExistTempFile:
continue
path=os.path.join(source,key+str(prefix[key])+'.moves.finish')
if os.path.exists(path):
filename=key+str(prefix[key])+'.moves'
packlist.append(filename)
if len(packlist)==packsize:
packname='tempPack'+str(packindex)+'.tar'
comm='tar -C '+source+' -cf '+os.path.join(tempdir,packname)
for f in packlist:
comm+=' '+f
os.system(comm)
waitinglist.append(packname)
packlist=[]
packindex+=1
if packindex>maxindex:
packindex=0
prefix[key]+=1
def sendFileThread(ip, port,packlist,packindex): def sendThread(sock,packname):
packname='tempPack'+str(packindex)+'.tar' global waitinglist
out='sending '+packname+'...'
comm='tar -C '+source+' -cf '+os.path.join(tempdir,packname)
for f in packlist:
comm+=' '+f
os.system(comm)
while True: while True:
time_start=time.time() time_start=time.time()
try: try:
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if not packname:
client.connect((ip, port)) print('no zip file to send yet!')
client.send(packname.encode('utf-8')) sock.send('wait'.encode())
respond = client.recv(5) break
out='sending '+packname+'...'
sock.send(packname.encode())
respond = sock.recv(10)
if respond == b'ACK': if respond == b'ACK':
with open(os.path.join(tempdir,packname), 'rb') as f: with open(os.path.join(tempdir,packname), 'rb') as f:
while True: while True:
part = f.read(2048) part = f.read(8192)
if not part: if not part:
break break
client.sendall(part) sock.sendall(part)
sock.close()
except Exception as e: except Exception as e:
print(e) print(e)
print(out+'retry') sock.close()
else: waitinglist.append(packname)
client.close() break
else:
time_end=time.time() time_end=time.time()
global sleeptime print(out+'done!used time={:.3f} sec'.format(time_end-time_start))
print(out+'done!total time={:.3f} sec,sleep time={:.3f} sec'.format(time_end-time_start,sleeptime))
# maybe I can use conn time to change sleeptime dynamically?
sleeptime=min(5,alpha*sleeptime+(1-alpha)*(time_end-time_start)*1.375)
os.system(delete_command+os.path.join(tempdir,packname)) os.system(delete_command+os.path.join(tempdir,packname))
break break
...@@ -97,25 +118,31 @@ while not (a=='Y' or a=='y'): ...@@ -97,25 +118,31 @@ while not (a=='Y' or a=='y'):
t = threading.Thread(target=listenforkeyboard) t = threading.Thread(target=listenforkeyboard)
t.start() t.start()
while runMainLoop: t = threading.Thread(target=tarFileThread)
for key in prefix: t.setDaemon(True)
if len(threading.enumerate()) >= maxthread: t.start()
continue
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('0.0.0.0', port))
server.listen(5)
server.settimeout(1)
while runMainLoop:
try:
sock, _ = server.accept()
except:
pass
else:
if len(waitinglist):
packname=waitinglist.pop(0)
else:
packname=None
t=threading.Thread(target=sendThread,args=(sock,packname))
t.setDaemon(True)
t.start()
path=os.path.join(source,key+str(prefix[key])+'.moves.finish')
if os.path.exists(path):
filename=key+str(prefix[key])+'.moves'
packlist.append(filename)
if len(packlist)==packsize:
if packindex>1000:
packindex=0
t = threading.Thread(target=sendFileThread, args=(ip, port,packlist,packindex))
t.setDaemon(True) #thread will end when main thread end
t.start()
packlist=[]
packindex+=1
time.sleep(sleeptime) #wait for the socket connected
prefix[key]+=1
print('[info] stop and exit! will delete temp file (or not)!') print('[info] stop and exit! will delete temp file (or not)!')
print('[info] and all the thread will end (dont know what will happen)') print('[info] and all the thread will end (dont know what will happen)')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment