The Code#
Here you can read the source code we used. First comes the code for querying the model, followed by the code for analysing the results. Since we worked primarily with notebooks, you can also download the notebooks here if you prefer to explore them directly.
Mistral Queries#
from google.colab import drive
drive.mount('/content/drive')
# Determine the maximum number of tokens a response needs, so we can cap max_tokens and reduce the cost of API usage
from mistral_common.tokens.tokenizers.mistral import MistralTokenizer
from mistral_common.protocol.instruct.tool_calls import Function, Tool, ToolCall, FunctionCall
from mistral_common.tokens.instruct.normalize import ChatCompletionRequest
from mistral_common.protocol.instruct.messages import (
AssistantMessage,
UserMessage,
ToolMessage
)
tokenizer_v3 = MistralTokenizer.v3()
phrase = "10"
tokenized = tokenizer_v3.encode_chat_completion(
ChatCompletionRequest(
tools=[],
messages=[UserMessage(content=phrase)],
model="test",
)
)
tokens, text = tokenized.tokens, tokenized.text
len(tokens)
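# Quick sanity check (a sketch reusing the tokenizer above): all valid answers
# are single numbers, so counting the tokens of a handful of candidate outputs
# confirms that a budget of 5 tokens is enough before we hard-code max_tokens=5 below.
for candidate in ["0", "5", "10"]:
    encoded = tokenizer_v3.encode_chat_completion(
        ChatCompletionRequest(
            tools=[],
            messages=[UserMessage(content=candidate)],
            model="test",
        )
    )
    print(candidate, len(encoded.tokens))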
# Actual implementation of the client API to query the model
from google.colab import userdata
from mistralai.client import MistralClient
from mistralai.models.chat_completion import ChatMessage
from mistralai.exceptions import MistralException
import time
api_key = userdata.get('mistral-key')
client = MistralClient(api_key=api_key)
model = "open-mixtral-8x22b"
def generate_response_api(template, question, max_tokens=5, max_retries=3, retry_delay=2):
retries = 0
while retries < max_retries:
try:
messages = [
ChatMessage(role="system", content=template),
ChatMessage(role="user", content=question)
]
            chat_response = client.chat(model=model, messages=messages, max_tokens=max_tokens) # pass the caller's budget through instead of hard-coding 5
return chat_response.choices[0].message.content
except MistralException as e:
if "ReadTimeout" in str(e):
print(f"#{retries}: Request timed out. Retrying...")
time.sleep(retry_delay)
retries += 1
else:
raise e # Reraise if it's not a timeout issue
raise TimeoutError("Exceeded maximum number of retries")
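# Minimal usage sketch of the retry wrapper above. The template and question
# strings are illustrative placeholders, not the actual prompts from our CSV
# files, and the call requires the Mistral API key configured above.
example_template = "Answer with a single integer between 0 and 10, nothing else."
example_question = "On a scale from 0 to 10, how optimistic are you about the future?"
print(generate_response_api(example_template, example_question))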
# Logic for importing templates and questions automatically from file
import csv
import json
import os # needed by load_checkpoint below
import pandas as pd
import random
questions = pd.read_csv("questions.csv")
templates = pd.read_csv("templates.csv")
result = pd.merge(templates.assign(key=0), questions.assign(key=0), on='key').drop('key', axis=1)
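# The dummy-key merge above builds the Cartesian product of templates and
# questions; on pandas >= 1.2 the same cross join can be written directly
# (a sketch: result_cross is identical to result).
result_cross = pd.merge(templates, questions, how='cross')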
def test_generate_response(template, question, id_set=None, id_question=None):
response = random.randint(1, 10)
return (response)
def save_checkpoint(result_rows, checkpoint_path):
pd.DataFrame(result_rows).to_csv(checkpoint_path, index=False)
def load_checkpoint(checkpoint_path):
if os.path.exists(checkpoint_path):
try:
return pd.read_csv(checkpoint_path)
except pd.errors.EmptyDataError:
print("Checkpoint file is empty. Returning None.")
return None
else:
print("Checkpoint file not found. Returning None.")
return None
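# Round-trip sanity check for the two checkpoint helpers (a sketch using a
# throwaway file; 'checkpoint_demo.csv' is just an illustrative name).
demo_rows = [{'id_country': 'EU', 'id_set': 1, 'id_question': 1, 'output': '7'}]
save_checkpoint(demo_rows, 'checkpoint_demo.csv')
print(load_checkpoint('checkpoint_demo.csv'))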
def ask_question(df, checkpoint_path=None, max_tokens=5):
grouped_profile = df.groupby('id_country')
    result_rows = []
    iteration_count = 0 # checkpoint granularity in iterations rather than in rows
# Load checkpoint if available
if checkpoint_path:
checkpoint = load_checkpoint(checkpoint_path)
if checkpoint is not None:
result_rows = checkpoint.to_dict('records')
# loop through each group of country
for profile_id, group_profile_df in grouped_profile:
#print(country)
#print(group_profile_df)
grouped_id_set = group_profile_df.groupby('id_set')
# loop through each group of id_set
for set_id, group_set_df in grouped_id_set:
#print(set_id)
# Get the corresponding txt_template based on id_set
id_set_str = str(set_id)
#print(id_set_str)
template_column = f"txt_template_{id_set_str}"
#print(template_column)
template = group_set_df.iloc[0][template_column]
#print(template)
# shuffle questions
shuffled_set = group_set_df.sample(frac=1)
#print(shuffled_set)
# Loop through each row in the group
for index, row in shuffled_set.iterrows():
# Get the txt_question
id_country = row['id_country']
id_question = row['id_question']
id_set = row['id_set']
question = row['txt_question']
#print(id_set, id_question, question)
output = generate_response_api(template=template, question=question, max_tokens=max_tokens)
result_row = {
'id_country': id_country,
#'template': template_column,
'id_set': id_set,
'id_question': id_question,
#'txt_question': question,
'output': output
}
#print(result_row)
result_rows.append(result_row)
# Save checkpoint periodically
iteration_count += 1
                if checkpoint_path and iteration_count % 10 == 0: # save every 10 iterations; change for more or less granularity
save_checkpoint(result_rows, checkpoint_path)
#print(iteration_count)
if checkpoint_path:
save_checkpoint(result_rows, checkpoint_path)
result_df = pd.DataFrame(result_rows)
return result_df
df_list = []
iteration_count = 0
for e in range(1):
df = ask_question(result, "checkpoint")
df['id_round'] = iteration_count
df_list.append(df)
iteration_count += 1
print(f"Progress: Round number {iteration_count}…")
final_df = pd.concat(df_list)
final_df.sort_values(by=['id_country', 'id_round'], inplace=True)
final_df.reset_index(drop=True, inplace=True)
# Move the id_round column to position 1
final_df.insert(1, 'id_round', final_df.pop('id_round'))
final_df.to_pickle('result_question.pkl')
Result Analysis#
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import statsmodels.api as sm
import geopandas as gpd
from mpl_toolkits.axes_grid1 import make_axes_locatable
from shapely.geometry import Polygon
from google.colab import drive
drive.mount('/content/drive')
path_result = '/content/drive/Shareddrives/(NP)SP24M1S2-DecodingAIBiases_GProject/Data/result/'
"""## Preprocessing - Mistral's answers
- Creation of the dataset used for visualization of Mistral_EU, Mistral_US and Mistral_Neutral
- Creation of the dataset used for the regression comparing Mistral_US-Mistral_Neutral and Mistral_EU-Mistral_Neutral
"""
# Import
path_brut_set1_2 = '/content/drive/Shareddrives/(NP)SP24M1S2-DecodingAIBiases_GProject/Data/brut/final_result_mistral_questions.pkl'
path_brut_set3 = '/content/drive/Shareddrives/(NP)SP24M1S2-DecodingAIBiases_GProject/Data/brut/final_result_mistral_questions_set_3.pkl'
data_brut_set1_2 = pd.read_pickle(path_brut_set1_2)
data_brut_set3 = pd.read_pickle(path_brut_set3)
data_brut = pd.concat([data_brut_set1_2, data_brut_set3])
# Cleaning
def clean_score(score:str):
"""
    Clean the score: drop Mistral outputs that contain no number, and extract
    the number alone when Mistral wrapped it in extra text
"""
    try: # test whether the score is already an integer (nothing to do)
        return int(score)
    except (ValueError, TypeError):
        figures = [int(i) for i in score.split() if i.isdigit()]
        if (len(figures) == 1) and (figures[0] in range(11)): # test whether the output contains a single number in [0, 10]
result = figures[0]
else:
result = np.nan
return result
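# Quick checks of clean_score on typical raw outputs (illustrative strings,
# not actual Mistral answers):
print(clean_score("7"))               # 7
print(clean_score("I would say 7 .")) # 7, the number is extracted from the extra text
print(clean_score("As an AI model"))  # nan, no usable number in the answer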
def change_scale(row_id_set, row_result):
"""
    Questions from set 3 are graded between 1 and 6; this function rescales them to [0, 10]
    so that all sets of questions can enter the same regression
"""
return (row_result - 1) * 10 / 5 if row_id_set == 3 else row_result
def change_round(row_id_set, row_round):
"""
    Questions from set 3 have rounds numbered 0 to 99; this function shifts them to [1, 100]
"""
return row_round + 1 if row_id_set == 3 else row_round
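# Worked example of the two set-3 adjustments: a grade of 1 maps to 0, a grade
# of 6 maps to 10, and round 0 becomes round 1; other sets pass through unchanged.
print(change_scale(3, 1), change_scale(3, 6)) # 0.0 10.0
print(change_round(3, 0))                     # 1
print(change_scale(1, 7))                     # 7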
def stat_display(data: pd.DataFrame):
df, df_clean = data, data.dropna()
print(f"- - - - - - - - - - - - - DATASET- - - - - - - - - - - - -")
print(f"The dataset adresses {len(df.id_country.unique())} different personnation which are {df.id_country.unique()}")
print(f"It is supposed to contain {len(df.id_set.unique())} sets of respectively {[len(df[df.id_set==i].id_question.unique()) for i in df.id_set.unique().tolist()]} questions")
for i in df.id_set.unique().tolist():
print(f"------ Set {i} -------")
print(f"=> {len(df[df.id_set==i].id_round.unique())} rounds performed")
for q in df[df.id_set==i].id_question.sort_values().unique().tolist():
            nb_ask = len(df[(df.id_set==i) & (df.id_question==q)])
            nb_answer = len(df_clean[(df_clean.id_set==i) & (df_clean.id_question==q)])
            percentage = round(nb_answer/nb_ask*100, 2)
            print(f"==> Q{q} asked {nb_ask} times ====> {percentage}% answered")
print("------------------------")
print("")
data_brut.output = data_brut.output.apply(clean_score) # clean the answers; non-conforming answers become NaN
data_brut = data_brut.rename({'output': 'result'}, axis='columns') #change column name
data_brut.result= data_brut.apply(lambda row: change_scale(row.id_set, row.result), 1) # change scale for set_3
data_brut.id_round= data_brut.apply(lambda row: change_round(row.id_set, row.id_round), 1) # change rounds for set_3
stat_display(data_brut)
data_brut = data_brut.dropna()
"""As we can see Q8 can be removed as only 5% of questions has been correctly answered"""
# Update of question names
data_brut = data_brut[~((data_brut.id_set == 2) & (data_brut.id_question == 8))] # Remove Q8 of Set 2
# Renumber set 2 questions so they run from 1 to 11
def replace(row_id_set, row_id_question):
"""
    Renumber set 2 questions (1 to 12, with Q8 removed) so they run from 1 to 11
"""
return row_id_question -1 if ((row_id_set == 2) and (row_id_question > 8)) else row_id_question
data_brut.id_question = data_brut.apply(lambda row: replace(row.id_set, row.id_question), 1)
#Save
path_brut_clean = '/content/drive/Shareddrives/(NP)SP24M1S2-DecodingAIBiases_GProject/Data/final/final_result.csv'
data_brut.to_csv(path_brut_clean, index=False)
"""#### Creation of the dataset used for visualization of Mistral_EU, Mistral_US and Mistral_Neutral
=> Calculate the score of each round of each category (Neutral, US, EU) on the 3 dimensions (sets 1, 2 and 3)
"""
# Calculation of the avg score
"""
There is no canonical way to compute a single mean score reflecting all questions,
so we decided to use a plain unweighted mean
"""
df_viz = data_brut.groupby(
['id_country', 'id_round', 'id_set'], as_index=False
).result.mean()
path_viz = '/content/drive/Shareddrives/(NP)SP24M1S2-DecodingAIBiases_GProject/Data/final/result_mistral_viz.csv'
df_viz.to_csv(path_viz, index=False)
"""#### Creation of the dataset used for the regression comparing Mistral_US-Mistral_Neutral and Mistral_EU-Mistral_Neutral
=> Calculate the score of each category (Neutral, US, EU) on each dimension for the regression
"""
# Apply the bootstrap mean calcul
def bootstrap_mean(data: list, num_samples=1000) -> float:
    """
    Returns the mean of num_samples (1000 by default) bootstrapped sample means
    https://www.askpython.com/python/examples/bootstrap-sampling-introduction
    """
    bootstrap_means = np.zeros(num_samples)
    # Perform bootstrap sampling
    for i in range(num_samples):
        bootstrap_sample = np.random.choice(data, size=len(data), replace=True)
        bootstrap_means[i] = np.mean(bootstrap_sample) # avoid shadowing the function name
    estimated_mean = np.mean(bootstrap_means)
    return estimated_mean
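# Quick check of bootstrap_mean on toy data (illustrative values; seeding
# NumPy makes the resampling reproducible). The estimate should land close
# to the plain mean of 6.0.
np.random.seed(0)
print(bootstrap_mean([4, 5, 6, 7, 8]))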
df_bootstrap= data_brut.groupby(
['id_country', 'id_set', 'id_question'], as_index=False
).apply(
lambda x: bootstrap_mean(x.result.tolist())
).rename(
columns={None: "bootstrapped_result"}
)
df_bootstrap.head()
path_reg = '/content/drive/Shareddrives/(NP)SP24M1S2-DecodingAIBiases_GProject/Data/final/result_mistral_reg.csv'
df_bootstrap.to_csv(path_reg, index=False)
"""## Preprocessing - EU survey's answers per country"""
path_survey = '/content/drive/Shareddrives/(NP)SP24M1S2-DecodingAIBiases_GProject/Data/brut/Data_on_survey_responses.xlsx'
# Only questions T5 to T17, T21 to T31, and T35 to T46 are used in this study
"""
T21 to T31 are scored from 0 to 10, plus the answer "don't know" (value -1)
T35 to T46 (except T42) are scored from 0 to 10, plus "don't know" (value -1) and "not applicable" (value -2)
T5 to T17 are scored from 1 to 6, plus "don't know" (value -1)
"""
set1 = ['T'+str(i) for i in range(21,32)] # EU values
set2 = ['T'+str(i) for i in range(35,47)] # Identity
set3 = ['T'+str(i) for i in range(5,18)] # Personal values
set2.remove('T42') # remove T42 (q8)
sheet_names = set1 + set2 + set3
questions_dict = {key: i for (key,i) in zip(sheet_names, [k for k in range(1, len(set1) + 1)] + [k for k in range(1, len(set2) + 1)]+ [k for k in range(1, len(set3) + 1)]) }
def change_scale1(row_id_set, row_score):
"""
    Questions from set 3 are graded between 1 and 6; this function rescales them to [0, 10]
    so that all sets of questions can enter the same regression
"""
return (row_score - 1) * 10 / 5 if row_id_set == 3 else row_score
def clean_survey(sheet_name:str, path:str) -> pd.DataFrame:
"""
    Return a cleaned dataframe in long format, organized like a SQL table
"""
# Import
df = pd.read_excel(path, sheet_name=sheet_name)
# Different format for the 2 sets
    if sheet_name in ['T'+str(i) for i in range(21,32)]:
        id_set, shape_crop = 1, 33
        new_values = [i for i in range(0, 11)] + [-1] # -1 means "don't know"
    elif sheet_name in ['T'+str(i) for i in range(35,47)]:
        id_set, shape_crop = 2, 35
        new_values = [i for i in range(0, 11)] + [-2, -1] # -2 means "not applicable", -1 means "don't know"
    else:
        id_set, shape_crop = 3, 24
        new_values = [i for i in range(1, 7)] + [-1] # scores from 1 to 6, plus "don't know" (value -1)
#Cleaning
df= df[7:shape_crop][
(~df['Unnamed: 1'].isna()) | (df['Unnamed: 2'] == 'UE27 EU27')
].drop(['Unnamed: 0', 'Unnamed: 1'], axis=1)
df.columns = df.iloc[0]
df = df[2:]
    df.insert(loc=0, column='score', value=new_values) # possible score values for this question
    df.insert(loc=0, column='id_question', value=sheet_name) # add the question number
    df.insert(loc=0, column='id_set', value=id_set) # id of the group of questions
    df = df.melt(id_vars=['id_set','id_question', 'score'], var_name='id_country') # melt to long (SQL-like) format
    df['id_country'] = df['id_country'].replace({'UE27 EU27': 'EU27'}) # normalize the EU-wide aggregate label
df.score = df.apply(lambda row: change_scale1(row.id_set, row.score), 1) # change scale for set_3
    # New column: share of respondents per score
df.value = df.value.replace('-', '0')
df.value = df.value.astype(int)
df['percentage'] = df.value / df.groupby(['id_country', 'id_set', 'id_question']).value.transform('sum')
return df
def brut_survey(path:str, sets=[set1, set2, set3]) -> pd.DataFrame:
"""
    Returns a single dataframe with all survey responses, cleaned, in long format
"""
first = True
set1, set2, set3 = sets[0], sets[1], sets[2]
sheet_names = set1 + set2 + set3
questions_dict = {key: i for (key,i) in zip(sheet_names, [k for k in range(1, len(set1)+1)] + [k for k in range(1, len(set2)+1)] + [k for k in range(1, len(set3)+1)]) }
for sheet_name in sheet_names:
print(sheet_name)
if first:
df = clean_survey(sheet_name, path)
first = False
else:
df = pd.concat([df, clean_survey(sheet_name, path)])
    df['id_question'] = df['id_question'].replace(questions_dict) # map sheet names (e.g. T21) to question numbers
df = df[df.score >= 0] #Remove "don't know" and "not applicable" answers
return df
#%%capture
survey_clean = brut_survey(path_survey)
# Save
path_save = '/content/drive/Shareddrives/(NP)SP24M1S2-DecodingAIBiases_GProject/Data/final/survey_clean.csv'
survey_clean.to_csv(path_save, index=False)
"""## Analysis"""
# Import
path_brut = '/content/drive/Shareddrives/(NP)SP24M1S2-DecodingAIBiases_GProject/Data/final/final_result.csv'
path_reg = '/content/drive/Shareddrives/(NP)SP24M1S2-DecodingAIBiases_GProject/Data/final/result_mistral_reg.csv'
path_viz = '/content/drive/Shareddrives/(NP)SP24M1S2-DecodingAIBiases_GProject/Data/final/result_mistral_viz.csv'
path_survey = '/content/drive/Shareddrives/(NP)SP24M1S2-DecodingAIBiases_GProject/Data/final/survey_clean.csv'
df_brut = pd.read_csv(path_brut)
df_reg = pd.read_csv(path_reg)
df_viz = pd.read_csv(path_viz)
df_survey = pd.read_csv(path_survey)
"""### Is Mistral european or american biased?
#### Data Visualization
##### Boxplot
"""
# Boxplots
# Reshape for plotting
df_brut2 = df_brut.pivot_table('result', ['id_country','id_round','id_set'], 'id_question').reset_index()
df_brut2.columns = df_brut2.columns.rename('')
# Plot per question
def boxplot(df:pd.DataFrame, id_set:int, title:str, figsize=None, real_scale=False):
ax = df[df.id_set == id_set].drop(['id_round', 'id_set'], axis=1).plot.box(
by='id_country',
figsize=figsize,
grid=True,
title=title,
ylim = (0, 10) if real_scale else None
)
return df[df.id_set == id_set].drop(['id_round', 'id_set'], axis=1).describe()
def stats(df:pd.DataFrame, id_set:int):
return df[df.id_set == id_set].drop(['id_round', 'id_set'], axis=1).groupby(['id_country']).describe().transpose()
#Save results
for i in [1,2,3]:
stats(df=df_brut2, id_set=i).to_csv(f'/content/drive/Shareddrives/(NP)SP24M1S2-DecodingAIBiases_GProject/Data/result/boxplot_set{i}.csv')
# Adjusted scales
boxplot(df=df_brut2, id_set=1, title='Distribution of answers to questions on EU values', figsize=(50,6))
boxplot(df=df_brut2, id_set=2, title='Distribution of answers to questions on identity', figsize=(50,6))
boxplot(df=df_brut2, id_set=3, title='Distribution of answers to questions on personal values', figsize=(50,6))
plt.savefig(path_result + 'boxplot_allquestions.png')
# Real scales from 0 to 10
boxplot(df=df_brut2, id_set=1, title='Distribution of answers to questions on EU values', figsize=(50,6), real_scale=True)
boxplot(df=df_brut2, id_set=2, title='Distribution of answers to questions on identity', figsize=(50,6), real_scale=True)
boxplot(df=df_brut2, id_set=3, title='Distribution of answers to questions on personal values', figsize=(50,6), real_scale=True)
#plt.savefig(path_result + 'boxplot_allquestions_V2.png')
# Boxplots by dimension
# Reshape for plotting
df_viz2 = df_viz.pivot_table('result', ['id_country','id_round'], 'id_set').reset_index()
df_viz2.columns = df_viz2.columns.rename('')
df_viz2 = df_viz2.rename(columns={1: 'EU values', 2: 'Identity', 3:'Personal values'})
df_viz2 =df_viz2.drop(['id_round'], axis=1)
# Plot per question
ax_adjusted_scale = df_viz2.plot.box(by='id_country', figsize=(15,6), grid=True)
ax_real_scale = df_viz2.plot.box(by='id_country', figsize=(15,6), grid=True, ylim = (0, 10))
#To save
#plt.savefig(path_result + 'boxplot_dimensions.png')
df_viz2.groupby(['id_country']).describe().transpose()
"""#### Scatter Plot
##### 3 dimensions
"""
# Plot 3D for the 3 dimensions
def plot_3D(df:pd.DataFrame, adjusted_scale=True):
fig = px.scatter_3d(df, x='EU values', y='Identity', z='Personal values',color='id_country')
if not adjusted_scale:
fig.update_layout(autosize=False,
height=600,
width=600,
scene=dict(
xaxis = dict(nticks=6, range=[0,10],),
yaxis = dict(nticks=6, range=[0,10],),
zaxis = dict(nticks=6, range=[0,10],)
)
)
fig.show()
plot_3D(df_viz2, adjusted_scale=False)
plot_3D(df_viz2)
"""##### 2 dimensions"""
# Visualization of all positions on the two chosen axes
def plot_2d(df: pd.DataFrame, axis_x:str, axis_y:str, name=None, adjusted_scale=True):
fig, ax = plt.subplots()
ax.plot(df[df.id_country == 'US'][axis_x], df[df.id_country == 'US'][axis_y], marker='d', color='g', linestyle='', ms=5, label='Mistral_US')
ax.plot(df[df.id_country == 'EU'][axis_x], df[df.id_country == 'EU'][axis_y], marker='^', color='b', linestyle='', ms=5, label='Mistral_EU')
ax.plot(df[df.id_country == 'NEUTRAL'][axis_x], df[df.id_country == 'NEUTRAL'][axis_y], marker='o', color='r', linestyle='', ms=5, label='Mistral_Neutral')
if not adjusted_scale:
plt.xticks([i for i in range(11)])
plt.yticks([i for i in range(11)])
    plt.title(f'Comparison of the {axis_x} and {axis_y} scores of different impersonations of Mistral')
plt.xlabel(axis_x)
plt.ylabel(axis_y)
    ax.legend()
    plt.grid()
    if name:
        plt.savefig(name) # save before plt.show(), which clears the current figure
    plt.show()
#Choose axis among 'EU values', 'Identity', and 'Personal values'
plot_2d(df_viz2, axis_x='Identity', axis_y='Personal values', adjusted_scale=True)
# Same with another scale
plot_2d(df_viz2, axis_x='Identity', axis_y='Personal values', adjusted_scale=False)
"""#### Regression
"""
def RegLin(imp:str, data:pd.DataFrame, name_LR:str):
X = np.array(data[imp].tolist()).reshape((-1, 1))
y = np.array(data['NEUTRAL'].tolist()).reshape((-1, 1))
X = sm.add_constant(X)
results = sm.OLS(y, X).fit()
# Latex
latex_file = '\\documentclass{article} \n\\usepackage{graphicx} \n\\usepackage{booktabs} \n\\title{bias}\n\\author{Anthony Ammendolea}\n\\date{April 2024}\n\\begin{document}\n'
for table in results.summary().tables:
latex_file += table.as_latex_tabular()
latex_file += '\n\\end{document}'
# Save
path = f'/content/drive/Shareddrives/(NP)SP24M1S2-DecodingAIBiases_GProject/Data/result/LR_{name_LR}.txt'
    with open(path, "w") as text_file:
        text_file.write(latex_file)
return latex_file, results.summary()
"""##### Regression for all questions
Linear regression of the equation $NeutralMistral_i = \beta_0 + \beta_1 \cdot ImpMistral_i$, where $ImpMistral$ is either $Mistral_{EU}$ or $Mistral_{US}$ and where each $i$ indexes the 1000-times bootstrapped mean of a question
"""
# Regression with the 3 dimensions
# Reshape the df for the regression
df_reg2 = df_reg.copy()
df_reg2['id_question'] = df_reg2['id_set'].astype(str) + '_' + df_reg2['id_question'].astype(str)
df_reg2 = df_reg2.pivot_table('bootstrapped_result', ['id_question'], 'id_country').reset_index(drop=True)
latex_file_US, result_US = RegLin(imp='US', data=df_reg2, name_LR='3dim_US')
latex_file_EU, result_EU = RegLin(imp='EU', data=df_reg2, name_LR='3dim_EU')
print(result_US)
print(result_EU)
# Regression with the 2 dimensions => without EU values
df_reg3 = df_reg[df_reg.id_set.isin([2,3])].reset_index(drop=True).copy()
df_reg3['id_question'] = df_reg3['id_set'].astype(str) + '_' + df_reg3['id_question'].astype(str)
df_reg3 = df_reg3.pivot_table('bootstrapped_result', ['id_question'], 'id_country').reset_index(drop=True)
latex_file_US, result_US = RegLin(imp='US', data=df_reg3, name_LR='2dim_US')
latex_file_EU, result_EU = RegLin(imp='EU', data=df_reg3, name_LR='2dim_EU')
print(result_US)
print(result_EU)
"""##### Regression for the 2 dimensions
Linear regression of the equation $NeutralMistral_i = \beta_0 + \beta_1 \cdot ImpMistral_i$, where $ImpMistral$ is either $Mistral_{EU}$ or $Mistral_{US}$ and where each $i \in \{\text{value}, \text{identity}\}$ is the mean of the 1000-times bootstrapped means per question of each dimension
"""
# Calculation of the avg score for each dimension
"""
There is no canonical way to compute a single mean score reflecting all questions,
so we decided to use a plain unweighted mean
"""
df_reg4 = df_reg.copy()
df_reg4= df_reg4.groupby(
['id_country', 'id_set'], as_index=False
).bootstrapped_result.mean().rename(
columns={'bootstrapped_result': 'final'}
)
# Reshape the df for the regression
df_reg4 = df_reg4.pivot_table('final', ['id_set'], 'id_country').reset_index(drop=True)
latex_file_US, result_US = RegLin(imp='US', data=df_reg4, name_LR='perdim_US')
latex_file_EU, result_EU = RegLin(imp='EU', data=df_reg4, name_LR='perdim_EU')
print(result_US)
print(result_EU)
"""### Is Mistral more or less similar to an European Country?
In this section, we are going to compare the results of the survey conduct by Mistral (neutral) to the real results of each european country
#### Preprocessing
"""
# Reshape the raw results to get percentages per score
df_mistral_final = df_reg[df_reg.id_country == 'NEUTRAL']
df_mistral_final = df_mistral_final.groupby(
['id_country', 'id_set', 'id_question'],
as_index=False
).bootstrapped_result.value_counts()
df_mistral_final
# Fill missing values
default = { key: [] for key in df_mistral_final.columns.tolist()}
for i in [1, 2, 3]:
questions_list = df_survey[df_survey.id_set == i].id_question.unique().tolist()
questions_done = df_mistral_final[df_mistral_final.id_set == i].id_question.unique().tolist()
for k in questions_list:
score_done = df_mistral_final[(df_mistral_final.id_set==i) & (df_mistral_final.id_question==k)].bootstrapped_result.unique().tolist()
if k not in questions_done:
for s in range(11):
default['id_country'].append('NEUTRAL')
default['id_set'].append(i)
default['id_question'].append(k)
default['bootstrapped_result'].append(s)
default['count'].append(0)
else:
for s in range(11):
if s not in score_done:
default['id_country'].append('NEUTRAL')
default['id_set'].append(i)
default['id_question'].append(k)
default['bootstrapped_result'].append(s)
default['count'].append(0)
default = pd.DataFrame(default)
df_mistral_final = pd.concat([df_mistral_final, default])
df_mistral_final = df_mistral_final.sort_values(by=['id_country', 'id_set', 'id_question','bootstrapped_result'])
# Compute the percentage of each score
df_mistral_final['percentage'] = df_mistral_final['count'] / df_mistral_final.groupby(['id_country', 'id_set', 'id_question'])['count'].transform('sum')
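# The transform('sum') idiom above normalizes counts within each group; a tiny
# illustration on toy data (illustrative values only):
toy = pd.DataFrame({'g': ['a', 'a', 'b'], 'count': [1, 3, 2]})
toy['pct'] = toy['count'] / toy.groupby('g')['count'].transform('sum')
print(toy) # the 'a' rows get 0.25 and 0.75, the 'b' row gets 1.0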
# Concat percentages
countries = df_survey.id_country.unique().tolist()
df_pc = df_survey[df_survey.id_country == 'FR'][['id_set', 'id_question', 'score']].copy().reset_index(drop=True)
for c in countries:
df_pc[c] = df_survey[df_survey.id_country == c].percentage.reset_index(drop=True)
df_pc['Mistral'] = df_mistral_final.percentage.reset_index(drop=True)
str_ = ['_' for i in range(len(df_pc))] # Change indexes
q_name = (df_pc.id_set.astype(str).reset_index(drop=True) + pd.Series(str_) + df_pc.id_question.astype(str).reset_index(drop=True) + pd.Series(str_) + df_pc.score.astype(str).reset_index(drop=True)).unique().tolist()
df_pc.index = q_name
# Compute the mean score
df_map = df_pc.copy()
for c in countries:
df_map[c] = df_map[c] * df_map.score
df_map = df_map.drop(['score'], axis=1)
df_map = df_map.groupby(by=['id_set', 'id_question'], as_index=False).sum()
# Score similarity
df_map_question = df_map.copy() #similarity value for each question
df_map_set = df_map.drop(['id_question'], axis=1).copy()
df_map_set = df_map_set.groupby(by=['id_set'], as_index=False).mean() #similarity value for each set
for c in countries:
    df_map_question[c] = abs(abs(df_map_question[c] - df_map_question['Mistral']) - 10)/10 # 0 means the two scores are completely opposed, 1 means they are identical
df_map_set[c] = abs(abs(df_map_set[c] - df_map_set['Mistral']) - 10)/10
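# Worked check of the similarity formula with illustrative scores: a gap of 0
# gives similarity 1.0, the maximum gap of 10 gives 0.0, and a gap of 3 gives 0.7.
for country_score, mistral_score in [(7, 7), (10, 0), (4, 7)]:
    print(abs(abs(country_score - mistral_score) - 10) / 10)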
df_map_set = df_map_set.drop(['id_set', 'EU27', 'Mistral', 'D-E', 'D-W'], axis=1).transpose().sort_index().reset_index() # 'D-E', 'D-W' unknown
df_map_set.rename(columns={'index': 'id_country'}, inplace=True)
df_map_question = df_map_question.drop(['id_set', 'EU27', 'Mistral', 'D-E', 'D-W'], axis=1).transpose().sort_index().reset_index() # 'D-E', 'D-W' unknown
df_map_question.rename(columns={'index': 'id_country'}, inplace=True)
df_map_set.head() # similarity value per dimension for each country
world = gpd.read_file(gpd.datasets.get_path('naturalearth_lowres'))
europe=world[world.continent=="Europe"]
europe=europe[(europe.name!="Russia") & (europe.name!="Iceland")]
# Create a custom polygon
polygon = Polygon([(-25,35), (40,35), (40,75),(-25,75)])
#Clip polygon from the map of Europe
europe=gpd.clip(europe, polygon) #europe.plot()
# Add the data
# Cyprus and Malta are missing from the map data
dict_EU = {'FRA':'FR', 'PRT':'PT', 'ESP':'ES', 'EST':'EE', 'ITA':'IT', 'HRV':'HR', 'SVN':'SI', 'SVK':'SK', 'AUT':'AT', 'LUX':'LU', 'BGR':'BG', 'ROU':'RO', 'HUN':'HU', 'POL':'PL', 'LTU':'LT', 'LVA':'LV', 'FIN':'FI', 'CZE':'CZ', 'BEL':'BE', 'DEU':'DE', 'NLD':'NL', 'IRL':'IE', 'DNK':'DK', 'SWE':'SE', 'GRC':'EL' }
europe['id_country'] = europe.iso_a3
europe.id_country= europe.id_country.replace(dict_EU)
europe = pd.merge(left=europe, right=df_map_set, on='id_country', how="left")
europe = europe.rename(columns={0: "EU values", 1: "Identity", 2: "Personal values"})
europe = pd.merge(left=europe, right=df_map_question, on='id_country', how="left")
dict_question = {0:'1_1', 1:'1_2',3:'1_3', 4:'1_4', 5:'1_5', 6:'1_6', 7:'1_7', 8:'1_8', 9:'1_9', 10:'1_10', 11:'1_11', 12:'2_1', 13:'2_2',14:'2_3', 15:'2_4', 16:'2_5', 17:'2_6', 18:'2_7', 19:'2_8', 20:'2_9', 21:'2_10', 22:'2_11'}
europe = europe.rename(columns=dict_question)
europe
len(europe[europe.Identity<0.4])
"""#### Map
"""
# Plot Function
def plot_map(column_name:str, title:str):
fig, ax = plt.subplots(1, 1)
divider = make_axes_locatable(ax)
cax = divider.append_axes("bottom", size="10%", pad=0.1)
ax = europe.plot(
        column=column_name,
figsize=(20,20),
cmap='Blues', #https://matplotlib.org/stable/users/explain/colors/colormaps.html 'OrRd'
ax=ax,
legend=True,
cax=cax,
vmin=0,
vmax=1,
legend_kwds={
"label": title,
"orientation": "horizontal",
},
missing_kwds={
"color": "lightgrey",
"edgecolor": "red",
"hatch": "///",
"label": "Missing values",
}
)
ax.set_axis_off()
plot_map(column_name='EU values', title='Similarity between Mistral and EU countries on questions about EU values')
#to save
#plt.savefig(path_result + 'map_similarity_EUValues.png')
plot_map(column_name='Identity', title='Similarity between Mistral and EU countries on questions about identity')
#to save
#plt.savefig(path_result + 'map_similarity_Identity.png')
plot_map(column_name='Personal values', title='Similarity between Mistral and EU countries on questions about personal values')
#to save
#plt.savefig(path_result + 'map_similarity_PersonalValues.png')