import tensorflow as tf
import numpy as np
import os
import logging
import time
import random
import sys
from metrics import update_metric, update_speaker_prob, update_listener_prob, update_R_and_MIS
# Hyperparameters and process-wide setup (GPU selection, logging) for the
# speaker/listener referential-game training script.
# NOTE(review): SYMBOL_DIM (used in log_file below and in the main block) and
# BATCH_SIZE / EPSILON (used by the agents) are referenced in this file, but no
# assignment to them is visible here -- confirm they are defined before use.
MAX_EP = 10000000 # maximum count of training data
A_LR = 0.002 # learning rate for actor
C_LR = 0.002 # learning rate for critic (NOTE(review): no use of C_LR is visible in this file)
CLIP = 0.2 # clipping range of the probability ratio in the surrogate loss
SHAPE_NUM = 3 # M0
COLOR_NUM = 3 # M1
H_SIZE = 20 # agent capacity
# Log file name encodes the experiment configuration.
log_file ='log_Shape' + str(SHAPE_NUM) + '_Color' + str(COLOR_NUM) + '_Symbol' + str(SYMBOL_DIM) + '_H' + str(H_SIZE)
np.set_printoptions(precision=3, suppress=True)
# The first command-line argument selects the visible GPU(s); running the script
# without it raises IndexError on sys.argv[1].
os.environ['CUDA_VISIBLE_DEVICES'] = sys.argv[1]
logger = logging.getLogger('mylogger')
timestamp = str(int(time.time()))
# Opening the FileHandler requires the ./run_logs/ directory to exist already.
fh = logging.FileHandler('./run_logs/' + log_file)
ch = logging.StreamHandler()
formatter = logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s')
# NOTE(review): no logger.setLevel / setFormatter / addHandler calls are visible
# after this point, so fh, ch and formatter are not attached to `logger` in the
# code shown -- confirm against the full file.
class Speaker(object):
    """Speaker agent: maps a concept vector to two discrete symbols.

    The policy network emits one categorical distribution per symbol. It is
    trained with a clipped probability-ratio surrogate in which the raw reward
    takes the place of an advantage estimate; an "old" copy of the network
    provides the denominator of the ratio.

    NOTE(review): the visible source of this class had lost its indentation,
    the ``self.sess.run(`` prefix of every session call, the ``else:`` lines of
    the epsilon-greedy choice, all closing brackets of multi-line calls and the
    body of ``store_transition``. Those pieces were restored from context and
    should be confirmed against the upstream file.

    Relies on the module-level names ``tf``, ``np``, ``A_LR``, ``CLIP``,
    ``H_SIZE``, ``BATCH_SIZE`` and ``EPSILON``.
    """

    def __init__(self, symbol_dim, shape_num, color_num):
        """Build the policy graph, the loss and a session.

        Args:
            symbol_dim: number of values each of the two symbols can take.
            shape_num: number of shapes (width of the shape one-hot).
            color_num: number of colors (width of the color one-hot).
        """
        self.buffer_t = []
        self.buffer_symbol = []
        self.buffer_reward = []
        self.symbol_dim, self.shape_num, self.color_num = symbol_dim, shape_num, color_num

        gpu_options = tf.GPUOptions(allow_growth=True)
        config = tf.ConfigProto(gpu_options=gpu_options)
        self.sess = tf.Session(config=config)

        # Input: concatenated shape and color vectors.
        self.t = tf.placeholder(tf.float32, [None, self.shape_num + self.color_num], 'shape_color')
        self.symbol0_prob, self.symbol1_prob, speak_params = self.speak_net(
            self.t, 'speak_net', trainable=True)
        self.old_symbol0_prob, self.old_symbol1_prob, old_speak_params = self.speak_net(
            self.t, 'old_speak_net', trainable=True)
        # Ops that copy the current parameters into the "old" network.
        self.update_prob = [oldp.assign(p) for p, oldp in zip(speak_params, old_speak_params)]

        # Chosen symbols (two scalars per sample) and the reward they earned.
        self.tf_symbol_out = tf.placeholder(tf.float32, [None, 2], 'symbol_out')
        self.tf_symbol0_out, self.tf_symbol1_out = tf.split(self.tf_symbol_out, [1, 1], axis=-1)
        self.tf_reward = tf.placeholder(tf.float32, [None, 1], 'reward')

        symbol0_dist = tf.distributions.Categorical(probs=self.symbol0_prob)
        symbol1_dist = tf.distributions.Categorical(probs=self.symbol1_prob)
        old_symbol0_dist = tf.distributions.Categorical(probs=self.old_symbol0_prob)
        old_symbol1_dist = tf.distributions.Categorical(probs=self.old_symbol1_prob)

        # Probability ratio new/old for the joint (symbol0, symbol1) action;
        # the small constant keeps the division finite.
        ratio0 = symbol0_dist.prob(self.tf_symbol0_out) / (old_symbol0_dist.prob(self.tf_symbol0_out) + 1e-5)
        ratio1 = symbol1_dist.prob(self.tf_symbol1_out) / (old_symbol1_dist.prob(self.tf_symbol1_out) + 1e-5)
        ratio = ratio0 * ratio1
        surrogate = ratio * self.tf_reward
        self.loss = -tf.reduce_mean(tf.minimum(
            surrogate,
            tf.clip_by_value(ratio, 1. - CLIP, 1. + CLIP) * self.tf_reward))
        self.train = tf.train.AdamOptimizer(A_LR).minimize(self.loss, var_list=[speak_params])

        # NOTE(review): no variable initialisation is visible in the source, but
        # the session cannot evaluate the networks without it.
        self.sess.run(tf.global_variables_initializer())

    def learn(self):
        """Run one optimisation step per buffered transition, then sync and clear.

        Returns:
            0, always.
        """
        t = np.vstack(self.buffer_t)
        symbol = np.vstack(self.buffer_symbol)
        reward = np.vstack(self.buffer_reward)
        # NOTE(review): the name suggests a shuffle, but none is visible in the
        # source, so transitions are replayed in insertion order.
        rand_indices = np.arange(len(self.buffer_t))
        # Slicing (rather than indexing up to BATCH_SIZE) avoids an IndexError
        # when fewer than BATCH_SIZE transitions were stored.
        for indices in rand_indices[:BATCH_SIZE]:
            batch_t = t[indices][np.newaxis, :]
            batch_symbol = symbol[indices][np.newaxis, :]
            batch_reward = reward[indices][np.newaxis, :]
            # One policy update on a single transition.
            self.sess.run([self.train, self.loss], feed_dict={
                self.t: batch_t,
                self.tf_symbol_out: batch_symbol,
                self.tf_reward: batch_reward,
            })
        # copy speak_params to old_speak_params
        # NOTE(review): restored call; only the comment above survived in the source.
        self.sess.run(self.update_prob)
        # clear buffers
        self.buffer_t, self.buffer_symbol, self.buffer_reward = [], [], []
        return 0

    def choose_symbol(self, t, train_flag=False):
        """Pick the two symbols for concept ``t``.

        With probability ``EPSILON`` both symbols are drawn uniformly. Otherwise
        they are sampled from the policy when ``train_flag`` is truthy, or taken
        as the arg-max when it is falsy.

        Args:
            t: 1-D concept vector (a batch axis is added here).
            train_flag: truthy to sample from the policy, falsy to act greedily.

        Returns:
            ``(symbol0, symbol1, symbol0_prob, symbol1_prob)`` where the last two
            are the squeezed policy distributions.
        """
        symbol0_prob, symbol1_prob = self.sess.run(
            [self.symbol0_prob, self.symbol1_prob],
            feed_dict={self.t: t[np.newaxis, :]})
        num_symbol0 = symbol0_prob.shape[1]
        num_symbol1 = symbol1_prob.shape[1]

        # A single draw decides exploration for both symbols.
        U = np.random.uniform(0, 1)
        if U > 1 - EPSILON:
            symbol0 = np.random.choice(range(num_symbol0))
            symbol1 = np.random.choice(range(num_symbol1))
        elif train_flag:
            symbol0 = np.random.choice(range(num_symbol0), p=symbol0_prob.ravel())
            symbol1 = np.random.choice(range(num_symbol1), p=symbol1_prob.ravel())
        else:
            symbol0 = np.argmax(np.squeeze(symbol0_prob))
            symbol1 = np.argmax(np.squeeze(symbol1_prob))
        return symbol0, symbol1, np.squeeze(symbol0_prob), np.squeeze(symbol1_prob)

    def choice_symbol(self, symbol):
        """Return the one-hot vector of length ``symbol_dim`` for index ``symbol``."""
        one_hot = np.zeros(self.symbol_dim)
        one_hot[symbol] = 1
        return one_hot

    def debug_info(self, t):
        """Evaluate the input placeholder and both symbol distributions for ``t``.

        NOTE(review): the fetch list was missing from the source; it was restored
        from the names on the left-hand side of the assignment.
        """
        t, symbol0_prob, symbol1_prob = self.sess.run(
            [self.t, self.symbol0_prob, self.symbol1_prob],
            feed_dict={self.t: t[np.newaxis, :]})
        return t, symbol0_prob, symbol1_prob

    def speak_net(self, data_in, name, trainable):
        """Build the speaker network under variable scope ``name``.

        A shared tanh layer of width ``2 * H_SIZE`` is split in two halves, each
        feeding its own softmax head of width ``symbol_dim``.

        Args:
            data_in: input tensor.
            name: variable scope (also used to collect the parameters).
            trainable: accepted for interface compatibility; not used by the
                visible layer definitions.

        Returns:
            ``(symbol0_probs, symbol1_probs, params)``.
        """
        # (shape, color) ---> symbol_out
        with tf.variable_scope(name):
            h = tf.layers.dense(
                inputs=data_in,
                units=2 * H_SIZE,
                activation=tf.nn.tanh,
                use_bias=False,
                kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.3)
            )
            h0, h1 = tf.split(h, [H_SIZE, H_SIZE], axis=-1)
            o0 = tf.layers.dense(
                inputs=h0,
                units=self.symbol_dim,
                activation=None,
                use_bias=False,
                kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.3)
            )
            data_out0 = tf.nn.softmax(o0)
            o1 = tf.layers.dense(
                inputs=h1,
                units=self.symbol_dim,
                activation=None,
                use_bias=False,
                kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.3)
            )
            data_out1 = tf.nn.softmax(o1)
        params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope=name)
        return data_out0, data_out1, params

    def store_transition(self, t, symbol, reward):
        """Append one (concept, symbols, reward) transition to the buffers.

        NOTE(review): the body was missing from the source; it was restored from
        the buffers that ``learn`` stacks and clears.
        """
        self.buffer_t.append(t)
        self.buffer_symbol.append(symbol)
        self.buffer_reward.append(reward)
class Listener(object):
    """Listener agent: maps a pair of one-hot symbols to a guessed concept.

    The policy network emits one categorical distribution over the
    ``shape_num * color_num`` concepts and is trained with the same clipped
    probability-ratio surrogate as the speaker, using the raw reward.

    NOTE(review): the visible source of this class had lost its indentation,
    the ``self.sess.run(`` prefix of every session call, the ``else:`` lines of
    the epsilon-greedy choice, all closing brackets of multi-line calls and the
    body of ``store_transition``. Those pieces were restored from context and
    should be confirmed against the upstream file.

    Relies on the module-level names ``tf``, ``np``, ``A_LR``, ``CLIP``,
    ``H_SIZE``, ``BATCH_SIZE`` and ``EPSILON``.
    """

    def __init__(self, symbol_dim, shape_num, color_num):
        """Build the policy graph, the loss and a session.

        Args:
            symbol_dim: number of values each of the two symbols can take.
            shape_num: number of shapes.
            color_num: number of colors.
        """
        self.buffer_symbol = []
        self.buffer_t_hat = []
        self.buffer_reward = []
        self.symbol_dim, self.shape_num, self.color_num = symbol_dim, shape_num, color_num

        gpu_options = tf.GPUOptions(allow_growth=True)
        # The source passed an undefined `config` to tf.Session (NameError);
        # build it from gpu_options exactly as the Speaker does.
        config = tf.ConfigProto(gpu_options=gpu_options)
        self.sess = tf.Session(config=config)

        # Input: the two one-hot symbols, concatenated.
        self.symbol = tf.placeholder(tf.float32, [None, 2 * self.symbol_dim], 'symbol0_symbol1')
        self.symbol0_in, self.symbol1_in = tf.split(
            self.symbol, [self.symbol_dim, self.symbol_dim], axis=-1)
        self.t_hat_prob, listen_params = self.listen_net(
            self.symbol0_in, self.symbol1_in, 'listen_net', trainable=True)
        self.old_t_hat_prob, old_listen_params = self.listen_net(
            self.symbol0_in, self.symbol1_in, 'old_listen_net', trainable=True)
        # Ops that copy the current parameters into the "old" network.
        self.update_prob = [oldp.assign(p) for p, oldp in zip(listen_params, old_listen_params)]

        # Chosen concept index and the reward it earned.
        self.tf_t_hat = tf.placeholder(tf.float32, [None, 1], 't_hat')
        self.tf_reward = tf.placeholder(tf.float32, [None, 1], 'reward')

        t_hat_dist = tf.distributions.Categorical(probs=self.t_hat_prob)
        old_t_hat_dist = tf.distributions.Categorical(probs=self.old_t_hat_prob)
        # Probability ratio new/old; the small constant keeps the division finite.
        ratio = t_hat_dist.prob(self.tf_t_hat) / (old_t_hat_dist.prob(self.tf_t_hat) + 1e-5)
        surrogate = ratio * self.tf_reward
        self.loss = -tf.reduce_mean(tf.minimum(
            surrogate,
            tf.clip_by_value(ratio, 1. - CLIP, 1. + CLIP) * self.tf_reward))
        self.train = tf.train.AdamOptimizer(A_LR).minimize(self.loss, var_list=[listen_params])

        # NOTE(review): no variable initialisation is visible in the source, but
        # the session cannot evaluate the networks without it.
        self.sess.run(tf.global_variables_initializer())

    def learn(self):
        """Run one optimisation step per buffered transition, then sync and clear.

        Returns:
            0, always.
        """
        symbol = np.vstack(self.buffer_symbol)
        t_hat = np.vstack(self.buffer_t_hat)
        reward = np.vstack(self.buffer_reward)
        # NOTE(review): the name suggests a shuffle, but none is visible in the
        # source, so transitions are replayed in insertion order.
        rand_indices = np.arange(len(self.buffer_symbol))
        # Slicing (rather than indexing up to BATCH_SIZE) avoids an IndexError
        # when fewer than BATCH_SIZE transitions were stored.
        for indices in rand_indices[:BATCH_SIZE]:
            batch_symbol = symbol[indices][np.newaxis, :]
            batch_t_hat = t_hat[indices][np.newaxis, :]
            batch_reward = reward[indices][np.newaxis, :]
            # One policy update on a single transition.
            self.sess.run([self.train, self.loss], feed_dict={
                self.symbol: batch_symbol,
                self.tf_t_hat: batch_t_hat,
                self.tf_reward: batch_reward,
            })
        # copy listen_params to old_listen_params
        # NOTE(review): restored call; only a comment survived in the source.
        self.sess.run(self.update_prob)
        # clear buffers
        self.buffer_symbol, self.buffer_t_hat, self.buffer_reward = [], [], []
        return 0

    def choose_concept(self, symbol, train_flag=False):
        """Guess the concept for the received ``symbol`` vector.

        With probability ``EPSILON`` the guess is uniform. Otherwise it is
        sampled from the policy when ``train_flag`` is truthy, or taken as the
        arg-max when it is falsy.

        Args:
            symbol: 1-D concatenation of the two one-hot symbols.
            train_flag: truthy to sample from the policy, falsy to act greedily.

        Returns:
            ``(t_hat, t_hat_prob)`` where ``t_hat_prob`` is the squeezed policy
            distribution over concepts.
        """
        t_hat_prob = self.sess.run(
            self.t_hat_prob, feed_dict={self.symbol: symbol[np.newaxis, :]})
        num_concepts = t_hat_prob.shape[1]

        U = np.random.uniform(0, 1)
        if U > 1 - EPSILON:
            t_hat = np.random.choice(range(num_concepts))
        elif train_flag:
            t_hat = np.random.choice(range(num_concepts), p=t_hat_prob.ravel())
        else:
            t_hat = np.argmax(np.squeeze(t_hat_prob))
        return t_hat, np.squeeze(t_hat_prob)

    def debug_info(self, symbol):
        """Evaluate both split symbol inputs and the concept distribution.

        NOTE(review): the fetch list was missing from the source; it was restored
        from the names on the left-hand side of the assignment.
        """
        symbol0_in, symbol1_in, t_hat_prob = self.sess.run(
            [self.symbol0_in, self.symbol1_in, self.t_hat_prob],
            feed_dict={self.symbol: symbol[np.newaxis, :]})
        return symbol0_in, symbol1_in, t_hat_prob

    def listen_net(self, data_in0, data_in1, name, trainable):
        """Build the listener network under variable scope ``name``.

        Each symbol passes through its own tanh layer of width ``H_SIZE``; the
        two hidden vectors are concatenated and fed to one softmax head of width
        ``shape_num * color_num``.

        Args:
            data_in0: tensor holding the first symbol.
            data_in1: tensor holding the second symbol.
            name: variable scope (also used to collect the parameters).
            trainable: accepted for interface compatibility; not used by the
                visible layer definitions.

        Returns:
            ``(concept_probs, params)``.
        """
        with tf.variable_scope(name):
            h0 = tf.layers.dense(
                inputs=data_in0,
                units=H_SIZE,
                activation=tf.nn.tanh,
                use_bias=False,
                kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.3)
            )
            h1 = tf.layers.dense(
                inputs=data_in1,
                units=H_SIZE,
                activation=tf.nn.tanh,
                use_bias=False,
                kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.3)
            )
            h = tf.concat([h0, h1], axis=-1)
            o = tf.layers.dense(
                inputs=h,
                units=self.shape_num * self.color_num,
                activation=None,
                use_bias=False,
                kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.3)
            )
            data_out = tf.nn.softmax(o)
        params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope=name)
        return data_out, params

    def store_transition(self, symbol, t_hat, reward):
        """Append one (symbols, guessed concept, reward) transition to the buffers.

        NOTE(review): the body was missing from the source; it was restored from
        the buffers that ``learn`` stacks and clears.
        """
        self.buffer_symbol.append(symbol)
        self.buffer_t_hat.append(t_hat)
        self.buffer_reward.append(reward)
