Commit 620fc45f by huangzhihao

used for transmiss .move file

parents
## tcp传输
#### 使用说明
**1、接收文件的一端,**需打开文件**tcprecv.py**修改参数
``` python
dst='recv' #文件存放地点,会把.moves文件送进去
tempdir='recvTempFile' #临时文件
port=43215 #随意指定的端口号,没有冲突就不管
```
**2、发送文件的一端**,需打开**tcpsend.py**文件修改参数,必须要改的是IP
``` python
ip='127.0.0.1' #目标主机IP
port=43215 #随意指定的端口号,没有冲突就不管
source='selfplay_data' #存放有.moves和.moves.finish的地方
packsize=64 #打包时多少个文件一组,小批次的话文件到达得就均匀些,文件产生慢的话似乎也没必要
maxthread=40 #最大开启的线程数量
sleeptime=1 #每两个打包文件发送的最少间隔,因为同一时刻有多个线程大家会一起变慢
alpha=0.8 #尝试用滑动平均法来控制sleeptime,网络慢的时候就发慢些
#alpha是上一时刻sleeptime的权重
```
**3、开启接受端**
```
python tcprecv.py
```
需要输入y确认启动,(不启动就ctrl-c),运行中输入quit或exit可以退出
**4、开启发送端**
```bash
python tcpsend.py
```
会首先检测有多少种前缀,比如测试的15616个文件里有**1087种前缀**
**一定要等到前缀数量和期望的卡的数量相同时再输入y启动**,随便输入点什么可以重新检测
运行起来就只用下标递增了,没办法再检测
想停下来输入quit或exit可以退出
#### 实现
还没写
#### 问题
1、两个包发送间隔用什么参数来估计?现在的公式是
$$
T_{sleep}=\alpha*T_{sleep}+(1-\alpha)T_{trans}*C
$$
​ T_trans是开始连接到传输完成的单次时延,C是一个常数,这个常数目前是1.2,低了会出现拥挤
​ 网络波动大的时候情况还没测试
​ 也许排队论有一些确定方法
import os
import socket
import threading
import time
dst='recv'
tempdir='recvTempFile'
port=43215
delete_command='rm'+' ' #'del ' for windows, 'rm ' for linux
def handle_file(sock):
data = b''
while True:
part = sock.recv(1024)
data += part
if len(part) < 1024:
break
filename = data.decode('utf-8')
sock.sendall('ACK'.encode('utf-8'))
with open(os.path.join(tempdir,filename), 'wb') as F:
part = sock.recv(4096)
while part:
F.write(part)
part = sock.recv(4096)
comm='tar -xf '+os.path.join(tempdir,filename)+' -C '+dst
os.system(comm)
comm=delete_command+os.path.join(tempdir,filename)
os.system(comm)
print(filename+'...received!')
if not os.path.exists(dst):
os.system('mkdir '+dst)
print('[info] create dir:'+dst)
if not os.path.exists(tempdir):
os.system('mkdir '+tempdir)
print('[info] create dir:'+tempdir)
runMainLoop=True
def listenforkeyboard():
while True:
aaa=input('[info] input <quit/exit> can quit:\n')
print('you input:'+aaa)
if aaa=='quit' or aaa=='exit':
global runMainLoop
runMainLoop=False
break
a=''
while not (a=='Y' or a=='y'):
a=input('start receiving?<Y or y> :')
t = threading.Thread(target=listenforkeyboard)
t.start()
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('0.0.0.0', port))
server.listen(10)
server.settimeout(1)
while runMainLoop:
try:
sock, _ = server.accept()
except:
pass
else:
n=threading.Thread(target=handle_file, args=(sock,))
n.start()
print('[info] stop and exit! will delete temp file (or not)')
print('[info] and all the thread will end (dont know what will happen)')
\ No newline at end of file
import re
import os
import socket
import threading
import time
ip='127.0.0.1'
port=43215
source='selfplay_data'
packsize=128 #the number of files that 1 '*.tar' file contain
maxthread=40
sleeptime=1 #I will try to change it dynamically
alpha=0.8 #maybe moving average?
#if there are many threads sending at the same time,they may be slow together
delete_command='rm'+' ' #'del ' for windows, 'rm ' for linux
packlist=[]
packindex=0
tempdir='sendTempFile'
if not os.path.exists(tempdir):
os.system('mkdir '+tempdir)
print('[info] create dir:'+tempdir)
def getPrefix(path):
'''
get prefix
'''
filelist=os.listdir(source)
prefix={}
for f in filelist:
matchResult=re.match(r'(agz9_selfplay_zhuque_([-A-Za-z1-9]+)_mlu\d_)(\d+)\.moves',f)
if not matchResult:
continue
pre=matchResult.group(1)
index=int(matchResult.group(3))
if pre in prefix:
prefix[pre]=min(prefix[pre],index)
else:
prefix[pre]=index
return prefix
def sendFileThread(ip, port,packlist,packindex):
packname='tempPack'+str(packindex)+'.tar'
out='sending '+packname+'...'
comm='tar -C '+source+' -cf '+os.path.join(tempdir,packname)
for f in packlist:
comm+=' '+f
os.system(comm)
while True:
time_start=time.time()
try:
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((ip, port))
client.send(packname.encode('utf-8'))
respond = client.recv(5)
if respond == b'ACK':
with open(os.path.join(tempdir,packname), 'rb') as f:
while True:
part = f.read(2048)
if not part:
break
client.sendall(part)
except Exception as e:
print(e)
print(out+'retry')
else:
client.close()
time_end=time.time()
global sleeptime
print(out+'done!total time={:.3f} sec,sleep time={:.3f} sec'.format(time_end-time_start,sleeptime))
# maybe I can use conn time to change sleeptime dynamically?
sleeptime=min(5,alpha*sleeptime+(1-alpha)*(time_end-time_start)*1.375)
os.system(delete_command+os.path.join(tempdir,packname))
break
runMainLoop=True
def listenforkeyboard():
while True:
aaa=input('[info] input <quit/exit> can quit:\n')
print('you input:'+aaa)
if aaa=='quit' or aaa=='exit':
global runMainLoop
runMainLoop=False
break
a=''
while not (a=='Y' or a=='y'):
prefix=getPrefix(source)
print('{} prefixes had found'.format(len(prefix)))
a=input('start sending?<Y or y> :')
t = threading.Thread(target=listenforkeyboard)
t.start()
while runMainLoop:
for key in prefix:
if len(threading.enumerate()) >= maxthread:
continue
path=os.path.join(source,key+str(prefix[key])+'.moves.finish')
if os.path.exists(path):
filename=key+str(prefix[key])+'.moves'
packlist.append(filename)
if len(packlist)==packsize:
if packindex>1000:
packindex=0
t = threading.Thread(target=sendFileThread, args=(ip, port,packlist,packindex))
t.setDaemon(True) #thread will end when main thread end
t.start()
packlist=[]
packindex+=1
time.sleep(sleeptime) #wait for the socket connected
prefix[key]+=1
print('[info] stop and exit! will delete temp file (or not)!')
print('[info] and all the thread will end (dont know what will happen)')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment