Researchers & Academics Track
Who is this track for?
- PhD students in AI, ML, NLP, Computer Vision
- Early- or mid-career researchers
- R&D engineers working on advanced problems
- Faculty researchers who want to refresh their knowledge
- Data scientists aiming for in-depth expertise
Prerequisites
- Solid foundations in mathematics (linear algebra, probability, calculus)
- Proficiency in Python and scientific libraries (NumPy, PyTorch/TensorFlow)
- Knowledge of neural network architectures
- Familiarity with scientific publications
- Scientific English (ability to read papers)
Duration and commitment
- Total duration: 12-16 weeks
- Weekly time: 15-20 hours
- Format: theory readings, experiments, paper reproduction
- Level: Advanced (research perspective)
In-depth theoretical foundations (Weeks 1-3)
Objective: Master the mathematical and architectural foundations of modern AI
Transformer architecture in depth
Understanding the Transformer architecture
- Transformers: architecture and how they work
- Founding paper: “Attention Is All You Need” (Vaswani et al., 2017)
- Key mechanisms: self-attention, multi-head attention, positional encoding
- Variants: encoder-only (BERT), decoder-only (GPT), encoder-decoder (T5)
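A minimal sketch of the sinusoidal positional encoding named in the list above, following the sin/cos formulation of Vaswani et al. (2017). The function name `sinusoidal_positional_encoding` and the pure-Python list representation are illustrative assumptions; a real model would build this table as a tensor and add it to the token embeddings.

```python
import math


def sinusoidal_positional_encoding(seq_len, d_model):
    """Return a seq_len x d_model table of sinusoidal position encodings.

    PE[pos][2i]   = sin(pos / 10000^(2i / d_model))
    PE[pos][2i+1] = cos(pos / 10000^(2i / d_model))
    """
    if d_model % 2 != 0:
        raise ValueError("d_model must be even so sin/cos dimensions pair up")
    table = []
    for pos in range(seq_len):
        row = [0.0] * d_model
        # `i` walks over the even indices, so i / d_model equals 2i' / d_model
        for i in range(0, d_model, 2):
            angle = pos / (10000 ** (i / d_model))
            row[i] = math.sin(angle)
            row[i + 1] = math.cos(angle)
        table.append(row)
    return table


pe = sinusoidal_positional_encoding(seq_len=4, d_model=8)
for pos, row in enumerate(pe):
    print(pos, [round(v, 3) for v in row])
```

Each (sin, cos) pair rotates at its own frequency, which is why low dimensions change quickly with position and high dimensions change slowly.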
Implementation from scratch
import torch
import torch.nn as nn
import math
class MultiHeadAttention(nn.Module):
def __init__(self, d_model, num_heads):
super().__init__()
assert d_model % num_heads == 0
self.d_model = d_model
self.num_heads = num_heads
self.d_k = d_model // num_heads
self.W_q = nn.Linear(d_model, d_model)
self.W_k = nn.Linear(d_model, d_model)
self.W_v = nn.Linear(d_model, d_model)
self.W_o = nn.Linear(d_model, d_model)
def scaled_dot_product_attention(self, Q, K, V, mask=None):
"""
Q, K, V: (batch_size, num_heads, seq_len, d_k)
"""
scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
if mask is not None:
scores = scores.masked_fill(mask == 0, -1e9)
attention_weights = torch.softmax(scores, dim=-1)
output = torch.matmul(attention_weights, V)
return output, attention_weights
def split_heads(self, x):
"""
x: (batch_size, seq_len, d_model)
return: (batch_size, num_heads, seq_len, d_k)
"""
batch_size, seq_len, _ = x.size()
return x.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
def forward(self, Q, K, V, mask=None):
batch_size = Q.size(0)
# Linear projections
Q = self.split_heads(self.W_q(Q))
K = self.split_heads(self.W_k(K))
V = self.split_heads(self.W_v(V))
# Attention
x, attention_weights = self.scaled_dot_product_attention(Q, K, V, mask)
# Concatenate heads
x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
# Final linear projection
return self.W_o(x), attention_weights
class TransformerEncoderLayer(nn.Module):
def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
super().__init__()
self.self_attn = MultiHeadAttention(d_model, num_heads)
self.feed_forward = nn.Sequential(
nn.Linear(d_model, d_ff),
nn.ReLU(),
nn.Linear(d_ff, d_model)
)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x, mask=None):
# Multi-head attention with residual connection
attn_output, _ = self.self_attn(x, x, x, mask)
x = self.norm1(x + self.dropout(attn_output))
# Feed-forward with residual connection
ff_output = self.feed_forward(x)
x = self.norm2(x + self.dropout(ff_output))
return x
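The attention code above builds a seq_len × seq_len score matrix per head, so its cost grows quadratically with sequence length, while the Q/K/V projections grow quadratically with d_model. The sketch below puts rough numbers on that trade-off. The function name `attention_mac_counts` and the constant factors (three input projections, two n × n matrix products, softmax and output projection ignored) are illustrative assumptions, not measured costs.

```python
def attention_mac_counts(n, d):
    """Rough multiply-accumulate counts for one self-attention layer.

    n: sequence length, d: model dimension (all heads combined).
    """
    projections = 3 * n * d * d   # Q, K, V projections: O(n * d^2)
    attention = 2 * n * n * d     # Q @ K^T and weights @ V: O(n^2 * d)
    return {"projections": projections, "attention": attention}


d = 768
for n in (128, 768, 4096):
    counts = attention_mac_counts(n, d)
    dominant = max(counts, key=counts.get)
    print(f"n={n:5d}: projections={counts['projections']:.2e}, "
          f"attention={counts['attention']:.2e} -> dominated by {dominant}")
```

Under these assumed constants the two terms cross at n = 1.5 × d; beyond that point the quadratic-in-length term dominates.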
Mathematical analysis
# Complexité computationnelle du self-attention
def attention_complexity_analysis():
"""
Pour une séquence de longueur n et dimension d:
1. Calcul des matrices Q, K, V: O(n * d²)
2. Calcul des scores d'attention: O(n² * d)
3. Application softmax: O(n²)
4. Multiplication avec V: O(n² * d)
Total: O(n² * d + n * d²)
Pour n >> d: Dominé par O(n² * d) → Quadratique en longueur séquence
Pour d >> n: Dominé par O(n * d²) → Quadratique en dimension
"""
import matplotlib.pyplot as plt
import numpy as np
# Visualisation de la complexité
n_values = np.arange(100, 10000, 100)
d = 768 # Dimension typique (BERT-base)
complexity = n_values**2 * d + n_values * d**2
plt.figure(figsize=(10, 6))
plt.plot(n_values, complexity / 1e9, label='Standard Attention')
plt.xlabel('Sequence Length (n)')
plt.ylabel('Operations (billions)')
plt.title('Computational Complexity of Self-Attention')
plt.legend()
plt.grid(True)
plt.show()
# Variantes pour réduire la complexité
"""
Attention efficace:
1. Linear Attention: O(n * d²) - Approximation linéaire
2. Sparse Attention: O(n * sqrt(n) * d) - Attention sur motifs
3. Flash Attention: O(n² * d) mais optimisé mémoire GPU
"""
Papers to study (Week 1)
Vaswani et al. (2017) - “Attention Is All You Need”
- arXiv:1706.03762
- Contribution: the original Transformer architecture
- To reproduce: full implementation, experiments on WMT
Devlin et al. (2018) - “BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding”
- arXiv:1810.04805
- Contribution: bidirectional pre-training with masked language modeling (MLM)
- To analyze: impact of masking, encoder-only architecture
Radford et al. (2019) - “Language Models are Unsupervised Multitask Learners” (GPT-2)
- Contribution: zero-shot task transfer that improves with model scale
- To understand: differences between encoder-only and decoder-only models
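One concrete way to see the encoder-only vs decoder-only difference is the attention mask. The sketch below is an illustration under simple assumptions: the helper names `bidirectional_mask` and `causal_mask` are hypothetical, and the masks use 1 = may attend, 0 = masked, the same convention as the `mask == 0` test in `scaled_dot_product_attention`.

```python
def bidirectional_mask(seq_len):
    """Encoder-style (BERT-like) mask: every position sees every position."""
    return [[1] * seq_len for _ in range(seq_len)]


def causal_mask(seq_len):
    """Decoder-style (GPT-like) mask: position i sees only positions j <= i."""
    return [[1 if j <= i else 0 for j in range(seq_len)] for i in range(seq_len)]


print("bidirectional:")
for row in bidirectional_mask(4):
    print(row)

print("causal:")
for row in causal_mask(4):
    print(row)
```

The lower-triangular causal mask is what lets a decoder-only model be trained on next-token prediction without seeing the future; the full mask is what lets an encoder use context from both sides.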

Tokenization and representations
Advanced tokenization mechanisms
BPE (Byte Pair Encoding) implementation
from collections import Counter
import re


class BytePairEncoding:
    def __init__(self, vocab_size=10000):
        self.vocab_size = vocab_size
        self.vocab = set()
        self.merges = {}  # (left, right) -> merge rank (lower = learned earlier)

    def get_stats(self, words):
        """Count adjacent symbol pairs, weighted by word frequency."""
        pairs = Counter()
        for word, freq in words.items():
            symbols = word.split()
            for i in range(len(symbols) - 1):
                pairs[(symbols[i], symbols[i + 1])] += freq
        return pairs

    @staticmethod
    def merge_pair(pair, word):
        """Merge every occurrence of `pair` in a space-separated symbol string."""
        # The lookarounds match whole symbols only, never part of a longer symbol
        pattern = re.compile(r'(?<!\S)' + re.escape(' '.join(pair)) + r'(?!\S)')
        return pattern.sub(lambda m: ''.join(pair), word)

    def merge_vocab(self, pair, words):
        """Apply one merge to every word of the training vocabulary."""
        return {self.merge_pair(pair, word): freq for word, freq in words.items()}

    def train(self, corpus):
        """Train the BPE tokenizer on a corpus (an iterable of strings)."""
        # Initialization: split each text into words, each word into characters
        words = Counter()
        for text in corpus:
            for word in text.split():
                words[' '.join(word) + ' </w>'] += 1
        # Initial vocabulary: all characters plus the end-of-word marker
        self.vocab = set()
        for word in words:
            self.vocab.update(word.split())
        # Iteratively merge the most frequent pair
        num_merges = max(0, self.vocab_size - len(self.vocab))
        for i in range(num_merges):
            pairs = self.get_stats(words)
            if not pairs:
                break
            best_pair = max(pairs, key=pairs.get)
            words = self.merge_vocab(best_pair, words)
            self.merges[best_pair] = i
            self.vocab.add(''.join(best_pair))
            if (i + 1) % 100 == 0:
                print(f"Merge {i+1}: {best_pair} -> {''.join(best_pair)}")
        return self.vocab, self.merges

    def tokenize(self, text):
        """Tokenize a text with the learned merges, word by word."""
        tokens = []
        for raw_word in text.split():
            word = ' '.join(raw_word) + ' </w>'
            while True:
                symbols = word.split()
                pairs = [p for p in zip(symbols, symbols[1:]) if p in self.merges]
                if not pairs:
                    break
                # Apply the earliest-learned merge first
                best_pair = min(pairs, key=lambda p: self.merges[p])
                word = self.merge_pair(best_pair, word)
            tokens.extend(word.split())
        return tokens


# Usage example
corpus = [
    "low lower lowest",
    "new newer newest",
    "wide wider widest"
]
bpe = BytePairEncoding(vocab_size=50)
vocab, merges = bpe.train(corpus)
print("Final vocabulary:", vocab)
print("\nTokenization of 'lowest':", bpe.tokenize("lowest"))
Embeddings and vector spaces
Analyzing embedding spaces
import numpy as np
import torch
import torch.nn as nn
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
class EmbeddingAnalyzer:
def __init__(self, embedding_dim=768):
self.embedding_dim = embedding_dim
def analyze_isotropy(self, embeddings):
"""
Mesure l'isotropie de l'espace d'embedding
Un espace isotrope a des vecteurs uniformément distribués
"""
# Calcul de la matrice de similarité cosinus
embeddings_norm = embeddings / embeddings.norm(dim=1, keepdim=True)
similarity_matrix = torch.mm(embeddings_norm, embeddings_norm.t())
# Moyenne des similarités (exclut diagonale)
mask = ~torch.eye(len(embeddings), dtype=bool)
avg_similarity = similarity_matrix[mask].mean().item()
# Variance des similarités
similarity_var = similarity_matrix[mask].var().item()
return {
'avg_similarity': avg_similarity,
'similarity_variance': similarity_var,
'isotropy_score': 1 - abs(avg_similarity) # Plus proche de 1 = plus isotrope
}
    def compute_intrinsic_dimension(self, embeddings, n_samples=1000, k=20):
        """
        Estimate the intrinsic dimension of the embedding space with the
        Levina-Bickel maximum likelihood estimator (MLE):
            m_k(x) = (k - 1) / sum_{j=1}^{k-1} log(T_k(x) / T_j(x))
        where T_j(x) is the distance from x to its j-th nearest neighbor.
        """
        # Random subsampling
        if len(embeddings) > n_samples:
            indices = torch.randperm(len(embeddings))[:n_samples]
            embeddings = embeddings[indices]
        # k cannot exceed the number of other points; the estimator needs k >= 2
        k = min(k, len(embeddings) - 1)
        if k < 2:
            raise ValueError("Need at least 3 embeddings to estimate intrinsic dimension")
        # Pairwise Euclidean distances
        distances = torch.cdist(embeddings, embeddings)
        # k nearest neighbors of each point (k + 1 because the point itself is at distance 0)
        nearest_distances, _ = torch.topk(distances, k + 1, largest=False, dim=1)
        nearest_distances = nearest_distances[:, 1:].clamp_min(1e-12)  # drop self, avoid log(0)
        # Per-point MLE, then average over points
        log_ratios = torch.log(nearest_distances[:, -1:] / nearest_distances[:, :-1])
        per_point_dim = (k - 1) / log_ratios.sum(dim=1).clamp_min(1e-12)
        return per_point_dim.mean().item()
def visualize_embedding_space(self, embeddings, labels=None):
"""Visualisation 2D de l'espace d'embedding via PCA"""
# Réduction de dimension avec PCA
pca = PCA(n_components=2)
embeddings_2d = pca.fit_transform(embeddings.cpu().numpy())
# Visualisation
plt.figure(figsize=(12, 8))
if labels is not None:
unique_labels = list(set(labels))
colors = plt.cm.rainbow(np.linspace(0, 1, len(unique_labels)))
for label, color in zip(unique_labels, colors):
mask = [l == label for l in labels]
plt.scatter(embeddings_2d[mask, 0], embeddings_2d[mask, 1],
c=[color], label=label, alpha=0.6)
plt.legend()
else:
plt.scatter(embeddings_2d[:, 0], embeddings_2d[:, 1], alpha=0.6)
plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%} variance)')
plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%} variance)')
plt.title('Embedding Space Visualization (PCA)')
plt.grid(True, alpha=0.3)
plt.show()
return pca.explained_variance_ratio_
# Exemple d'analyse
from transformers import AutoTokenizer, AutoModel
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
model = AutoModel.from_pretrained('bert-base-uncased')
texts = [
"The cat sits on the mat",
"A feline rests on the rug",
"The stock market crashed today",
"Financial markets fell sharply"
]
# Génération des embeddings
inputs = tokenizer(texts, return_tensors='pt', padding=True)
with torch.no_grad():
outputs = model(**inputs)
embeddings = outputs.last_hidden_state[:, 0, :] # [CLS] token
analyzer = EmbeddingAnalyzer()
# Analyse de l'isotropie
isotropy = analyzer.analyze_isotropy(embeddings)
print(f"Isotropy score: {isotropy['isotropy_score']:.4f}")
print(f"Average similarity: {isotropy['avg_similarity']:.4f}")
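# Intrinsic dimension (with only 4 sentences this value is purely illustrative;
# a meaningful estimate needs far more embeddings than nearest neighbors)
intrinsic_dim = analyzer.compute_intrinsic_dimension(embeddings)
print(f"Intrinsic dimension: {intrinsic_dim:.2f} (vs nominal 768)")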
Generative models and parameters
Understanding modern architectures
Scaling Laws (Kaplan et al., 2020)
import numpy as np
import matplotlib.pyplot as plt
class ScalingLawsAnalyzer:
"""
Analyse des scaling laws pour les LLMs
Basé sur "Scaling Laws for Neural Language Models" (Kaplan et al., 2020)
"""
def __init__(self):
# Constantes empiriques (approximations)
self.alpha_N = 0.076 # Exposant pour scaling avec N (paramètres)
self.alpha_D = 0.095 # Exposant pour scaling avec D (données)
self.alpha_C = 0.050 # Exposant pour scaling avec C (compute)
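    def compute_loss(self, N, D, C=None):
        """
        Predict the loss as a function of:
        - N: number of (non-embedding) parameters, as a raw count
        - D: dataset size (in tokens)
        - C: compute budget (in FLOPs, optional; unused here)
        Combined form from Kaplan et al. (2020):
            L(N, D) = [(N_c / N)^(α_N / α_D) + D_c / D]^α_D
        where N_c and D_c are fitted constants. It reduces to (N_c / N)^α_N
        when data is unlimited and to (D_c / D)^α_D when model size is unlimited.
        """
        N_c = 8.8e13  # fitted constant for parameters
        D_c = 5.4e13  # fitted constant for tokens
        model_term = (N_c / N) ** (self.alpha_N / self.alpha_D)
        data_term = D_c / D
        return (model_term + data_term) ** self.alpha_D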
    def optimal_model_size(self, compute_budget):
        """
        Compute-optimal model size for a given compute budget.
        Following Hoffmann et al. (2022), "Training Compute-Optimal Large Language Models" (Chinchilla):
        parameters and training tokens should be scaled in equal proportion,
        each growing roughly as C^0.5. Doubling both therefore takes about 4x
        the compute; doubling the compute scales each by about sqrt(2).
        """
        # Chinchilla rule of thumb:
        #   C ≈ 6 * N * D   and   D ≈ 20 * N
        #   => C ≈ 120 * N^2, so N_optimal ≈ (C / 120)^0.5
        N_optimal = (compute_budget / 120) ** 0.5
        D_optimal = 20 * N_optimal
        return {
            'parameters': N_optimal,
            'tokens': D_optimal,
            'flops_per_token': 6 * N_optimal
        }
def plot_scaling_curves(self):
"""Visualise les courbes de scaling"""
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
# 1. Loss vs Model Size
N_range = np.logspace(6, 11, 100) # 1M à 100B paramètres
D_fixed = 1e12 # 1T tokens
losses = [self.compute_loss(N, D_fixed) for N in N_range]
axes[0].loglog(N_range, losses)
axes[0].set_xlabel('Model Size (parameters)')
axes[0].set_ylabel('Loss')
axes[0].set_title('Scaling with Model Size\n(fixed dataset: 1T tokens)')
axes[0].grid(True, alpha=0.3)
# Annotations pour modèles connus
models = {
'GPT-2': 1.5e9,
'GPT-3': 175e9,
'LLaMA-2 70B': 70e9,
'Mistral 7B': 7e9
}
for name, size in models.items():
loss = self.compute_loss(size, D_fixed)
axes[0].scatter([size], [loss], s=100, zorder=5)
axes[0].annotate(name, (size, loss), xytext=(10, 10),
textcoords='offset points', fontsize=8)
# 2. Loss vs Dataset Size
D_range = np.logspace(9, 13, 100) # 1B à 10T tokens
N_fixed = 7e9 # 7B paramètres
losses = [self.compute_loss(N_fixed, D) for D in D_range]
axes[1].loglog(D_range, losses)
axes[1].set_xlabel('Dataset Size (tokens)')
axes[1].set_ylabel('Loss')
axes[1].set_title('Scaling with Data\n(fixed model: 7B params)')
axes[1].grid(True, alpha=0.3)
# 3. Optimal allocation (Chinchilla)
compute_range = np.logspace(19, 25, 100) # FLOPs
optimal_N = []
optimal_D = []
for C in compute_range:
opt = self.optimal_model_size(C)
optimal_N.append(opt['parameters'])
optimal_D.append(opt['tokens'])
axes[2].loglog(compute_range, optimal_N, label='Optimal Model Size')
axes[2].loglog(compute_range, optimal_D, label='Optimal Dataset Size')
axes[2].set_xlabel('Compute Budget (FLOPs)')
axes[2].set_ylabel('Optimal Size')
axes[2].set_title('Compute-Optimal Training\n(Chinchilla scaling)')
axes[2].legend()
axes[2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Analyse
analyzer = ScalingLawsAnalyzer()
analyzer.plot_scaling_curves()
# Computation for a specific project
compute_budget = 1e23  # FLOPs (illustrative; LLaMA-2 70B at ~2T tokens is closer to 6 * 70e9 * 2e12 ≈ 8.4e23)
optimal = analyzer.optimal_model_size(compute_budget)
print(f"\nFor a budget of {compute_budget:.2e} FLOPs:")
print(f" Optimal size: {optimal['parameters']/1e9:.1f}B parameters")
print(f" Optimal tokens: {optimal['tokens']/1e12:.1f}T tokens")
print(f" FLOPs/token: {optimal['flops_per_token']:.2e}")
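The compute-optimal allocation can be checked by hand. The sketch below assumes the two common rules of thumb C ≈ 6·N·D and D ≈ 20·N, which together give N ≈ sqrt(C / 120). The function name `chinchilla_allocation` and the `tokens_per_param` parameter are illustrative assumptions, and the fitted ratio in practice varies with the setup.

```python
import math


def chinchilla_allocation(compute_flops, tokens_per_param=20.0):
    """Split a training FLOP budget into parameters N and tokens D.

    Assumes C ≈ 6 * N * D and D ≈ tokens_per_param * N (rules of thumb).
    """
    n_params = math.sqrt(compute_flops / (6.0 * tokens_per_param))
    n_tokens = tokens_per_param * n_params
    return n_params, n_tokens


for budget in (1e21, 1e23, 5.76e23):
    n, d = chinchilla_allocation(budget)
    print(f"C={budget:.2e} FLOPs -> N≈{n / 1e9:.1f}B params, D≈{d / 1e9:.0f}B tokens")
```

At roughly 5.8e23 FLOPs this rule gives about 69B parameters and 1.4T tokens, which is in line with the 70B-parameter, 1.4T-token Chinchilla model.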
Papers to study (Weeks 2-3)
Kaplan et al. (2020) - “Scaling Laws for Neural Language Models”
- arXiv:2001.08361
- Contribution: power laws relating performance to model size, data, and compute
Hoffmann et al. (2022) - “Training Compute-Optimal Large Language Models” (Chinchilla)
- arXiv:2203.15556
- Contribution: revised scaling laws, importance of balancing model size and data
Sennrich et al. (2016) - “Neural Machine Translation of Rare Words with Subword Units”
- arXiv:1508.07909
- Contribution: BPE for NMT, handling of rare words
Advanced techniques (Weeks 4-7)
Objective: Master state-of-the-art techniques for optimizing and adapting LLMs
Retrieval-Augmented Generation (RAG)
Architecture and variants
Advanced RAG implementation with reranking
import torch
from transformers import AutoTokenizer, AutoModel
from sentence_transformers import SentenceTransformer, CrossEncoder
import faiss
import numpy as np
class AdvancedRAGSystem:
def __init__(self,
embedding_model='sentence-transformers/all-MiniLM-L6-v2',
reranker_model='cross-encoder/ms-marco-MiniLM-L-6-v2',
llm_model='gpt-4'):
"""
RAG avancé avec:
1. Dense retrieval (embeddings)
2. Reranking avec cross-encoder
3. Génération avec LLM
"""
self.embedding_model = SentenceTransformer(embedding_model)
self.reranker = CrossEncoder(reranker_model)
self.llm_model = llm_model
self.index = None
self.documents = []
def build_index(self, documents, use_gpu=False):
"""Construit l'index FAISS pour la recherche vectorielle"""
self.documents = documents
# Génération des embeddings
embeddings = self.embedding_model.encode(
documents,
show_progress_bar=True,
convert_to_numpy=True
)
# Construction de l'index FAISS
dimension = embeddings.shape[1]
# Index IVF pour de grandes collections
if len(documents) > 10000:
nlist = int(np.sqrt(len(documents))) # Nombre de clusters
quantizer = faiss.IndexFlatL2(dimension)
self.index = faiss.IndexIVFFlat(quantizer, dimension, nlist)
# Entraînement de l'index
self.index.train(embeddings)
self.index.add(embeddings)
self.index.nprobe = min(10, nlist) # Recherche dans 10 clusters
else:
# Index plat pour petites collections
self.index = faiss.IndexFlatL2(dimension)
self.index.add(embeddings)
if use_gpu and faiss.get_num_gpus() > 0:
self.index = faiss.index_cpu_to_gpu(
faiss.StandardGpuResources(), 0, self.index
)
print(f"Index built with {len(documents)} documents")
    def retrieve(self, query, top_k=20, rerank_top_n=5):
        """
        Two-stage retrieval:
        1. Dense retrieval: top_k candidates
        2. Reranking: rerank_top_n final results
        """
        # Stage 1: dense retrieval (never ask for more neighbors than indexed documents)
        top_k = min(top_k, len(self.documents))
        query_embedding = self.embedding_model.encode(
            [query],
            convert_to_numpy=True
        )
        distances, indices = self.index.search(query_embedding, top_k)
        candidates = [
            {
                'text': self.documents[idx],
                'score': float(dist),  # L2 distance: lower is better
                'rank': rank
            }
            for rank, (idx, dist) in enumerate(zip(indices[0], distances[0]))
            if idx >= 0  # FAISS pads missing results with -1
        ]
        if not candidates:
            return []
        # Stage 2: reranking with the cross-encoder
        pairs = [[query, cand['text']] for cand in candidates]
        rerank_scores = self.reranker.predict(pairs)
        # Score fusion (weighted average). The two terms are not on the same scale:
        # 1 / (1 + distance) lies in (0, 1], while cross-encoder scores may be
        # unbounded, so the 0.3 / 0.7 weights are a heuristic.
        for cand, rerank_score in zip(candidates, rerank_scores):
            cand['rerank_score'] = float(rerank_score)
            cand['final_score'] = 0.3 * (1 / (1 + cand['score'])) + 0.7 * cand['rerank_score']
        # Sort by final score
        candidates.sort(key=lambda x: x['final_score'], reverse=True)
        return candidates[:rerank_top_n]
def generate(self, query, context_docs, max_tokens=500):
"""Génération de la réponse avec le LLM"""
# Construction du context
context = "\n\n".join([
f"[Document {i+1}]\n{doc['text']}"
for i, doc in enumerate(context_docs)
])
# Prompt engineering
prompt = f"""Tu es un assistant intelligent qui répond aux questions en te basant sur les documents fournis.
Documents de référence:
{context}
Question: {query}
Réponds à la question en citant les documents pertinents. Si l'information n'est pas dans les documents, indique-le clairement.
Réponse:"""
# Appel au LLM (ici simplifié, utiliser OpenAI API ou autre)
from openai import OpenAI
client = OpenAI()
response = client.chat.completions.create(
model=self.llm_model,
messages=[
{"role": "system", "content": "Tu es un assistant précis qui base ses réponses sur les sources fournies."},
{"role": "user", "content": prompt}
],
max_tokens=max_tokens,
temperature=0.7
)
return {
'answer': response.choices[0].message.content,
'sources': context_docs,
'metadata': {
'tokens_used': response.usage.total_tokens,
'model': self.llm_model
}
}
    def evaluate_retrieval(self, test_queries, ground_truth_docs):
        """
        Evaluate retrieval quality.
        Metrics: Recall@5, MRR, MAP (all computed on the top 5 reranked results)
        """
        results = {
            'recall_at_5': [],
            'mrr': [],
            'map': []
        }
        for query, relevant_docs in zip(test_queries, ground_truth_docs):
            relevant = set(relevant_docs)
            if not relevant:
                continue  # metrics are undefined without relevant documents
            retrieved = self.retrieve(query, top_k=20, rerank_top_n=5)
            retrieved_texts = [doc['text'] for doc in retrieved]
            # Recall@5
            relevant_in_top5 = len(set(retrieved_texts[:5]) & relevant)
            results['recall_at_5'].append(relevant_in_top5 / len(relevant))
            # MRR (Mean Reciprocal Rank): reciprocal rank of the first relevant hit
            for rank, doc in enumerate(retrieved_texts, 1):
                if doc in relevant:
                    results['mrr'].append(1.0 / rank)
                    break
            else:
                results['mrr'].append(0.0)
            # MAP (Mean Average Precision): precision at each relevant hit,
            # normalized by the total number of relevant documents
            precision_sum = 0.0
            num_relevant = 0
            for rank, doc in enumerate(retrieved_texts, 1):
                if doc in relevant:
                    num_relevant += 1
                    precision_sum += num_relevant / rank
            results['map'].append(precision_sum / len(relevant))
        return {
            'recall@5': np.mean(results['recall_at_5']),
            'mrr': np.mean(results['mrr']),
            'map': np.mean(results['map'])
        }
# Exemple d'utilisation
documents = [
"Les Transformers utilisent le mécanisme d'attention pour capturer les dépendances à longue distance.",
"BERT est un modèle encoder-only pré-entraîné avec masked language modeling.",
"GPT utilise une architecture decoder-only et génère du texte de manière autoregressive.",
"Le fine-tuning adapte un modèle pré-entraîné à une tâche spécifique.",
"RAG combine retrieval et génération pour améliorer la factualité des réponses."
]
rag_system = AdvancedRAGSystem()
rag_system.build_index(documents)
query = "Comment fonctionne BERT ?"
retrieved_docs = rag_system.retrieve(query, top_k=10, rerank_top_n=3)
print("Documents récupérés:")
for i, doc in enumerate(retrieved_docs, 1):
print(f"\n{i}. Score: {doc['final_score']:.4f}")
print(f" {doc['text']}")
# Génération de la réponse
result = rag_system.generate(query, retrieved_docs)
print(f"\nRéponse: {result['answer']}")
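The retrieval metrics used for evaluation are easy to verify on a toy ranking. The sketch below is a standalone illustration: the document IDs and the helper names `recall_at_k`, `reciprocal_rank`, and `average_precision` are hypothetical, and average precision is normalized by the total number of relevant documents (one common convention).

```python
def recall_at_k(retrieved, relevant, k):
    """Fraction of relevant documents found in the top k results."""
    return len(set(retrieved[:k]) & set(relevant)) / len(relevant)


def reciprocal_rank(retrieved, relevant):
    """1 / rank of the first relevant result, or 0.0 if none is retrieved."""
    for rank, doc in enumerate(retrieved, 1):
        if doc in relevant:
            return 1.0 / rank
    return 0.0


def average_precision(retrieved, relevant):
    """Sum of precision@rank at each relevant hit, divided by |relevant|."""
    hits, precision_sum = 0, 0.0
    for rank, doc in enumerate(retrieved, 1):
        if doc in relevant:
            hits += 1
            precision_sum += hits / rank
    return precision_sum / len(relevant)


retrieved = ["d3", "d1", "d7", "d2", "d9"]   # ranked system output
relevant = {"d1", "d2", "d5"}                # ground truth; d5 is never retrieved

print(f"Recall@5: {recall_at_k(retrieved, relevant, 5):.3f}")
print(f"RR:       {reciprocal_rank(retrieved, relevant):.3f}")
print(f"AP:       {average_precision(retrieved, relevant):.3f}")
```

Here two of three relevant documents are retrieved (recall 2/3), the first hit is at rank 2 (reciprocal rank 1/2), and the hits at ranks 2 and 4 contribute 1/2 + 2/4 before dividing by 3 (average precision 1/3). MRR and MAP are the means of these per-query values.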
Papers to study (Week 4)
Lewis et al. (2020) - “Retrieval-Augmented Generation for Knowledge-Intensive NLP Tasks”
- arXiv:2005.11401
- Contribution: the original RAG architecture
Guu et al. (2020) - “REALM: Retrieval-Augmented Language Model Pre-Training”
- arXiv:2002.08909
- Contribution: pre-training with retrieval
Izacard & Grave (2021) - “Leveraging Passage Retrieval with Generative Models for Open Domain Question Answering”
- arXiv:2007.01282
- Contribution: Fusion-in-Decoder (FiD)
Fine-tuning and PEFT
Parameter-Efficient Fine-Tuning (PEFT)
LoRA implementation from scratch
import numpy as np
import torch
import torch.nn as nn
class LoRALayer(nn.Module):
"""
Low-Rank Adaptation (LoRA) layer
Au lieu de fine-tuner tous les poids W, on apprend:
W' = W + BA
où B ∈ R^(d×r) et A ∈ R^(r×k) avec r << min(d, k)
"""
def __init__(self, in_features, out_features, rank=8, alpha=16, dropout=0.1):
super().__init__()
self.in_features = in_features
self.out_features = out_features
self.rank = rank
self.alpha = alpha
# Poids originaux (frozen)
self.weight = nn.Parameter(torch.randn(out_features, in_features))
self.weight.requires_grad = False
# Matrices LoRA (trainable)
self.lora_A = nn.Parameter(torch.randn(rank, in_features))
self.lora_B = nn.Parameter(torch.zeros(out_features, rank))
self.dropout = nn.Dropout(dropout)
self.scaling = alpha / rank
# Initialisation
nn.init.kaiming_uniform_(self.lora_A, a=np.sqrt(5))
def forward(self, x):
# Forward standard
result = torch.matmul(x, self.weight.t())
# Ajout de la correction LoRA
lora_correction = torch.matmul(
torch.matmul(self.dropout(x), self.lora_A.t()),
self.lora_B.t()
) * self.scaling
return result + lora_correction
def merge_weights(self):
"""Fusionne les poids LoRA avec les poids originaux"""
merged_weight = self.weight + (self.lora_B @ self.lora_A) * self.scaling
return merged_weight
class LoRAAttention(nn.Module):
"""
Multi-Head Attention avec LoRA appliqué aux projections Q, K, V
"""
def __init__(self, d_model, num_heads, lora_rank=8):
super().__init__()
assert d_model % num_heads == 0
self.d_model = d_model
self.num_heads = num_heads
self.d_k = d_model // num_heads
# Projections avec LoRA
self.W_q = LoRALayer(d_model, d_model, rank=lora_rank)
self.W_k = LoRALayer(d_model, d_model, rank=lora_rank)
self.W_v = LoRALayer(d_model, d_model, rank=lora_rank)
self.W_o = LoRALayer(d_model, d_model, rank=lora_rank)
def forward(self, x):
batch_size = x.size(0)
# Projections avec LoRA
Q = self.W_q(x).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
K = self.W_k(x).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
V = self.W_v(x).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
# Attention
scores = torch.matmul(Q, K.transpose(-2, -1)) / np.sqrt(self.d_k)
attn_weights = torch.softmax(scores, dim=-1)
attn_output = torch.matmul(attn_weights, V)
# Concatenate heads
attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
# Output projection
return self.W_o(attn_output)
# Analyse du gain de paramètres avec LoRA
def analyze_lora_efficiency(d_model=768, num_layers=12, rank=8):
"""
Compare le nombre de paramètres trainables: Full fine-tuning vs LoRA
"""
# Poids par layer Transformer standard
attn_params = 4 * d_model * d_model # Q, K, V, O
ff_params = 2 * d_model * (4 * d_model) # Feed-forward (expansion 4x)
params_per_layer = attn_params + ff_params
total_params_full_ft = num_layers * params_per_layer
# Avec LoRA (rank r)
# Pour chaque projection: r * d_model (A) + d_model * r (B) = 2 * r * d_model
lora_params_per_proj = 2 * rank * d_model
lora_params_per_layer = 4 * lora_params_per_proj # Q, K, V, O
total_params_lora = num_layers * lora_params_per_layer
print(f"Configuration: d_model={d_model}, layers={num_layers}, rank={rank}")
print(f"\nFull fine-tuning: {total_params_full_ft:,} paramètres")
print(f"LoRA: {total_params_lora:,} paramètres")
print(f"Réduction: {(1 - total_params_lora/total_params_full_ft)*100:.2f}%")
print(f"Facteur: {total_params_full_ft/total_params_lora:.1f}x moins de paramètres")
# Mémoire GPU estimation
bytes_per_param = 4 # float32
gb_full_ft = (total_params_full_ft * bytes_per_param) / 1e9
gb_lora = (total_params_lora * bytes_per_param) / 1e9
print(f"\nMémoire gradients:")
print(f" Full fine-tuning: {gb_full_ft:.2f} GB")
print(f" LoRA: {gb_lora:.2f} GB")
print(f" Économie: {gb_full_ft - gb_lora:.2f} GB")
# Exemple pour différents modèles
print("=== BERT-base ===")
analyze_lora_efficiency(d_model=768, num_layers=12, rank=8)
print("\n=== LLaMA-2 7B (approximation) ===")
analyze_lora_efficiency(d_model=4096, num_layers=32, rank=16)
print("\n=== LLaMA-2 70B (approximation) ===")
analyze_lora_efficiency(d_model=8192, num_layers=80, rank=64)
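The identity behind `merge_weights` can be checked on a tiny example: applying the adapter separately, W·x + s·B·(A·x), gives the same output as applying the merged matrix (W + s·B·A)·x. The sketch below uses plain Python lists so it runs without PyTorch; the helper names `matmul` and `add_scaled` and all numeric values are illustrative assumptions.

```python
def matmul(a, b):
    """Multiply two matrices given as lists of rows."""
    return [[sum(x * y for x, y in zip(row, col)) for col in zip(*b)] for row in a]


def add_scaled(a, b, scale=1.0):
    """Element-wise a + scale * b for two matrices of the same shape."""
    return [[x + scale * y for x, y in zip(ra, rb)] for ra, rb in zip(a, b)]


# Toy shapes: d_out = 2, d_in = 3, rank r = 1, alpha = 2 -> scaling = alpha / r = 2
W = [[1.0, 0.0, 2.0],
     [0.0, 1.0, -1.0]]        # frozen weight, shape (d_out, d_in)
B = [[0.5], [-1.0]]           # LoRA B, shape (d_out, r)
A = [[1.0, 2.0, 3.0]]         # LoRA A, shape (r, d_in)
scaling = 2.0
x = [[1.0], [1.0], [1.0]]     # one input as a column vector, shape (d_in, 1)

# Path 1: adapter kept separate, y = W x + scaling * B (A x)
y_adapter = add_scaled(matmul(W, x), matmul(B, matmul(A, x)), scale=scaling)

# Path 2: weights merged first, y = (W + scaling * B A) x
W_merged = add_scaled(W, matmul(B, A), scale=scaling)
y_merged = matmul(W_merged, x)

print("adapter path:", y_adapter)
print("merged path: ", y_merged)
```

Because the two paths agree, a trained adapter can be folded into the base weights for inference at no extra latency. The parameter saving only appears at realistic sizes, where r is much smaller than both dimensions.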
Other PEFT methods
class PrefixTuning(nn.Module):
"""
Prefix Tuning: Ajoute des "virtual tokens" apprenables au début de chaque layer
"""
def __init__(self, prefix_length=20, d_model=768, num_layers=12):
super().__init__()
self.prefix_length = prefix_length
self.d_model = d_model
# Embeddings de préfixe pour chaque layer
self.prefix_embeddings = nn.Parameter(
torch.randn(num_layers, prefix_length, d_model)
)
def forward(self, layer_idx):
"""Retourne les embeddings de préfixe pour une layer donnée"""
return self.prefix_embeddings[layer_idx]
class AdapterLayer(nn.Module):
"""
Adapter: Ajoute une petite bottleneck layer après attention/FF
"""
def __init__(self, d_model=768, bottleneck_dim=64):
super().__init__()
self.down_project = nn.Linear(d_model, bottleneck_dim)
self.up_project = nn.Linear(bottleneck_dim, d_model)
self.activation = nn.ReLU()
def forward(self, x):
# Projection down → activation → projection up
h = self.activation(self.down_project(x))
return self.up_project(h) + x # Residual connection
class IA3Layer(nn.Module):
    """
    IA³ (Infused Adapter by Inhibiting and Amplifying Inner Activations)
    Learns multiplicative scaling vectors for keys, values, and the
    intermediate feed-forward activations.
    """
    def __init__(self, d_model=768, d_ff=3072):
        super().__init__()
        # Learnable scaling vectors, initialized to 1 (identity at the start of training)
        self.scale_k = nn.Parameter(torch.ones(d_model))
        self.scale_v = nn.Parameter(torch.ones(d_model))
        # The feed-forward vector rescales the hidden activation, whose size is d_ff
        self.scale_ff = nn.Parameter(torch.ones(d_ff))

    def forward(self, x, position='k'):
        """
        Apply the scaling for the given position:
        - 'k': attention keys
        - 'v': attention values
        - 'ff': intermediate feed-forward activation (last dimension d_ff)
        """
        if position == 'k':
            return x * self.scale_k
        elif position == 'v':
            return x * self.scale_v
        elif position == 'ff':
            return x * self.scale_ff
        raise ValueError(f"Unknown position: {position!r}")
# Comparaison des méthodes PEFT
def compare_peft_methods(d_model=768, num_layers=12):
"""Compare le nombre de paramètres de différentes méthodes PEFT"""
results = {}
# LoRA
rank = 8
lora_params = num_layers * 4 * 2 * rank * d_model
results['LoRA (r=8)'] = lora_params
# Prefix Tuning
prefix_length = 20
prefix_params = num_layers * prefix_length * d_model
results['Prefix Tuning (L=20)'] = prefix_params
# Adapters
bottleneck = 64
adapter_params = num_layers * 2 * (d_model * bottleneck + bottleneck * d_model)
results['Adapters (d=64)'] = adapter_params
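# IA³: scaling vectors for keys and values (d_model each) and for the
# intermediate feed-forward activation (d_ff = 4 * d_model)
ia3_params = num_layers * (2 * d_model + 4 * d_model)
results['IA³'] = ia3_params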
# Full fine-tuning (référence)
full_ft_params = num_layers * (4 * d_model**2 + 8 * d_model**2)
results['Full FT'] = full_ft_params
print(f"Comparaison des méthodes PEFT (d_model={d_model}, layers={num_layers})\n")
print(f"{'Méthode':<25} {'Paramètres':>15} {'% de Full FT':>12} {'Mémoire (MB)':>15}")
print("-" * 70)
for method, params in sorted(results.items(), key=lambda x: x[1]):
pct = (params / full_ft_params) * 100
memory_mb = (params * 4) / 1e6 # float32
print(f"{method:<25} {params:>15,} {pct:>11.3f}% {memory_mb:>14.1f}")
compare_peft_methods()
Papers to study (Weeks 5-6)
Hu et al. (2021) - “LoRA: Low-Rank Adaptation of Large Language Models”
- arXiv:2106.09685
- To reproduce: experiments on BERT/GPT-2
Li & Liang (2021) - “Prefix-Tuning: Optimizing Continuous Prompts for Generation”
- arXiv:2101.00190
Houlsby et al. (2019) - “Parameter-Efficient Transfer Learning for NLP”
- arXiv:1902.00751
- Contribution: adapter layers
Liu et al. (2022) - “Few-Shot Parameter-Efficient Fine-Tuning is Better and Cheaper than In-Context Learning”
- arXiv:2205.05638
- Contribution: IA³
Optimization and quantization
Quantization techniques
Post-training quantization implementation
import torch
import torch.nn as nn
class Quantizer:
"""
Quantification de modèles: FP32 → INT8
"""
    @staticmethod
    def quantize_tensor(tensor, num_bits=8):
        """
        Quantize a tensor with symmetric quantization:
            Q = round(R / scale)
        where scale = max(|R|) / (2^(num_bits-1) - 1)
        """
        max_quant = 2 ** (num_bits - 1) - 1
        # Scale (guard against an all-zero tensor)
        max_val = tensor.abs().max()
        scale = torch.clamp(max_val / max_quant, min=1e-12)
        # Quantize and clip to the symmetric integer range
        quantized = torch.clamp(torch.round(tensor / scale), -max_quant, max_quant)
        # int8 only holds up to 8 bits; use a wider container above that
        dtype = torch.int8 if num_bits <= 8 else torch.int32
        return quantized.to(dtype), scale
@staticmethod
def dequantize_tensor(quantized_tensor, scale):
"""Déquantifie un tensor"""
return quantized_tensor.float() * scale
@staticmethod
def quantize_model(model, num_bits=8):
"""
Quantifie tous les poids d'un modèle
Retourne le modèle quantifié et les scales
"""
quantized_state_dict = {}
scales = {}
for name, param in model.named_parameters():
if 'weight' in name and param.dim() > 1: # Quantifier seulement les matrices
quant_param, scale = Quantizer.quantize_tensor(param.data, num_bits)
quantized_state_dict[name] = quant_param
scales[name] = scale
else:
quantized_state_dict[name] = param.data
return quantized_state_dict, scales
class QuantizedLinear(nn.Module):
"""
Layer Linear quantifiée en INT8 avec compute en INT8
"""
def __init__(self, in_features, out_features):
super().__init__()
self.in_features = in_features
self.out_features = out_features
# Poids quantifiés (INT8)
self.weight_quantized = None
self.weight_scale = None
# Activation scales (calculés dynamiquement ou statiquement)
self.input_scale = None
def quantize_weights(self, weight):
"""Quantifie les poids du layer"""
self.weight_quantized, self.weight_scale = Quantizer.quantize_tensor(weight)
def forward(self, x):
"""
Forward pass en INT8
Y = (X_int8 * W_int8) * (scale_x * scale_w)
"""
# Quantification dynamique de l'input
x_quantized, x_scale = Quantizer.quantize_tensor(x)
# Multiplication en INT8 (simulée en float pour PyTorch)
output_quantized = torch.matmul(
x_quantized.float(),
self.weight_quantized.float().t()
)
# Rescaling
output_scale = x_scale * self.weight_scale
output = output_quantized * output_scale
return output
# Analyze the impact of quantization
def analyze_quantization_error(tensor, num_bits_list=(8, 4, 2)):
"""
Analyze the quantization error for several bit widths
"""
import math
import matplotlib.pyplot as plt
results = []
for num_bits in num_bits_list:
# Quantize, then dequantize
quant_tensor, scale = Quantizer.quantize_tensor(tensor, num_bits)
dequant_tensor = Quantizer.dequantize_tensor(quant_tensor, scale)
# Error metrics
mse = ((tensor - dequant_tensor) ** 2).mean().item()
mae = (tensor - dequant_tensor).abs().mean().item()
# SNR in dB, computed on Python floats (assumes mse > 0)
snr = 10 * math.log10((tensor ** 2).mean().item() / mse)
results.append({
'bits': num_bits,
'mse': mse,
'mae': mae,
'snr_db': snr
})
print(f"\n{num_bits}-bit quantization:")
print(f" MSE: {mse:.6f}")
print(f" MAE: {mae:.6f}")
print(f" SNR: {snr:.2f} dB")
# Visualization
fig, axes = plt.subplots(1, 3, figsize=(15, 4))
bits = [r['bits'] for r in results]
axes[0].plot(bits, [r['mse'] for r in results], 'o-')
axes[0].set_xlabel('Bits')
axes[0].set_ylabel('MSE')
axes[0].set_title('Mean Squared Error')
axes[0].grid(True)
axes[1].plot(bits, [r['mae'] for r in results], 'o-')
axes[1].set_xlabel('Bits')
axes[1].set_ylabel('MAE')
axes[1].set_title('Mean Absolute Error')
axes[1].grid(True)
axes[2].plot(bits, [r['snr_db'] for r in results], 'o-')
axes[2].set_xlabel('Bits')
axes[2].set_ylabel('SNR (dB)')
axes[2].set_title('Signal-to-Noise Ratio')
axes[2].grid(True)
plt.tight_layout()
plt.show()
# Test on realistic weights
model = nn.Linear(768, 768)
weight = model.weight.data
# quantize_tensor stores values as int8, so bit widths above 8 would overflow
analyze_quantization_error(weight, num_bits_list=[8, 4, 2])
# Memory savings
def compute_memory_savings(model_size_params, bits_original=32, bits_quantized=8):
"""
Compute the memory saved by quantization (weights only)
"""
memory_original = (model_size_params * bits_original) / 8 / 1e9 # GB
memory_quantized = (model_size_params * bits_quantized) / 8 / 1e9 # GB
savings = memory_original - memory_quantized
savings_pct = (savings / memory_original) * 100
print(f"\nModel: {model_size_params/1e9:.1f}B parameters")
print(f"Memory {bits_original}-bit: {memory_original:.2f} GB")
print(f"Memory {bits_quantized}-bit: {memory_quantized:.2f} GB")
print(f"Savings: {savings:.2f} GB ({savings_pct:.1f}%)")
compute_memory_savings(7e9, bits_original=32, bits_quantized=8) # LLaMA-2 7B
compute_memory_savings(70e9, bits_original=32, bits_quantized=8) # LLaMA-2 70B
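The rescaling identity Y = (X_int8 * W_int8) * (scale_x * scale_w) can be checked without PyTorch. The following is a minimal standard-library sketch, assuming symmetric per-tensor quantization; the helper names `quantize` and `int_dot` are illustrative and not part of the `Quantizer` class above.

```python
import random

def quantize(values, num_bits=8):
    """Symmetric per-tensor quantization: returns (integers, scale)."""
    qmax = 2 ** (num_bits - 1) - 1            # 127 for 8 bits
    max_abs = max(abs(v) for v in values) or 1.0  # avoid a zero scale
    scale = max_abs / qmax
    q = [max(-qmax, min(qmax, round(v / scale))) for v in values]
    return q, scale

def int_dot(x, w, num_bits=8):
    """Dot product accumulated on integers, then rescaled once."""
    xq, sx = quantize(x, num_bits)
    wq, sw = quantize(w, num_bits)
    acc = sum(a * b for a, b in zip(xq, wq))  # exact integer accumulation
    return acc * (sx * sw)

random.seed(0)
x = [random.uniform(-1, 1) for _ in range(256)]
w = [random.uniform(-1, 1) for _ in range(256)]
exact = sum(a * b for a, b in zip(x, w))

for bits in (8, 4, 2):
    approx = int_dot(x, w, bits)
    print(f"{bits}-bit: exact={exact:.4f} approx={approx:.4f} "
          f"abs_err={abs(exact - approx):.4f}")
```

Each quantized value is off by at most half a scale step, which is why the error typically grows as the bit width shrinks.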
Papers to study (Week 7)
Dettmers et al. (2022) - “LLM.int8(): 8-bit Matrix Multiplication for Transformers at Scale”
Xiao et al. (2023) - “SmoothQuant: Accurate and Efficient Post-Training Quantization for Large Language Models”
Frantar et al. (2023) - “GPTQ: Accurate Post-Training Quantization for Generative Pre-trained Transformers”
Agents and orchestration (Weeks 8-10)
Objective: Master multi-agent systems and complex orchestration
Weeks 8-9: Autonomous agents
Implementing an agent with ReAct
from typing import List, Dict, Any
import re
class ReActAgent:
"""
Agent based on ReAct (Reasoning + Acting)
Loop: Thought → Action → Observation → ... → Answer
"""
def __init__(self, llm, tools):
self.llm = llm
self.tools = {tool.name: tool for tool in tools}
self.max_iterations = 10
def parse_action(self, text):
"""
Parse the LLM output to extract the action
Expected format:
Action: tool_name
Action Input: {"param": "value"}
"""
import json
action_match = re.search(r'Action:\s*(.+?)(?:\n|$)', text)
action_input_match = re.search(r'Action Input:\s*(.+?)(?:\n|$)', text, re.DOTALL)
if action_match and action_input_match:
action = action_match.group(1).strip()
action_input = action_input_match.group(1).strip()
# Parse the input as JSON when possible; otherwise keep the raw string
try:
action_input = json.loads(action_input)
except json.JSONDecodeError:
pass
return {'action': action, 'action_input': action_input}
return None
def run(self, query: str) -> Dict[str, Any]:
"""
Run the agent on a query
"""
# History of steps
trajectory = []
# ReAct prompt
prompt = self._build_initial_prompt(query)
for iteration in range(self.max_iterations):
# LLM reasoning step
response = self.llm(prompt)
trajectory.append({'type': 'thought', 'content': response})
# Check for a final answer
if 'Final Answer:' in response:
final_answer = response.split('Final Answer:')[1].strip()
return {
'answer': final_answer,
'trajectory': trajectory,
'iterations': iteration + 1
}
# Parse the action
parsed_action = self.parse_action(response)
if parsed_action is None:
# Keep the malformed output in the prompt so the LLM can see what to correct
prompt += f" {response}\n\nInvalid action format. Please use:\nAction: tool_name\nAction Input: input\n\nThought:"
continue
# Execute the action
tool_name = parsed_action['action']
if tool_name not in self.tools:
observation = f"Error: Tool '{tool_name}' not found. Available tools: {list(self.tools.keys())}"
else:
try:
tool = self.tools[tool_name]
observation = tool.run(parsed_action['action_input'])
except Exception as e:
observation = f"Error executing tool: {str(e)}"
trajectory.append({
'type': 'action',
'tool': tool_name,
'input': parsed_action['action_input'],
'observation': observation
})
# Append the LLM output and the observation so the next call sees the full trace
prompt += f" {response}\n\nObservation: {observation}\n\nThought:"
return {
'answer': "Max iterations reached without finding answer",
'trajectory': trajectory,
'iterations': self.max_iterations
}
def _build_initial_prompt(self, query):
"""Build the initial ReAct prompt"""
tools_desc = "\n".join([
f"- {name}: {tool.description}"
for name, tool in self.tools.items()
])
return f"""Answer the following question as best you can. You have access to the following tools:
{tools_desc}
Use the following format:
Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{', '.join(self.tools.keys())}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question
Begin!
Question: {query}
Thought:"""
# Example tools
class WikipediaTool:
name = "wikipedia"
description = "Search Wikipedia for information about a topic"
def run(self, query):
# Simulated result (plug in a real Wikipedia API here)
return f"Wikipedia search results for '{query}': [Simulated content about {query}]"
class CalculatorTool:
name = "calculator"
description = "Perform mathematical calculations"
def run(self, expression):
try:
result = eval(expression) # Unsafe on untrusted input: use a restricted parser in production!
return f"Result: {result}"
except Exception as e:
return f"Error: {str(e)}"
# Test the agent
tools = [WikipediaTool(), CalculatorTool()]
def dummy_llm(prompt):
"""Simulated LLM for demonstration"""
# In production, call the OpenAI API or another provider.
# This stub never emits "Final Answer:", so the run below stops at max_iterations.
return """Thought: I need to search for information about Paris
Action: wikipedia
Action Input: Paris France"""
agent = ReActAgent(llm=dummy_llm, tools=tools)
result = agent.run("What is the population of Paris?")
print("Answer:", result['answer'])
print(f"\nTrajectory ({result['iterations']} iterations):")
for step in result['trajectory']:
print(f"\n{step['type'].upper()}:")
if step['type'] == 'thought':
print(f" {step['content'][:200]}...")
else:
print(f" Tool: {step['tool']}")
print(f" Input: {step['input']}")
print(f" Observation: {step['observation'][:200]}...")
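The calculator tool above relies on `eval`, which executes arbitrary code. One possible replacement is sketched below, assuming only basic arithmetic is needed: it walks the expression's AST and accepts a whitelist of operators. The names `safe_eval` and `SafeCalculatorTool` are illustrative. Exponentiation is deliberately left out, since very large powers can exhaust CPU and memory.

```python
import ast
import operator

# Whitelisted operators; any other syntax element is rejected.
_BIN_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Mod: operator.mod,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}

def safe_eval(expression: str):
    """Evaluate a purely arithmetic expression without calling eval()."""
    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if (isinstance(node, ast.Constant)
                and isinstance(node.value, (int, float))
                and not isinstance(node.value, bool)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
            return _BIN_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        raise ValueError(f"Unsupported expression element: {type(node).__name__}")
    return _eval(ast.parse(expression, mode="eval"))

class SafeCalculatorTool:
    name = "calculator"
    description = "Perform mathematical calculations"

    def run(self, expression):
        try:
            return f"Result: {safe_eval(expression)}"
        except Exception as e:  # syntax errors, division by zero, rejected nodes
            return f"Error: {e}"

print(SafeCalculatorTool().run("2 + 3 * 4"))
print(SafeCalculatorTool().run("__import__('os').getcwd()"))
```

The second call is rejected because function calls are not in the whitelist, so the tool returns an error string instead of executing anything.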
LangGraph and orchestration
Implementing a multi-agent system
from langgraph.graph import StateGraph, END
from typing import Any, Dict, List, TypedDict, Annotated
import operator
class AgentState(TypedDict):
"""State shared between agents"""
messages: Annotated[List[Dict], operator.add]
next_agent: str
final_answer: str
class MultiAgentSystem:
"""
Multi-agent system orchestrated with LangGraph
Agents:
1. Planner: Breaks the task down into subtasks
2. Researcher: Gathers information
3. Analyst: Analyzes the data
4. Writer: Writes the final answer
"""
def __init__(self, llm):
self.llm = llm
self.graph = self._build_graph()
def planner_agent(self, state: AgentState) -> AgentState:
"""Planner agent"""
query = state['messages'][0]['content']
prompt = f"""You are a planning agent. Break down this query into subtasks:
Query: {query}
Provide a step-by-step plan."""
response = self.llm(prompt)
# Return only the update: the operator.add reducer appends it to 'messages'
return {
'messages': [{'agent': 'planner', 'content': response}],
'next_agent': 'researcher'
}
def researcher_agent(self, state: AgentState) -> AgentState:
"""Researcher agent"""
plan = state['messages'][-1]['content']
prompt = f"""You are a research agent. Based on this plan, gather information:
Plan: {plan}
Provide research findings."""
response = self.llm(prompt)
# Return only the update: the operator.add reducer appends it to 'messages'
return {
'messages': [{'agent': 'researcher', 'content': response}],
'next_agent': 'analyst'
}
def analyst_agent(self, state: AgentState) -> AgentState:
"""Analyst agent"""
research = state['messages'][-1]['content']
prompt = f"""You are an analyst. Analyze this research:
Research: {research}
Provide key insights."""
response = self.llm(prompt)
# Return only the update: the operator.add reducer appends it to 'messages'
return {
'messages': [{'agent': 'analyst', 'content': response}],
'next_agent': 'writer'
}
def writer_agent(self, state: AgentState) -> AgentState:
"""Writer agent"""
analysis = state['messages'][-1]['content']
original_query = state['messages'][0]['content']
prompt = f"""You are a writer. Based on this analysis, write a final answer to the query:
Query: {original_query}
Analysis: {analysis}
Write a comprehensive answer."""
response = self.llm(prompt)
# Return only the update: the operator.add reducer appends it to 'messages'
return {
'messages': [{'agent': 'writer', 'content': response}],
'final_answer': response,
'next_agent': 'END'
}
def router(self, state: AgentState) -> str:
"""Route to the next agent"""
return state.get('next_agent', 'END')
def _build_graph(self):
"""Build the orchestration graph"""
workflow = StateGraph(AgentState)
# Add the nodes
workflow.add_node("planner", self.planner_agent)
workflow.add_node("researcher", self.researcher_agent)
workflow.add_node("analyst", self.analyst_agent)
workflow.add_node("writer", self.writer_agent)
# Define the transitions
workflow.set_entry_point("planner")
workflow.add_conditional_edges(
"planner",
self.router,
{
"researcher": "researcher",
"END": END
}
)
workflow.add_conditional_edges(
"researcher",
self.router,
{
"analyst": "analyst",
"END": END
}
)
workflow.add_conditional_edges(
"analyst",
self.router,
{
"writer": "writer",
"END": END
}
)
workflow.add_conditional_edges(
"writer",
self.router,
{
"END": END
}
)
return workflow.compile()
def run(self, query: str) -> Dict[str, Any]:
"""Run the multi-agent system"""
initial_state = {
'messages': [{'agent': 'user', 'content': query}],
'next_agent': 'planner',
'final_answer': ''
}
final_state = self.graph.invoke(initial_state)
return {
'answer': final_state['final_answer'],
'trajectory': final_state['messages']
}
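The routing logic of the graph above (planner, researcher, analyst, writer, then END) can be traced without LangGraph. The following is a dependency-free sketch, not the LangGraph runtime: `make_agent`, `run_pipeline` and `echo_llm` are hypothetical helpers, and the update-merging step only imitates what an additive reducer on `messages` does.

```python
from typing import Any, Callable, Dict

State = Dict[str, Any]

def make_agent(name: str, next_agent: str, llm: Callable[[str], str]):
    """Build a node that returns only the update it contributes."""
    def node(state: State) -> State:
        last = state["messages"][-1]["content"]
        reply = llm(f"[{name}] {last}")
        return {"messages": [{"agent": name, "content": reply}],
                "next_agent": next_agent}
    return node

def run_pipeline(query: str, llm: Callable[[str], str]) -> State:
    order = ["planner", "researcher", "analyst", "writer"]
    nodes = {name: make_agent(name, nxt, llm)
             for name, nxt in zip(order, order[1:] + ["END"])}
    state: State = {"messages": [{"agent": "user", "content": query}],
                    "next_agent": "planner"}
    while state["next_agent"] != "END":
        update = nodes[state["next_agent"]](state)
        # Reducer step: append the new messages, overwrite the routing key.
        state = {"messages": state["messages"] + update["messages"],
                 "next_agent": update["next_agent"]}
    state["final_answer"] = state["messages"][-1]["content"]
    return state

def echo_llm(prompt: str) -> str:
    """Stand-in for a real model call."""
    return f"response to <{prompt[:30]}>"

final = run_pipeline("What is quantization?", echo_llm)
print([m["agent"] for m in final["messages"]])
```

Because each node returns only its own message, the history grows by exactly one entry per step; returning the whole mutated list under an additive reducer would duplicate earlier messages.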
Papers to study (Weeks 8-10)
Yao et al. (2022) - “ReAct: Synergizing Reasoning and Acting in Language Models”
Wang et al. (2023) - “Plan-and-Solve Prompting: Improving Zero-Shot Chain-of-Thought Reasoning by Large Language Models”
Wu et al. (2023) - “AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation”
Evaluation and research (Weeks 11-14)
Objective: Master evaluation methodologies and contribute to research
Benchmarking and metrics
Implementing a complete evaluation suite
import numpy as np
from typing import Any, Dict, List
import sacrebleu
from rouge_score import rouge_scorer
class LLMEvaluator:
"""
Complete evaluation suite for LLMs
"""
def __init__(self):
self.rouge_scorer = rouge_scorer.RougeScorer(
['rouge1', 'rouge2', 'rougeL'],
use_stemmer=True
)
def evaluate_generation(self,
predictions: List[str],
references: List[str],
metrics: List[str] = None) -> Dict[str, float]:
"""
Evaluate generation quality with several metrics
(perplexity needs a model and is computed separately by compute_perplexity)
"""
if metrics is None:
metrics = ['bleu', 'rouge']
results = {}
if 'bleu' in metrics:
results.update(self.compute_bleu(predictions, references))
if 'rouge' in metrics:
results.update(self.compute_rouge(predictions, references))
if 'bertscore' in metrics:
results.update(self.compute_bertscore(predictions, references))
return results
def compute_bleu(self, predictions, references):
"""Compute the corpus-level BLEU score"""
bleu = sacrebleu.corpus_bleu(predictions, [references])
return {
'bleu': bleu.score,
'bleu_precisions': bleu.precisions
}
def compute_rouge(self, predictions, references):
"""Compute ROUGE scores"""
scores = {
'rouge1_f': [],
'rouge1_p': [],
'rouge1_r': [],
'rouge2_f': [],
'rougeL_f': []
}
for pred, ref in zip(predictions, references):
rouge_scores = self.rouge_scorer.score(ref, pred)
scores['rouge1_f'].append(rouge_scores['rouge1'].fmeasure)
scores['rouge1_p'].append(rouge_scores['rouge1'].precision)
scores['rouge1_r'].append(rouge_scores['rouge1'].recall)
scores['rouge2_f'].append(rouge_scores['rouge2'].fmeasure)
scores['rougeL_f'].append(rouge_scores['rougeL'].fmeasure)
return {k: np.mean(v) for k, v in scores.items()}
def compute_bertscore(self, predictions, references):
"""Compute BERTScore (requires the bert-score package)"""
from bert_score import score
P, R, F1 = score(predictions, references, lang='en', verbose=False)
return {
'bertscore_precision': P.mean().item(),
'bertscore_recall': R.mean().item(),
'bertscore_f1': F1.mean().item()
}
def evaluate_classification(self,
predictions: List[int],
labels: List[int],
num_classes: int) -> Dict[str, Any]:
"""
Evaluate a classification task
"""
from sklearn.metrics import (
accuracy_score,
precision_recall_fscore_support,
confusion_matrix,
classification_report
)
accuracy = accuracy_score(labels, predictions)
precision, recall, f1, support = precision_recall_fscore_support(
labels, predictions, average='macro'
)
cm = confusion_matrix(labels, predictions)
return {
'accuracy': accuracy,
'precision_macro': precision,
'recall_macro': recall,
'f1_macro': f1,
'confusion_matrix': cm.tolist(),
'classification_report': classification_report(labels, predictions)
}
def compute_perplexity(self, model, tokenizer, texts: List[str]) -> float:
"""
Compute perplexity over a set of texts
PPL = exp(avg_loss), where avg_loss is the mean negative log-likelihood per predicted token
"""
import torch
total_loss = 0
total_tokens = 0
model.eval()
with torch.no_grad():
for text in texts:
inputs = tokenizer(text, return_tensors='pt')
# Assuming a Hugging Face causal LM: labels are shifted by one,
# so the returned loss averages over seq_len - 1 predictions
num_tokens = inputs['input_ids'].size(1) - 1
if num_tokens < 1:
continue
outputs = model(**inputs, labels=inputs['input_ids'])
loss = outputs.loss
total_loss += loss.item() * num_tokens
total_tokens += num_tokens
avg_loss = total_loss / total_tokens
perplexity = np.exp(avg_loss)
return perplexity
def evaluate_retrieval(self,
retrieved_docs: List[List[str]],
relevant_docs: List[List[str]],
k_values: List[int] = [1, 5, 10]) -> Dict[str, float]:
"""
Evaluate retrieval quality (for RAG)
"""
metrics = {}
# Recall@K
for k in k_values:
recalls = []
for retrieved, relevant in zip(retrieved_docs, relevant_docs):
retrieved_k = set(retrieved[:k])
relevant_set = set(relevant)
recall = len(retrieved_k & relevant_set) / len(relevant_set)
recalls.append(recall)
metrics[f'recall@{k}'] = np.mean(recalls)
# MRR (Mean Reciprocal Rank)
mrrs = []
for retrieved, relevant in zip(retrieved_docs, relevant_docs):
relevant_set = set(relevant)
for rank, doc in enumerate(retrieved, 1):
if doc in relevant_set:
mrrs.append(1.0 / rank)
break
else:
mrrs.append(0.0)
metrics['mrr'] = np.mean(mrrs)
# MAP (Mean Average Precision)
maps = []
for retrieved, relevant in zip(retrieved_docs, relevant_docs):
relevant_set = set(relevant)
precisions = []
num_relevant = 0
for rank, doc in enumerate(retrieved, 1):
if doc in relevant_set:
num_relevant += 1
precisions.append(num_relevant / rank)
# Normalize by the number of relevant documents so that missed documents lower AP
if relevant_set:
maps.append(sum(precisions) / len(relevant_set))
else:
maps.append(0.0)
metrics['map'] = np.mean(maps)
return metrics
# Complete usage example
evaluator = LLMEvaluator()
# 1. Generation evaluation
predictions = [
"The cat is sitting on the mat",
"Machine learning is a subset of AI"
]
references = [
"A cat is on the mat",
"Machine learning is part of artificial intelligence"
]
gen_metrics = evaluator.evaluate_generation(predictions, references)
print("Generation metrics:", gen_metrics)
# 2. Classification evaluation
pred_labels = [0, 1, 1, 0, 1]
true_labels = [0, 1, 0, 0, 1]
class_metrics = evaluator.evaluate_classification(pred_labels, true_labels, num_classes=2)
print("\nClassification metrics:", class_metrics)
# 3. Retrieval evaluation
retrieved = [
["doc1", "doc2", "doc3"],
["doc4", "doc5", "doc6"]
]
relevant = [
["doc1", "doc4"],
["doc5", "doc7"]
]
retrieval_metrics = evaluator.evaluate_retrieval(retrieved, relevant)
print("\nRetrieval metrics:", retrieval_metrics)
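The retrieval metrics can be verified by hand on the two example queries above. The sketch below uses only the standard library and assumes the usual convention that average precision is normalized by the number of relevant documents; the function names are illustrative.

```python
def recall_at_k(retrieved, relevant, k):
    """Fraction of the relevant documents found in the top k."""
    rel = set(relevant)
    return len(set(retrieved[:k]) & rel) / len(rel) if rel else 0.0

def reciprocal_rank(retrieved, relevant):
    """1 / rank of the first relevant document, 0 if none is retrieved."""
    rel = set(relevant)
    for rank, doc in enumerate(retrieved, 1):
        if doc in rel:
            return 1.0 / rank
    return 0.0

def average_precision(retrieved, relevant):
    """Sum of precision@rank at each hit, divided by the number of relevant docs."""
    rel = set(relevant)
    hits, total = 0, 0.0
    for rank, doc in enumerate(retrieved, 1):
        if doc in rel:
            hits += 1
            total += hits / rank
    return total / len(rel) if rel else 0.0

retrieved = [["doc1", "doc2", "doc3"], ["doc4", "doc5", "doc6"]]
relevant = [["doc1", "doc4"], ["doc5", "doc7"]]

def mean(values):
    return sum(values) / len(values)

pairs = list(zip(retrieved, relevant))
recall_1 = mean([recall_at_k(r, g, 1) for r, g in pairs])      # (0.5 + 0.0) / 2
recall_5 = mean([recall_at_k(r, g, 5) for r, g in pairs])      # (0.5 + 0.5) / 2
mrr = mean([reciprocal_rank(r, g) for r, g in pairs])          # (1.0 + 0.5) / 2
map_score = mean([average_precision(r, g) for r, g in pairs])  # (0.5 + 0.25) / 2
print(recall_1, recall_5, mrr, map_score)
```

Query 1 finds `doc1` at rank 1 but never retrieves `doc4`; query 2 finds `doc5` at rank 2 and never retrieves `doc7`. That is why both average precisions are halved relative to the precision at the hit.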
Experimentation and reproduction
Reproducible experimentation framework
import torch
import random
import numpy as np
import json
import hashlib
from pathlib import Path
from datetime import datetime
from typing import Any, Dict
class ExperimentTracker:
"""
Experiment tracking system for reproducible research
"""
def __init__(self, experiment_name: str, base_dir: str = "./experiments"):
self.experiment_name = experiment_name
self.base_dir = Path(base_dir)
self.experiment_dir = self.base_dir / experiment_name / datetime.now().strftime("%Y%m%d_%H%M%S")
self.experiment_dir.mkdir(parents=True, exist_ok=True)
self.config = {}
self.metrics = {}
self.artifacts = {}
def set_seed(self, seed: int):
"""Fix the random seeds for reproducibility"""
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
# Deterministic mode (may reduce performance)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
self.config['seed'] = seed
def log_config(self, config: Dict[str, Any]):
"""Record the experiment configuration"""
self.config.update(config)
# Compute config hash for uniqueness
config_str = json.dumps(config, sort_keys=True)
config_hash = hashlib.md5(config_str.encode()).hexdigest()[:8]
self.config['config_hash'] = config_hash
# Save config
with open(self.experiment_dir / "config.json", 'w') as f:
json.dump(self.config, f, indent=2)
def log_metrics(self, metrics: Dict[str, float], step: int = None):
"""Record metrics"""
if step is not None:
if step not in self.metrics:
self.metrics[step] = {}
self.metrics[step].update(metrics)
else:
self.metrics.update(metrics)
# Save metrics
with open(self.experiment_dir / "metrics.json", 'w') as f:
json.dump(self.metrics, f, indent=2)
def log_artifact(self, name: str, artifact: Any):
"""Save an artifact (model, plot data, etc.)"""
artifact_path = self.experiment_dir / f"{name}"
if isinstance(artifact, torch.nn.Module):
artifact_path = artifact_path.with_suffix('.pt')
torch.save(artifact.state_dict(), artifact_path)
elif isinstance(artifact, dict):
artifact_path = artifact_path.with_suffix('.json')
with open(artifact_path, 'w') as f:
json.dump(artifact, f, indent=2)
else:
# Generic pickle
import pickle
artifact_path = artifact_path.with_suffix('.pkl')
with open(artifact_path, 'wb') as f:
pickle.dump(artifact, f)
# Record the path actually written, including its suffix
self.artifacts[name] = str(artifact_path)
def log_code_snapshot(self):
"""Record a snapshot of the code state (requires GitPython)"""
import git
try:
repo = git.Repo(search_parent_directories=True)
self.config['git'] = {
'commit': repo.head.object.hexsha,
'branch': repo.active_branch.name,
'is_dirty': repo.is_dirty(),
'remote_url': repo.remotes.origin.url if repo.remotes else None
}
# Save diff if dirty
if repo.is_dirty():
diff = repo.git.diff()
with open(self.experiment_dir / "git_diff.patch", 'w') as f:
f.write(diff)
except git.InvalidGitRepositoryError:
print("Warning: Not a git repository, skipping code snapshot")
def compare_experiments(self, other_exp_dir: str, metric: str):
"""Compare this experiment with another one"""
with open(Path(other_exp_dir) / "metrics.json") as f:
other_metrics = json.load(f)
with open(Path(other_exp_dir) / "config.json") as f:
other_config = json.load(f)
print(f"\nComparison on {metric}:")
print(f" Current experiment: {self.metrics.get(metric, 'N/A')}")
print(f" Other experiment: {other_metrics.get(metric, 'N/A')}")
print(f"\nConfiguration differences:")
for key in set(list(self.config.keys()) + list(other_config.keys())):
if self.config.get(key) != other_config.get(key):
print(f" {key}: {self.config.get(key)} vs {other_config.get(key)}")
# Example usage for a fine-tuning experiment
def run_finetuning_experiment():
"""
Complete example of a fine-tuning experiment with tracking
"""
# Setup experiment
exp = ExperimentTracker("bert_classification_experiment")
exp.set_seed(42)
# Configuration
config = {
'model': 'bert-base-uncased',
'learning_rate': 2e-5,
'batch_size': 16,
'epochs': 3,
'max_length': 128,
'optimizer': 'AdamW',
'scheduler': 'linear',
'warmup_steps': 500
}
exp.log_config(config)
exp.log_code_snapshot()
# Simulate training
for epoch in range(config['epochs']):
train_loss = np.random.random()
val_loss = np.random.random()
val_acc = 0.7 + epoch * 0.05 + np.random.random() * 0.05
metrics = {
'train_loss': train_loss,
'val_loss': val_loss,
'val_accuracy': val_acc
}
exp.log_metrics(metrics, step=epoch)
print(f"Epoch {epoch}: {metrics}")
# Final metrics
final_metrics = {
'final_accuracy': 0.85,
'final_f1': 0.83,
'best_epoch': 2
}
exp.log_metrics(final_metrics)
# Save model (simulated)
model = {"weights": "dummy"}
exp.log_artifact("final_model", model)
print(f"\nExperiment saved in: {exp.experiment_dir}")
run_finetuning_experiment()
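The configuration hash is what lets two runs be recognized as sharing the same settings. The sketch below isolates that step. It is an illustration only: `config_hash` is a hypothetical helper, and it swaps in SHA-256 where the tracker above uses MD5.

```python
import hashlib
import json

def config_hash(config: dict, length: int = 8) -> str:
    """Short, order-independent fingerprint of a JSON-serializable config."""
    # sort_keys makes the serialization canonical, so key order does not matter
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:length]

a = {"model": "bert-base-uncased", "learning_rate": 2e-5, "batch_size": 16}
b = {"batch_size": 16, "learning_rate": 2e-5, "model": "bert-base-uncased"}  # same values, other order
c = dict(a, learning_rate=3e-5)  # one hyperparameter changed

print(config_hash(a) == config_hash(b))
print(config_hash(a) == config_hash(c))
```

Hashing only the user-supplied hyperparameters, and not the timestamp or git state, keeps the fingerprint stable across reruns of the same setup.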
Contributing to research
Publication process
LaTeX paper template
\documentclass{article}
% Packages essentiels
\usepackage[utf8]{inputenc}
\usepackage{amsmath, amssymb, amsfonts}
\usepackage{algorithm, algpseudocode}
\usepackage{graphicx}
\usepackage{hyperref}
\usepackage{booktabs}
\title{[Title of your contribution]}
\author{
Your Name \\
Institution \\
\texttt{[email address]}
}
\begin{document}
\maketitle
\begin{abstract}
Concise summary (150-250 words) covering:
\begin{itemize}
\item The problem addressed
\item Your approach/contribution
\item Main results
\item Impact
\end{itemize}
\end{abstract}
\section{Introduction}
\subsection{Motivation}
\subsection{Contributions}
\section{Related Work}
\subsection{[Theme 1]}
\subsection{[Theme 2]}
\section{Method}
\subsection{Problem Formulation}
\subsection{Proposed Approach}
\begin{algorithm}
\caption{[Algorithm name]}
\begin{algorithmic}[1]
\Require $x$: input
\Ensure $y$: output
\State $y \leftarrow \text{function}(x)$
\end{algorithmic}
\end{algorithm}
\section{Experiments}
\subsection{Experimental Setup}
\subsection{Results}
\begin{table}[h]
\centering
\caption{[Table description]}
\begin{tabular}{lcc}
\toprule
Method & Metric 1 & Metric 2 \\
\midrule
Baseline & 0.75 & 0.80 \\
Ours & \textbf{0.82} & \textbf{0.85} \\
\bottomrule
\end{tabular}
\end{table}
\section{Discussion}
\subsection{Analysis}
\subsection{Limitations}
\section{Conclusion}
\bibliographystyle{plain}
\bibliography{references}
\end{document}
Submission checklist
## Before submitting to a conference/journal
### Code and reproducibility
- Code published on GitHub with a complete README
- requirements.txt / environment.yml provided
- Scripts to reproduce the experiments
- Data, or instructions for obtaining it
- Model checkpoints (if applicable)
- Seeds fixed and documented
### Paper
- Follows the conference/journal template
- Page count within the limit
- High-resolution figures
- Bibliographic references complete and consistently formatted
- Concise, informative abstract
- Contributions clearly stated
- Limitations discussed honestly
### Experiments
- Experiments run over several seeds (≥3)
- Error bars / standard deviations reported
- Comparisons with relevant baselines
- Ablation studies
- Statistical significance tests
### Ethics and impact
- Ethics statement if required
- Discussion of potential biases
- Societal impact considered
- Limitations of the approach discussed
### Supplementary material
- Supplementary material prepared
- Demo video (if applicable)
- Project website
- Anticipated answers to reviewers
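The checklist items on multiple seeds, error bars and significance tests can be illustrated in a few lines. The sketch below uses the standard library only. The per-seed accuracies are made-up numbers for illustration, and the paired sign-flip permutation test is one possible choice of test, not a prescribed one.

```python
import random
import statistics

def summarize(scores):
    """Mean and sample standard deviation across seeds."""
    return statistics.mean(scores), statistics.stdev(scores)

def paired_permutation_test(a, b, num_resamples=10_000, seed=0):
    """Two-sided sign-flip test on paired per-seed differences."""
    rng = random.Random(seed)
    diffs = [x - y for x, y in zip(a, b)]
    observed = abs(statistics.mean(diffs))
    count = 0
    for _ in range(num_resamples):
        # Under the null hypothesis, the sign of each difference is arbitrary
        flipped = [d if rng.random() < 0.5 else -d for d in diffs]
        if abs(statistics.mean(flipped)) >= observed - 1e-12:
            count += 1
    return (count + 1) / (num_resamples + 1)

# Hypothetical validation accuracies over five seeds (illustrative values only)
baseline = [0.801, 0.795, 0.810, 0.798, 0.804]
ours = [0.823, 0.817, 0.829, 0.815, 0.826]

for label, scores in (("baseline", baseline), ("ours", ours)):
    mean, std = summarize(scores)
    print(f"{label}: {mean:.3f} ± {std:.3f}")

p_value = paired_permutation_test(ours, baseline)
print(f"paired permutation test p ≈ {p_value:.3f}")
```

With five paired seeds there are only 2^5 = 32 sign patterns, so the two-sided p-value cannot fall below 2/32 = 0.0625 even when every seed improves. This is a concrete reason to run more seeds than the minimum.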
Final research project
Objective: Carry out a complete mini research project (Weeks 15-16)
Example projects
Improving RAG
- Hypothesis: Using multi-hop embeddings improves retrieval
- Experiments: Compare different retrieval strategies
- Metrics: Recall@K, MRR, generation quality (BLEU, ROUGE)
PEFT for specialized domains
- Hypothesis: LoRA with a rank adapted per layer
- Experiments: Fine-tuning on a medical/legal corpus
- Metrics: Accuracy, F1, compute cost
Multi-task agents
- Hypothesis: Modular architecture > monolithic
- Experiments: Compare different agent architectures
- Metrics: Task success rate, efficiency (steps), robustness
Report structure
# [Project title]
## 1. Introduction (2 pages)
- Context and motivation
- Research question
- Contributions
## 2. Background (3 pages)
- State of the art
- Theoretical foundations
- Identified gaps
## 3. Method (4 pages)
- Proposed approach
- Detailed architecture/algorithm
- Computational complexity
## 4. Experiments (5 pages)
- Experimental setup
- Datasets and metrics
- Baselines
- Main results
- Ablation studies
## 5. Discussion (2 pages)
- Interpretation of results
- Limitations
- Future work
## 6. Conclusion (1 page)
## Appendices
- Hyperparameters
- Additional results
- Code snippets
Additional resources
Advanced courses and tutorials
Recommended MOOCs
- Stanford CS224N: Natural Language Processing with Deep Learning
- Stanford CS25: Transformers United
- Fast.ai: Practical Deep Learning for Coders (Part 2)
- Hugging Face Course (Advanced)
YouTube channels
- Yannic Kilcher (paper reviews)
- AI Coffee Break with Letitia
- Two Minute Papers
- Stanford HAI
Major conferences
Tier 1 (Top venues)
- NeurIPS (Neural Information Processing Systems)
- ICML (International Conference on Machine Learning)
- ICLR (International Conference on Learning Representations)
- ACL (Association for Computational Linguistics)
- EMNLP (Empirical Methods in NLP)
- CVPR (Computer Vision and Pattern Recognition)
Tier 2 (Strong venues)
- NAACL, EACL (Regional ACL)
- AAAI, IJCAI (General AI)
- AISTATS (Statistics and ML)
Journals
- JMLR (Journal of Machine Learning Research)
- TACL (Transactions of the ACL)
- Nature Machine Intelligence
- IEEE TPAMI
Research tools
Literature tracking
- ArXiv Sanity (Karpathy)
- Papers with Code
- Connected Papers
- Semantic Scholar
Experimentation
- Weights & Biases
- MLflow
- Neptune.ai
- Comet.ml
Compute
- Google Colab Pro
- Paperspace Gradient
- Lambda Labs
- Academic cluster (request access)
Next steps
Congratulations on completing this advanced track!
You are now equipped to:
- Understand and implement state-of-the-art architectures
- Run rigorous, reproducible experiments
- Contribute to AI research
- Publish in conferences and journals
Continuing your research career
PhD
- Identify labs working on your topics
- Contact professors (with a research proposal)
- Prepare a portfolio (papers, code, projects)
Postdoc / Research Scientist
- Publish regularly
- Develop a niche expertise
- Collaborate with the community
- Review for conferences
Industry R&D
- Research labs (OpenAI, Anthropic, Google DeepMind, Meta AI, etc.)
- Innovative startups
- Balance fundamental research and applications
Staying up to date
Weekly reading
- 5-10 papers per week (quick skim)
- 1-2 papers per week (in-depth reading)
- Implement 1 paper per month
Community involvement
- Contribute to open source
- Present at conferences
- Organize reading groups
- Mentor junior researchers