if __name__ == '__main__':
    # Training driver: alternately trains the speaker and the listener on
    # randomly drawn (shape, color) concepts, logs their policies after every
    # training phase and stops once the greedy test accuracy has been perfect
    # for more than 10 consecutive evaluations.
    #
    # NOTE(review): in the visible source this block had lost its indentation,
    # every `logger.info(` prefix (the message arguments were fused onto the
    # preceding line), the `else:` lines, and apparently the `learn()` calls and
    # the loop exit. Those pieces were restored from context; confirm against
    # the upstream file. Runs of spaces inside log strings may also have been
    # collapsed; the strings are reproduced as they appear.
    shape_num = SHAPE_NUM
    color_num = COLOR_NUM
    symbol_dim = SYMBOL_DIM

    # define agent (each agent lives in its own graph)
    G0 = tf.Graph()
    G1 = tf.Graph()
    with G0.as_default():
        agent0 = Speaker(symbol_dim, shape_num, color_num)
    with G1.as_default():
        agent1 = Listener(symbol_dim, shape_num, color_num)

    M, MIS, speaker_language, listener_language,\
        P_speaker_symbol0_t, P_speaker_symbol1_t, P_listener_t_hat_symbol\
        = update_metric(agent0, agent1, shape_num, color_num, symbol_dim)

    speaker_buffer_cnt = 0
    listener_buffer_cnt = 0
    learned_episode = 0
    # train speaker first
    train_speaker_flag = 1
    train_listener_flag = 0

    for i in range(MAX_EP):
        # Draw a random concept and encode it as two concatenated one-hots.
        rand_shape_id = np.random.randint(shape_num)
        rand_color_id = np.random.randint(color_num)
        t_shape = np.zeros(shape_num)
        t_color = np.zeros(color_num)
        t_shape[rand_shape_id] = 1
        t_color[rand_color_id] = 1
        t = np.concatenate((t_shape, t_color))

        # agent0's step
        symbol0_out_scalar, symbol1_out_scalar, _, _ = agent0.choose_symbol(t, train_speaker_flag)
        symbol0_out = agent0.choice_symbol(symbol0_out_scalar)
        symbol1_out = agent0.choice_symbol(symbol1_out_scalar)
        # collect symbol
        symbol = np.concatenate((symbol0_out, symbol1_out))
        symbol_scalar = np.array([symbol0_out_scalar, symbol1_out_scalar])

        # agent1's step
        t_hat_scalar, _ = agent1.choose_concept(symbol, train_listener_flag)

        # debug information
        t, symbol0_prob, symbol1_prob = agent0.debug_info(t)
        symbol0_in, symbol1_in, t_hat_prob = agent1.debug_info(symbol)

        # get reward: +1 when the listener recovers the concept, -1 otherwise.
        # (Without the restored `else`, the reward would always end up -1.0.)
        if t_hat_scalar == (rand_shape_id * color_num + rand_color_id):
            reward_n = 1.0
        else:
            reward_n = -1.0

        # collect t, t_hat, symbol, reward in one iteration
        if train_speaker_flag:
            agent0.store_transition(t, symbol_scalar, reward_n)
            speaker_buffer_cnt += 1
        if train_listener_flag:
            agent1.store_transition(symbol, t_hat_scalar, reward_n)
            listener_buffer_cnt += 1

        if speaker_buffer_cnt == BATCH_SIZE:
            logger.info('--------------------- train speaker ------------------------')
            # NOTE(review): restored call; no learn() call is visible in the source.
            agent0.learn()
            speaker_buffer_cnt = 0
            M, MIS, speaker_language, listener_language,\
                P_speaker_symbol0_t, P_speaker_symbol1_t, P_listener_t_hat_symbol\
                = update_metric(agent0, agent1, shape_num, color_num, symbol_dim)
            # Hand over to the listener.
            train_speaker_flag = 0
            train_listener_flag = 1

        if listener_buffer_cnt == BATCH_SIZE:
            logger.info('--------------------- train listener -----------------------')
            # NOTE(review): restored call; no learn() call is visible in the source.
            agent1.learn()
            listener_buffer_cnt = 0
            M, MIS, speaker_language, listener_language,\
                P_speaker_symbol0_t, P_speaker_symbol1_t, P_listener_t_hat_symbol\
                = update_metric(agent0, agent1, shape_num, color_num, symbol_dim)
            # Hand over to the speaker.
            train_speaker_flag = 1
            train_listener_flag = 0

        # Both counters are zero only right after a training phase finished.
        if speaker_buffer_cnt == 0 and listener_buffer_cnt == 0:
            # print debug information in some timesteps
            logger.info('rand_shape_id: ' + str(rand_shape_id))
            logger.info('rand_color_id: ' + str(rand_color_id))
            logger.info('')
            logger.info('speaker t: ' + str(t))
            logger.info('speaker symbol: ' + str(symbol))
            logger.info('speaker symbol_scalar: ' + str(symbol_scalar))
            logger.info('speaker symbol0_prob: ' + str(symbol0_prob))
            logger.info('speaker symbol1_prob: ' + str(symbol1_prob))
            logger.info('speaker reward: ' + str(reward_n))
            logger.info('')
            logger.info('listener symbol0_in: ' + str(symbol0_in))
            logger.info('listener symbol1_in: ' + str(symbol1_in))
            logger.info('listener t_hat_scalar: ' + str(t_hat_scalar))
            logger.info('listener t_hat_prob: ' + str(t_hat_prob))
            logger.info('listener reward: ' + str(reward_n))
            logger.info('')
            for k0 in range(shape_num):
                for k1 in range(color_num):
                    logger.info('P(Speaker: symbol0 | t: shape = %d, color = %d) = %s' % (k0, k1, str(P_speaker_symbol0_t[k0][k1])))
            logger.info('')
            for k0 in range(shape_num):
                for k1 in range(color_num):
                    logger.info('P(Speaker: symbol1 | t: shape = %d, color = %d) = %s' % (k0, k1, str(P_speaker_symbol1_t[k0][k1])))
            logger.info('')
            for k0 in range(symbol_dim):
                for k1 in range(symbol_dim):
                    logger.info('P(Listener: t_hat | s: symbol0 = %d, symbol1 = %d) =\n%d %s' % (k0, k1, np.argmax(P_listener_t_hat_symbol[k0][k1]), str(P_listener_t_hat_symbol[k0][k1])))
            logger.info('')
            logger.info('M = ')
            logger.info(' ' + str(M[0]))
            logger.info(' ' + str(M[1]))
            logger.info('MIS = ' + str(MIS))
            logger.info('')

            # test accuracy: greedy speaker symbol -> greedy listener guess,
            # over every concept.
            right_count = 0
            for k in range(shape_num):
                for l in range(color_num):
                    shape = np.zeros(shape_num)
                    color = np.zeros(color_num)
                    shape[k] = 1
                    color[l] = 1
                    t = np.concatenate((shape, color))
                    _, _, symbol0_test_prob, symbol1_test_prob = agent0.choose_symbol(t)
                    symbol0_test = agent0.choice_symbol(np.argmax(symbol0_test_prob))
                    symbol1_test = agent0.choice_symbol(np.argmax(symbol1_test_prob))
                    symbol = np.concatenate((symbol0_test, symbol1_test))
                    _, t_hat_test_prob = agent1.choose_concept(symbol)
                    target = np.argmax(t_hat_test_prob)
                    if target == k * color_num + l:
                        right_count += 1
            # float() keeps this a true ratio under Python 2 as well.
            accuracy = right_count / float(shape_num * color_num)
            logger.info('Training data count: %d, accuracy: %f' % (i, accuracy))
            if accuracy == 1:
                logger.info('LEARNED! ' + str(learned_episode))
                learned_episode += 1
            else:
                learned_episode = 0
            logger.info('')

            # Finish: 10 times in a row, test accuracy = 1
            if learned_episode > 10:
                metric_file = 'result_logs/' + log_file
                # open() does not create missing directories.
                if not os.path.isdir('result_logs'):
                    os.makedirs('result_logs')
                with open(metric_file, 'a+') as f:
                    f.write('M = \n' + str(M) + '\n')
                    f.write('MIS = ' + str(MIS) + '\n')
                    f.write('\n')

                logger.info('speaker_language: ')
                line = ''
                for kk in range(color_num):
                    # NOTE(review): the source had an `if kk == 0:` whose two
                    # branches read identically (padding probably collapsed),
                    # so a single statement is equivalent to what is visible.
                    line += ' color' + str(kk)
                # NOTE(review): restored; `line` is otherwise never used.
                logger.info(line)
                for kk in range(shape_num):
                    logger.info('shape' + str(kk) + ' ' + str(speaker_language[kk]))
                logger.info('')

                logger.info('listener_language: ')
                alphabet = 'abcdefghijklmnopqrstuvwxyz'
                line = ''
                for kk in range(symbol_dim):
                    # NOTE(review): same identical-branch situation as above.
                    line += ' s1:' + alphabet[kk]
                # NOTE(review): restored; `line` is otherwise never used.
                logger.info(line)
                for kk in range(symbol_dim):
                    logger.info('s0:' + alphabet[kk] + ' ' + str(listener_language[kk]))
                logger.info('')
                logger.info('Finished!' + '\n')
                # NOTE(review): no loop exit is visible in the source; without
                # one the results would be re-appended after every later phase.
                break
# Environment
A speaker-listener referential game based on a reinforcement learning algorithm
# Agents (Listener and Speaker)
Stochastic policy-gradient agents, with no parameter sharing and no connection between their networks
# Code structure
'Agent_algorithm.py': contains the code for the whole referential game framework
1). class Speaker(): algorithm and structure of the speaker
2). class Listener(): algorithm and structure of the listener
3). main(): the top function of all code, including settings, running process and evaluation of the referential game
'metrics.py': contains the code for obtaining the probability distributions over symbols and concepts, and for computing MIS, the metric we use to measure compositionality in our paper
1). update_speaker_prob(): getting policy and probability distribution of the speaker
2). update_listener_prob(): getting policy and probability distribution of the listener
3). update_R_and_MIS(): getting the metric MIS
4). update_metric(): the top-level function of 'metrics.py'
# Run
python Agent_algorithm.py GPU_ID
For example, to use GPUs 0, 1 and 2, run: python Agent_algorithm.py 0,1,2
# Logs
run_logs/log_XXX: contains the policies of the agents during the training process and the emergent language after training
result_logs/log_XXX: contains mutual information matrix M and the metric MIS
import numpy as np
def update_metric(agent0, agent1, shape_num, color_num, symbol_dim):
    """Recompute every language metric for the current speaker and listener.

    Chains ``update_speaker_prob``, ``update_listener_prob`` and
    ``update_R_and_MIS``.

    Args:
        agent0: speaker; must provide ``choose_symbol(t)``.
        agent1: listener; must provide ``choose_concept(symbol)``.
        shape_num: number of shapes.
        color_num: number of colors.
        symbol_dim: number of values each symbol can take.

    Returns:
        ``(M, MIS, speaker_language, listener_language, P_speaker_symbol0_t,
        P_speaker_symbol1_t, P_listener_t_hat_symbol)``.
    """
    (P_speaker_symbol0_shape, P_speaker_symbol0_color,
     P_speaker_symbol1_shape, P_speaker_symbol1_color,
     speaker_language, P_speaker_symbol,
     P_speaker_symbol0_t, P_speaker_symbol1_t) = update_speaker_prob(
         agent0, shape_num, color_num, symbol_dim)

    # The listener marginals are weighted by how often the speaker emits each symbol.
    (P_listener_shape_symbol0, P_listener_color_symbol0,
     P_listener_shape_symbol1, P_listener_color_symbol1,
     listener_language, P_listener_t_hat_symbol) = update_listener_prob(
         agent1, P_speaker_symbol, shape_num, color_num, symbol_dim)

    # The call below was never closed in the source (missing ")").
    M, MIS = update_R_and_MIS(
        shape_num, color_num, symbol_dim,
        P_listener_shape_symbol0, P_speaker_symbol0_shape,
        P_listener_shape_symbol1, P_speaker_symbol1_shape,
        P_listener_color_symbol0, P_speaker_symbol0_color,
        P_listener_color_symbol1, P_speaker_symbol1_color
    )
    return (M, MIS, speaker_language, listener_language,
            P_speaker_symbol0_t, P_speaker_symbol1_t, P_listener_t_hat_symbol)
def update_speaker_prob(agent0, shape_num, color_num, symbol_dim):
    """Query the speaker on every concept and derive its symbol statistics.

    Concepts are treated as uniformly distributed when marginalising.

    Args:
        agent0: speaker; ``agent0.choose_symbol(t)`` must return a 4-tuple whose
            last two items are the distributions of symbol0 and symbol1.
        shape_num: number of shapes.
        color_num: number of colors.
        symbol_dim: number of values each symbol can take (at most 26, because
            symbols are rendered as lowercase letters).

    Returns:
        An 8-tuple:
        ``P_speaker_symbol0_shape[i][k] = P(symbol0 = k | shape = i)``,
        ``P_speaker_symbol0_color[j][k] = P(symbol0 = k | color = j)``,
        ``P_speaker_symbol1_shape``, ``P_speaker_symbol1_color`` (same for
        symbol1),
        ``speaker_language[i][j]``: two-letter word for (shape i, color j) built
        from the most likely value of each symbol,
        ``P_speaker_symbol[k][l] = P(symbol0 = k, symbol1 = l)``,
        ``P_speaker_symbol0_t[i][j][k] = P(symbol0 = k | shape = i, color = j)``,
        ``P_speaker_symbol1_t`` (same for symbol1).
    """
    alphabet = 'abcdefghijklmnopqrstuvwxyz'
    P_speaker_symbol0_t = np.zeros((shape_num, color_num, symbol_dim))
    P_speaker_symbol1_t = np.zeros((shape_num, color_num, symbol_dim))

    speaker_language = []
    for i in range(shape_num):
        speaker_language_shape_i = []
        for j in range(color_num):
            shape = np.zeros(shape_num)
            color = np.zeros(color_num)
            shape[i] = 1
            color[j] = 1
            t = np.concatenate((shape, color))
            _, _, P_speaker_symbol0_t[i][j], P_speaker_symbol1_t[i][j] = agent0.choose_symbol(t)
            s0 = alphabet[np.argmax(P_speaker_symbol0_t[i][j])]
            s1 = alphabet[np.argmax(P_speaker_symbol1_t[i][j])]
            speaker_language_shape_i.append(s0 + s1)
        # The source built each row but never stored it, so the returned
        # language was always an empty list.
        speaker_language.append(speaker_language_shape_i)

    # Marginalise over the other attribute (uniform prior), i.e. average over
    # the color axis for the per-shape tables and over the shape axis for the
    # per-color tables.
    P_speaker_symbol0_shape = P_speaker_symbol0_t.mean(axis=1)
    P_speaker_symbol0_color = P_speaker_symbol0_t.mean(axis=0)
    P_speaker_symbol1_shape = P_speaker_symbol1_t.mean(axis=1)
    P_speaker_symbol1_color = P_speaker_symbol1_t.mean(axis=0)

    # P(symbol0 = k, symbol1 = l | t) is the product of the two heads; averaging
    # over all concepts gives P_speaker_symbol[k][l].
    joint_given_t = (P_speaker_symbol0_t[:, :, :, np.newaxis]
                     * P_speaker_symbol1_t[:, :, np.newaxis, :])
    P_speaker_symbol = joint_given_t.mean(axis=(0, 1))

    return (P_speaker_symbol0_shape, P_speaker_symbol0_color,
            P_speaker_symbol1_shape, P_speaker_symbol1_color,
            speaker_language, P_speaker_symbol,
            P_speaker_symbol0_t, P_speaker_symbol1_t)
def update_listener_prob(agent1, P_speaker_symbol, shape_num, color_num, symbol_dim):
    """Query the listener on every symbol pair and derive its concept statistics.

    Args:
        agent1: listener; ``agent1.choose_concept(symbol)`` must return a pair
            whose second item is the distribution over the
            ``shape_num * color_num`` concepts (index = shape * color_num + color).
        P_speaker_symbol: ``P_speaker_symbol[k][l] = P(symbol0 = k, symbol1 = l)``
            as produced by ``update_speaker_prob``.
        shape_num: number of shapes.
        color_num: number of colors.
        symbol_dim: number of values each symbol can take.

    Returns:
        A 6-tuple:
        ``P_listener_shape_symbol0[i][s] = P(shape_hat = s | symbol0 = i)``,
        ``P_listener_color_symbol0[i][c] = P(color_hat = c | symbol0 = i)``,
        ``P_listener_shape_symbol1``, ``P_listener_color_symbol1`` (same for
        symbol1),
        ``listener_language[i][j]``: ``'shapeS,colorC'`` for the most likely
        concept given (symbol0 = i, symbol1 = j),
        ``P_listener_t_hat_symbol[i][j][] = P(shape_hat, color_hat | symbol0 = i,
        symbol1 = j)``.
    """
    P_listener_t_hat_symbol = np.zeros((symbol_dim, symbol_dim, shape_num * color_num))
    shape_list = ['shape' + str(i) for i in range(shape_num)]
    color_list = ['color' + str(i) for i in range(color_num)]

    listener_language = []
    for i in range(symbol_dim):
        listener_language_s0_i = []
        for j in range(symbol_dim):
            symbol0 = np.zeros(symbol_dim)
            symbol1 = np.zeros(symbol_dim)
            symbol0[i] = 1
            symbol1[j] = 1
            tmp = np.concatenate((symbol0, symbol1))
            _, P_listener_t_hat_symbol[i][j] = agent1.choose_concept(tmp)
            # Integer decomposition of the flat concept index.
            shape_id, color_id = divmod(int(np.argmax(P_listener_t_hat_symbol[i][j])), color_num)
            listener_language_s0_i.append(shape_list[shape_id] + ',' + color_list[color_id])
        # The source built each row but never stored it, so the returned
        # language was always an empty list.
        listener_language.append(listener_language_s0_i)

    # P_speaker_symbol0[i] = P(symbol0 = i), P_speaker_symbol1[j] = P(symbol1 = j)
    P_speaker_symbol = np.asarray(P_speaker_symbol, dtype=float)[:symbol_dim, :symbol_dim]
    P_speaker_symbol0 = P_speaker_symbol.sum(axis=1)
    P_speaker_symbol1 = P_speaker_symbol.sum(axis=0)

    # P_listener_t_hat_symbol0[i][] = P(shape_hat, color_hat | symbol0 = i):
    #   sum over j of P(t_hat | i, j) * P(symbol1 = j)
    # P_listener_t_hat_symbol1[j][] = P(shape_hat, color_hat | symbol1 = j):
    #   sum over i of P(t_hat | i, j) * P(symbol0 = i)
    P_listener_t_hat_symbol0 = np.einsum('ijk,j->ik', P_listener_t_hat_symbol, P_speaker_symbol1)
    P_listener_t_hat_symbol1 = np.einsum('ijk,i->jk', P_listener_t_hat_symbol, P_speaker_symbol0)

    # Unfold the flat concept axis into (shape, color) and sum out one of them.
    per_symbol0 = P_listener_t_hat_symbol0.reshape(symbol_dim, shape_num, color_num)
    per_symbol1 = P_listener_t_hat_symbol1.reshape(symbol_dim, shape_num, color_num)
    P_listener_shape_symbol0 = per_symbol0.sum(axis=2)
    P_listener_color_symbol0 = per_symbol0.sum(axis=1)
    P_listener_shape_symbol1 = per_symbol1.sum(axis=2)
    P_listener_color_symbol1 = per_symbol1.sum(axis=1)

    return (P_listener_shape_symbol0, P_listener_color_symbol0,
            P_listener_shape_symbol1, P_listener_color_symbol1,
            listener_language, P_listener_t_hat_symbol)
def _normalized_information(P_listener, P_speaker, concept_num, symbol_dim):
    """Return the score R(concept, symbol) for one attribute and one symbol slot.

    Args:
        P_listener: ``P_listener[s][c_hat]``, listener's distribution over the
            predicted attribute value given symbol value ``s``.
        P_speaker: ``P_speaker[c][s]``, speaker's distribution over the symbol
            value given the true attribute value ``c``.
        concept_num: number of values of the attribute.
        symbol_dim: number of values of the symbol.

    Returns:
        The accumulated information terms divided by the entropy of a uniform
        attribute, ``log(concept_num)``.
    """
    P_listener = np.asarray(P_listener, dtype=float)
    P_speaker = np.asarray(P_speaker, dtype=float)
    entropy = np.log(concept_num)

    total = 0.0
    for i in range(concept_num):
        for j in range(concept_num):
            # s_c = argmax_s P_listener[s][c_hat = j] * P_speaker[c = i][s]
            # (first maximum wins, index 0 when every product is zero).
            products = P_listener[:symbol_dim, j] * P_speaker[i, :symbol_dim]
            s_c = int(np.argmax(products))
            # Joint probability of (c = i, c_hat = j) through symbol s_c.
            P0 = P_listener[s_c, j] * P_speaker[i, s_c] / concept_num
            if P0 <= 0:
                # Convention 0 * log(0) = 0; the source produced NaN here.
                continue
            # Same quantity with the speaker term averaged over all true values.
            P1 = (P_listener[s_c, j] * P_speaker[:concept_num, s_c].sum()
                  / concept_num / concept_num)
            total += P0 * np.log(P0 / P1)
    return total / entropy


def update_R_and_MIS(
        shape_num, color_num, symbol_dim,
        P_listener_shape_symbol0, P_speaker_symbol0_shape,
        P_listener_shape_symbol1, P_speaker_symbol1_shape,
        P_listener_color_symbol0, P_speaker_symbol0_color,
        P_listener_color_symbol1, P_speaker_symbol1_color
):
    """Compute the normalized information matrix ``M`` and the metric ``MIS``.

    Args:
        shape_num: number of shapes.
        color_num: number of colors.
        symbol_dim: number of values each symbol can take.
        P_listener_<attr>_symbol<n>: ``[s][attr_hat]`` listener tables.
        P_speaker_symbol<n>_<attr>: ``[attr][s]`` speaker tables.

    Returns:
        ``(M, MIS)`` where ``M`` is 2x2 with rows (shape, color) and columns
        (symbol0, symbol1), and ``MIS`` is ``get_MIS`` applied to its entries.

    Note:
        With ``shape_num`` or ``color_num`` equal to 1 the attribute entropy is
        ``log(1) = 0`` and the corresponding ratio is undefined.
    """
    # R(shape, symbol0) = I(shape, shape_hat | symbol0 = s_c) / H(shape), etc.
    R_shape_symbol0 = _normalized_information(
        P_listener_shape_symbol0, P_speaker_symbol0_shape, shape_num, symbol_dim)
    R_shape_symbol1 = _normalized_information(
        P_listener_shape_symbol1, P_speaker_symbol1_shape, shape_num, symbol_dim)
    R_color_symbol0 = _normalized_information(
        P_listener_color_symbol0, P_speaker_symbol0_color, color_num, symbol_dim)
    R_color_symbol1 = _normalized_information(
        P_listener_color_symbol1, P_speaker_symbol1_color, color_num, symbol_dim)

    # get normalized mutual information matrix M
    M = np.zeros((2, 2))
    M[0, 0] = R_shape_symbol0
    M[1, 0] = R_color_symbol0
    M[0, 1] = R_shape_symbol1
    M[1, 1] = R_color_symbol1

    # get our metric MIS
    MIS = get_MIS(R_shape_symbol0, R_color_symbol0, R_shape_symbol1, R_color_symbol1)
    return M, MIS
def get_MIS(R_shape_symbol0, R_color_symbol0, R_shape_symbol1, R_color_symbol1):
    """Return the MIS metric from the four normalized information scores.

    For each symbol, the vector (R_shape, R_color) is compared with the closest
    2-dim one-hot vector by cosine similarity; the two similarities are averaged
    and mapped through ``2 * (x - 0.5)``.

    Args:
        R_shape_symbol0: score between shape and symbol0.
        R_color_symbol0: score between color and symbol0.
        R_shape_symbol1: score between shape and symbol1.
        R_color_symbol1: score between color and symbol1.

    Returns:
        The MIS value (a float).
    """
    def one_hot_cosine(r_shape, r_color):
        # Cosine similarity with the best-matching one-hot vector; 1e-5 keeps
        # the division finite when both scores are zero.
        norm = np.sqrt(np.square(r_shape) + np.square(r_color))
        return np.max([r_shape, r_color]) / (1e-5 + norm)

    cos_symbol0 = one_hot_cosine(R_shape_symbol0, R_color_symbol0)
    cos_symbol1 = one_hot_cosine(R_shape_symbol1, R_color_symbol1)
    # average similarity
    MIS0 = (cos_symbol0 + cos_symbol1) / 2.0
    # normalization; 0.5 (not 1/2) so the offset is never truncated to 0 by
    # integer division.
    MIS = 2 * (MIS0 - 0.5)
    return MIS
