Researchers & Academics Track

tl;dr: A 12-16 week program for an in-depth study of AI architectures, advanced techniques and research methodology

Who is this track for?

  • PhD students in AI, ML, NLP, Computer Vision
  • Early- and mid-career researchers
  • R&D engineers working on advanced problems
  • Faculty members who want to update their knowledge
  • Data Scientists aiming for in-depth expertise

Prerequisites

  • Solid mathematical background (linear algebra, probability, calculus)
  • Proficiency in Python and scientific libraries (NumPy, PyTorch/TensorFlow)
  • Knowledge of neural network architectures
  • Familiarity with scientific publications
  • Ability to read scientific English (papers)

Duration and commitment

  • Total duration: 12-16 weeks
  • Weekly time: 15-20 hours
  • Format: theoretical reading, experiments, paper reproduction
  • Level: advanced (research perspective)

In-depth theoretical foundations (Weeks 1-3)

Objective: Master the mathematical and architectural foundations of modern AI

The Transformer architecture in depth

Understanding the architecture behind modern LLMs

  • Transformers: architecture and operation
    • Founding paper: “Attention is All You Need” (Vaswani et al., 2017)
    • Key mechanisms: Self-attention, Multi-head attention, Positional encoding
    • Variants: Encoder-only (BERT), Decoder-only (GPT), Encoder-Decoder (T5)
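Positional encoding is listed above but not implemented in the code that follows. Below is a minimal sketch of the fixed sinusoidal encoding from Vaswani et al. (2017), PE(pos, 2i) = sin(pos / 10000^(2i/d_model)) and PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model)). It is written in pure Python so it runs without PyTorch, and the function name `sinusoidal_positional_encoding` is our own.

```python
import math


def sinusoidal_positional_encoding(seq_len, d_model):
    """Return a seq_len x d_model list of lists with the sinusoidal encoding.

    PE[pos][2i]   = sin(pos / 10000^(2i / d_model))
    PE[pos][2i+1] = cos(pos / 10000^(2i / d_model))
    """
    if d_model % 2 != 0:
        raise ValueError("d_model must be even")
    pe = []
    for pos in range(seq_len):
        row = []
        for i in range(d_model // 2):
            # Each sin/cos pair shares one frequency; frequencies decrease geometrically with i
            angle = pos / (10000 ** (2 * i / d_model))
            row.append(math.sin(angle))
            row.append(math.cos(angle))
        pe.append(row)
    return pe


pe = sinusoidal_positional_encoding(seq_len=50, d_model=16)
print(len(pe), len(pe[0]))  # 50 16
```

The resulting matrix is added to the token embeddings before the first layer. Because it is fixed, it adds no trainable parameters.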

From-scratch implementation

import torch
import torch.nn as nn
import math

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """
        Q, K, V: (batch_size, num_heads, seq_len, d_k)
        """
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        attention_weights = torch.softmax(scores, dim=-1)
        output = torch.matmul(attention_weights, V)

        return output, attention_weights

    def split_heads(self, x):
        """
        x: (batch_size, seq_len, d_model)
        return: (batch_size, num_heads, seq_len, d_k)
        """
        batch_size, seq_len, _ = x.size()
        return x.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)

    def forward(self, Q, K, V, mask=None):
        batch_size = Q.size(0)

        # Linear projections
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))

        # Attention
        x, attention_weights = self.scaled_dot_product_attention(Q, K, V, mask)

        # Concatenate heads
        x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)

        # Final linear projection
        return self.W_o(x), attention_weights


class TransformerEncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()

        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        # Multi-head attention with residual connection
        attn_output, _ = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))

        # Feed-forward with residual connection
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))

        return x
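The `mask` argument above is what separates a decoder-only model (GPT) from an encoder-only one (BERT). A decoder uses a causal mask, so position i can only attend to positions j ≤ i. Below is a minimal pure-Python sketch (single head, no batching; the function names are illustrative). It follows the same convention as `masked_fill(mask == 0, -1e9)`: masked scores receive a large negative value before the softmax.

```python
import math


def softmax(row):
    # Subtract the max for numerical stability
    m = max(row)
    exps = [math.exp(x - m) for x in row]
    total = sum(exps)
    return [e / total for e in exps]


def causal_mask(n):
    # mask[i][j] = 1 if query i may attend to key j (j <= i), else 0
    return [[1 if j <= i else 0 for j in range(n)] for i in range(n)]


def causal_attention(Q, K, V):
    """Scaled dot-product attention with a causal mask.

    Q, K, V: lists of n vectors (Q and K of dimension d_k). Returns (output, weights).
    """
    n, d_k = len(Q), len(Q[0])
    mask = causal_mask(n)
    weights = []
    for i in range(n):
        scores = [sum(q * k for q, k in zip(Q[i], K[j])) / math.sqrt(d_k) for j in range(n)]
        # Same convention as masked_fill(mask == 0, -1e9)
        scores = [s if mask[i][j] == 1 else -1e9 for j, s in enumerate(scores)]
        weights.append(softmax(scores))
    output = [
        [sum(weights[i][j] * V[j][c] for j in range(n)) for c in range(len(V[0]))]
        for i in range(n)
    ]
    return output, weights


Q = K = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
V = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
out, w = causal_attention(Q, K, V)
print([round(x, 3) for x in w[0]])  # [1.0, 0.0, 0.0]: the first token only sees itself
```

With the mask, each weight matrix is lower-triangular. This is what allows a decoder-only model to be trained on every position of a sequence in parallel without seeing future tokens.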

Mathematical analysis

# Computational complexity of self-attention
def attention_complexity_analysis():
    """
    For a sequence of length n and model dimension d:

    1. Q, K, V projections: O(n * d²)
    2. Attention scores QKᵀ: O(n² * d)
    3. Softmax: O(n²)
    4. Multiplication by V: O(n² * d)

    Total: O(n² * d + n * d²)

    For n >> d: dominated by O(n² * d) → quadratic in sequence length
    For d >> n: dominated by O(n * d²) → quadratic in model dimension
    """

    import matplotlib.pyplot as plt
    import numpy as np

    # Plot the operation count as a function of sequence length
    n_values = np.arange(100, 10000, 100)
    d = 768  # Typical hidden size (BERT-base)

    complexity = n_values**2 * d + n_values * d**2

    plt.figure(figsize=(10, 6))
    plt.plot(n_values, complexity / 1e9, label='Standard Attention')
    plt.xlabel('Sequence Length (n)')
    plt.ylabel('Operations (billions)')
    plt.title('Computational Complexity of Self-Attention')
    plt.legend()
    plt.grid(True)
    plt.show()

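# Variants that reduce the cost
"""
Efficient attention:
1. Linear Attention: O(n * d²), a kernel approximation of the softmax that never builds the n × n matrix
2. Sparse Attention: O(n * sqrt(n) * d), attention restricted to fixed patterns
3. Flash Attention: still O(n² * d) FLOPs (exact attention), but IO-aware tiling keeps the extra GPU memory linear in n instead of quadratic
"""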

Papers to study (Week 1)

  1. Vaswani et al. (2017) - “Attention is All You Need”

    • arXiv:1706.03762
    • Contribution: the original Transformer architecture
    • To reproduce: full implementation, WMT experiments
  2. Devlin et al. (2018) - “BERT: Pre-training of Deep Bidirectional Transformers”

    • arXiv:1810.04805
    • Contribution: bidirectional pre-training with masked language modeling (MLM)
    • To analyze: impact of the masking strategy, encoder-only architecture
  3. Radford et al. (2019) - “Language Models are Unsupervised Multitask Learners” (GPT-2)

    • Contribution: zero-shot task transfer from a language model trained on WebText; performance improves with model size
    • To understand: differences between encoder-only and decoder-only models
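Analyzing the impact of BERT's masking is easier when the procedure is written out. Devlin et al. pick 15% of token positions as prediction targets. Of those, 80% are replaced by [MASK], 10% by a random token, and 10% are left unchanged. Below is a minimal sketch over a list of token ids, with several assumptions:

- Each position is selected independently with probability 0.15. This is a Bernoulli variant; the original script samples a fixed number of positions per sequence.
- The default ids (103 for [MASK], vocabulary size 30522) match bert-base-uncased and should be checked against your tokenizer.
- Special tokens such as [CLS] and [SEP] are never selected in practice, but this sketch skips that check.

```python
import random


def bert_mask_tokens(token_ids, mask_id=103, vocab_size=30522,
                     mask_prob=0.15, rng=None):
    """Apply BERT-style MLM masking (simplified sketch).

    Returns (inputs, labels): labels hold the original id at selected positions
    and -100 elsewhere (the default ignore_index of PyTorch's cross-entropy).
    """
    rng = rng if rng is not None else random.Random()
    inputs, labels = list(token_ids), [-100] * len(token_ids)
    for i, tok in enumerate(token_ids):
        if rng.random() >= mask_prob:
            continue                                # not selected: no loss on this position
        labels[i] = tok                             # the model must recover the original token
        r = rng.random()
        if r < 0.8:
            inputs[i] = mask_id                     # 80%: replace with [MASK]
        elif r < 0.9:
            inputs[i] = rng.randrange(vocab_size)   # 10%: random token
        # remaining 10%: keep the original token unchanged
    return inputs, labels


ids = list(range(1000, 1200))
inputs, labels = bert_mask_tokens(ids, rng=random.Random(0))
selected = [i for i, l in enumerate(labels) if l != -100]
print(len(selected), sum(inputs[i] == 103 for i in selected))
```

Because 10% of the selected tokens stay unchanged, the model cannot assume that unmasked tokens are correct. This mitigates the mismatch that arises because [MASK] never appears during fine-tuning.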


Tokenization and representations

Advanced tokenization mechanisms

BPE (Byte Pair Encoding) implementation

from collections import Counter
import re

class BytePairEncoding:
    def __init__(self, vocab_size=10000):
        self.vocab_size = vocab_size
        self.vocab = set()
        self.merges = {}  # (left, right) -> merge order (lower = learned earlier)

    def get_stats(self, words):
        """Count adjacent symbol pairs, weighted by word frequency"""
        pairs = Counter()
        for word, freq in words.items():
            symbols = word.split()
            for i in range(len(symbols) - 1):
                pairs[(symbols[i], symbols[i+1])] += freq
        return pairs

    def merge_vocab(self, pair, words):
        """Merge every occurrence of `pair` into a single symbol"""
        bigram = re.escape(' '.join(pair))
        replacement = ''.join(pair)

        # Match whole symbols only: the bigram must not be preceded or
        # followed by a non-space character (as in Sennrich et al., 2016)
        pattern = re.compile(r'(?<!\S)' + bigram + r'(?!\S)')

        new_words = {}
        for word, freq in words.items():
            new_word = pattern.sub(replacement, word)
            new_words[new_word] = freq

        return new_words

    def train(self, corpus):
        """Train the BPE tokenizer on a corpus (list of strings)"""
        # Initialization: each word is split into characters plus an end-of-word marker
        words = Counter()
        for text in corpus:
            for word in text.split():
                words[' '.join(word) + ' </w>'] += 1

        # Initial vocabulary: all individual symbols
        self.vocab = set()
        for word in words:
            self.vocab.update(word.split())

        # Iteratively merge the most frequent pair
        for i in range(self.vocab_size - len(self.vocab)):
            pairs = self.get_stats(words)
            if not pairs:
                break

            best_pair = max(pairs, key=pairs.get)
            words = self.merge_vocab(best_pair, words)
            self.merges[best_pair] = i
            self.vocab.add(''.join(best_pair))

            if (i + 1) % 100 == 0:
                print(f"Merge {i+1}: {best_pair} -> {''.join(best_pair)}")

        return self.vocab, self.merges

    def tokenize(self, text):
        """Tokenize a text by replaying the learned merges in training order"""
        tokens = []
        for word in text.split():
            symbols = list(word) + ['</w>']
            while len(symbols) > 1:
                pairs = [(symbols[i], symbols[i + 1]) for i in range(len(symbols) - 1)]
                known = [p for p in pairs if p in self.merges]
                if not known:
                    break

                # Apply the earliest learned merge first
                best_pair = min(known, key=lambda p: self.merges[p])
                merged, i = [], 0
                while i < len(symbols):
                    if i < len(symbols) - 1 and (symbols[i], symbols[i + 1]) == best_pair:
                        merged.append(symbols[i] + symbols[i + 1])
                        i += 2
                    else:
                        merged.append(symbols[i])
                        i += 1
                symbols = merged
            tokens.extend(symbols)

        return tokens

# Usage example
corpus = [
    "low lower lowest",
    "new newer newest",
    "wide wider widest"
]

bpe = BytePairEncoding(vocab_size=50)
vocab, merges = bpe.train(corpus)

print("Final vocabulary:", vocab)
print("\nTokenization of 'lowest':", bpe.tokenize("lowest"))

Embeddings and vector spaces

Analyzing embedding spaces

import numpy as np
import torch
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt

class EmbeddingAnalyzer:
    def __init__(self, embedding_dim=768):
        self.embedding_dim = embedding_dim

    def analyze_isotropy(self, embeddings):
        """
        Measure the isotropy of the embedding space.
        In an isotropic space, vector directions are spread uniformly,
        so the average pairwise cosine similarity is close to 0.
        """
        # Pairwise cosine similarity matrix
        embeddings_norm = embeddings / embeddings.norm(dim=1, keepdim=True)
        similarity_matrix = torch.mm(embeddings_norm, embeddings_norm.t())

        # Mean similarity, excluding the diagonal (self-similarity = 1)
        mask = ~torch.eye(len(embeddings), dtype=torch.bool)
        avg_similarity = similarity_matrix[mask].mean().item()

        # Variance of the similarities
        similarity_var = similarity_matrix[mask].var().item()

        return {
            'avg_similarity': avg_similarity,
            'similarity_variance': similarity_var,
            # Simple heuristic: closer to 1 = more isotropic
            'isotropy_score': 1 - abs(avg_similarity)
        }

    def compute_intrinsic_dimension(self, embeddings, n_samples=1000, k=20):
        """
        Estimate the intrinsic dimension of the embedding space with the
        maximum-likelihood estimator of Levina & Bickel, based on
        k-nearest-neighbor distances. Requires at least 3 points; the
        estimate is only meaningful on large samples.
        """
        # Random subsampling
        if len(embeddings) > n_samples:
            indices = torch.randperm(len(embeddings))[:n_samples]
            embeddings = embeddings[indices]

        # k cannot exceed the number of other points
        k = min(k, len(embeddings) - 1)
        if k < 2:
            raise ValueError("At least 3 points are required")

        # Pairwise Euclidean distances
        distances = torch.cdist(embeddings, embeddings)

        # Distances to the k nearest neighbors of each point
        nearest_distances, _ = torch.topk(distances, k + 1, largest=False, dim=1)
        nearest_distances = nearest_distances[:, 1:]  # Drop the point itself (distance 0)

        # Local MLE: m(x) = (k - 1) / sum_{j<k} log(T_k(x) / T_j(x))
        log_ratios = torch.log(nearest_distances[:, -1:] / nearest_distances[:, :-1])
        local_dims = (k - 1) / log_ratios.sum(dim=1)

        # Global estimate: average of the local estimates
        return local_dims.mean().item()

    def visualize_embedding_space(self, embeddings, labels=None):
        """Visualisation 2D de l'espace d'embedding via PCA"""
        # Réduction de dimension avec PCA
        pca = PCA(n_components=2)
        embeddings_2d = pca.fit_transform(embeddings.cpu().numpy())

        # Visualisation
        plt.figure(figsize=(12, 8))

        if labels is not None:
            unique_labels = list(set(labels))
            colors = plt.cm.rainbow(np.linspace(0, 1, len(unique_labels)))

            for label, color in zip(unique_labels, colors):
                mask = [l == label for l in labels]
                plt.scatter(embeddings_2d[mask, 0], embeddings_2d[mask, 1],
                           c=[color], label=label, alpha=0.6)
            plt.legend()
        else:
            plt.scatter(embeddings_2d[:, 0], embeddings_2d[:, 1], alpha=0.6)

        plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%} variance)')
        plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%} variance)')
        plt.title('Embedding Space Visualization (PCA)')
        plt.grid(True, alpha=0.3)
        plt.show()

        return pca.explained_variance_ratio_

# Example analysis
from transformers import AutoTokenizer, AutoModel

tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
model = AutoModel.from_pretrained('bert-base-uncased')

texts = [
    "The cat sits on the mat",
    "A feline rests on the rug",
    "The stock market crashed today",
    "Financial markets fell sharply"
]

# Compute the embeddings
inputs = tokenizer(texts, return_tensors='pt', padding=True)
with torch.no_grad():
    outputs = model(**inputs)
    embeddings = outputs.last_hidden_state[:, 0, :]  # [CLS] token

analyzer = EmbeddingAnalyzer()

# Isotropy analysis
isotropy = analyzer.analyze_isotropy(embeddings)
print(f"Isotropy score: {isotropy['isotropy_score']:.4f}")
print(f"Average similarity: {isotropy['avg_similarity']:.4f}")

# Intrinsic dimension (only 4 points here: illustrative, not a reliable estimate)
intrinsic_dim = analyzer.compute_intrinsic_dimension(embeddings)
print(f"Intrinsic dimension: {intrinsic_dim:.2f} (vs nominal 768)")

Generative models and parameters

Understanding modern architectures

Scaling Laws (Kaplan et al., 2020)
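Kaplan et al. (2020) report a combined fit in which the loss depends jointly on the number of non-embedding parameters N and the number of training tokens D:

```latex
L(N, D) = \left[ \left( \frac{N_c}{N} \right)^{\alpha_N / \alpha_D} + \frac{D_c}{D} \right]^{\alpha_D},
\qquad \alpha_N \approx 0.076,\; \alpha_D \approx 0.095,\; N_c \approx 8.8 \times 10^{13},\; D_c \approx 5.4 \times 10^{13}
```

As D → ∞ this reduces to L(N) = (N_c/N)^{α_N}, and as N → ∞ to L(D) = (D_c/D)^{α_D}. The two single-variable laws are therefore limits of the combined fit, not terms to be added together.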

import numpy as np
import matplotlib.pyplot as plt

class ScalingLawsAnalyzer:
    """
    Scaling-law analysis for LLMs
    Based on "Scaling Laws for Neural Language Models" (Kaplan et al., 2020)
    """

    def __init__(self):
        # Empirical constants (approximate values fitted by Kaplan et al.)
        self.alpha_N = 0.076  # Exponent for scaling with N (parameters)
        self.alpha_D = 0.095  # Exponent for scaling with D (data)
        self.alpha_C = 0.050  # Exponent for scaling with C (compute)

    def compute_loss(self, N, D, C=None):
        """
        Predict the loss (nats per token) as a function of:
        - N: number of non-embedding parameters (raw count)
        - D: dataset size (in tokens)
        - C: compute budget (in FLOPs, optional, unused here)

        Combined law from Kaplan et al. (2020):
        L(N, D) = [(N_c / N)^(α_N / α_D) + D_c / D]^α_D
        where N_c and D_c are fitted constants (specific to their
        tokenizer and dataset, so absolute values do not transfer)
        """
        N_c = 8.8e13  # Critical parameter count
        D_c = 5.4e13  # Critical token count

        loss_N = (N_c / N) ** (self.alpha_N / self.alpha_D)
        loss_D = D_c / D

        return (loss_N + loss_D) ** self.alpha_D

    def optimal_model_size(self, compute_budget):
        """
        Compute-optimal model size for a given compute budget
        After Hoffmann et al. (2022): "Training Compute-Optimal Large Language Models" (Chinchilla)

        Parameters and training tokens should be scaled in equal proportion:
        every doubling of model size calls for doubling the number of tokens,
        so both grow as C^0.5 (×√2 each per doubling of compute).
        """
        # Chinchilla rule of thumb:
        # C ≈ 6 * N * D (training FLOPs) and D_optimal ≈ 20 * N_optimal
        # → C ≈ 120 * N² → N_optimal ≈ (C / 120)^0.5

        N_optimal = (compute_budget / 120) ** 0.5
        D_optimal = 20 * N_optimal

        return {
            'parameters': N_optimal,
            'tokens': D_optimal,
            'flops_per_token': 6 * N_optimal
        }

    def plot_scaling_curves(self):
        """Visualise les courbes de scaling"""
        fig, axes = plt.subplots(1, 3, figsize=(18, 5))

        # 1. Loss vs Model Size
        N_range = np.logspace(6, 11, 100)  # 1M to 100B parameters
        D_fixed = 1e12  # 1T tokens

        losses = [self.compute_loss(N, D_fixed) for N in N_range]

        axes[0].loglog(N_range, losses)
        axes[0].set_xlabel('Model Size (parameters)')
        axes[0].set_ylabel('Loss')
        axes[0].set_title('Scaling with Model Size\n(fixed dataset: 1T tokens)')
        axes[0].grid(True, alpha=0.3)

        # Annotate well-known models
        models = {
            'GPT-2': 1.5e9,
            'GPT-3': 175e9,
            'LLaMA-2 70B': 70e9,
            'Mistral 7B': 7e9
        }
        for name, size in models.items():
            loss = self.compute_loss(size, D_fixed)
            axes[0].scatter([size], [loss], s=100, zorder=5)
            axes[0].annotate(name, (size, loss), xytext=(10, 10),
                           textcoords='offset points', fontsize=8)

        # 2. Loss vs Dataset Size
        D_range = np.logspace(9, 13, 100)  # 1B to 10T tokens
        N_fixed = 7e9  # 7B parameters

        losses = [self.compute_loss(N_fixed, D) for D in D_range]

        axes[1].loglog(D_range, losses)
        axes[1].set_xlabel('Dataset Size (tokens)')
        axes[1].set_ylabel('Loss')
        axes[1].set_title('Scaling with Data\n(fixed model: 7B params)')
        axes[1].grid(True, alpha=0.3)

        # 3. Optimal allocation (Chinchilla)
        compute_range = np.logspace(19, 25, 100)  # FLOPs

        optimal_N = []
        optimal_D = []

        for C in compute_range:
            opt = self.optimal_model_size(C)
            optimal_N.append(opt['parameters'])
            optimal_D.append(opt['tokens'])

        axes[2].loglog(compute_range, optimal_N, label='Optimal Model Size')
        axes[2].loglog(compute_range, optimal_D, label='Optimal Dataset Size')
        axes[2].set_xlabel('Compute Budget (FLOPs)')
        axes[2].set_ylabel('Optimal Size')
        axes[2].set_title('Compute-Optimal Training\n(Chinchilla scaling)')
        axes[2].legend()
        axes[2].grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

# Analysis
analyzer = ScalingLawsAnalyzer()
analyzer.plot_scaling_curves()

# Allocation for a specific project
compute_budget = 1e23  # FLOPs (for scale: LLaMA-2 70B ≈ 6 × 70e9 × 2e12 ≈ 8.4e23)
optimal = analyzer.optimal_model_size(compute_budget)

print(f"\nFor a budget of {compute_budget:.2e} FLOPs:")
print(f"  Optimal size: {optimal['parameters']/1e9:.1f}B parameters")
print(f"  Optimal tokens: {optimal['tokens']/1e12:.1f}T tokens")
print(f"  FLOPs/token: {optimal['flops_per_token']:.2e}")

Papers to study (Weeks 2-3)

  1. Kaplan et al. (2020) - “Scaling Laws for Neural Language Models”

    • arXiv:2001.08361
    • Contribution: power laws linking performance to model size, data and compute
  2. Hoffmann et al. (2022) - “Training Compute-Optimal Large Language Models” (Chinchilla)

    • arXiv:2203.15556
    • Contribution: revised scaling laws; model size and training data should be scaled together
  3. Sennrich et al. (2016) - “Neural Machine Translation of Rare Words with Subword Units”

    • arXiv:1508.07909
    • Contribution: BPE-based subword segmentation for open-vocabulary translation


Advanced techniques (Weeks 4-7)

Objective: Master state-of-the-art techniques to optimize and adapt LLMs

Retrieval-Augmented Generation (RAG)

Architecture and variants

Advanced RAG implementation with reranking

from sentence_transformers import SentenceTransformer, CrossEncoder
import faiss
import numpy as np

class AdvancedRAGSystem:
    def __init__(self,
                 embedding_model='sentence-transformers/all-MiniLM-L6-v2',
                 reranker_model='cross-encoder/ms-marco-MiniLM-L-6-v2',
                 llm_model='gpt-4'):
        """
        RAG avancé avec:
        1. Dense retrieval (embeddings)
        2. Reranking avec cross-encoder
        3. Génération avec LLM
        """
        self.embedding_model = SentenceTransformer(embedding_model)
        self.reranker = CrossEncoder(reranker_model)
        self.llm_model = llm_model

        self.index = None
        self.documents = []

    def build_index(self, documents, use_gpu=False):
        """Construit l'index FAISS pour la recherche vectorielle"""
        self.documents = documents

        # Génération des embeddings
        embeddings = self.embedding_model.encode(
            documents,
            show_progress_bar=True,
            convert_to_numpy=True
        )

        # Construction de l'index FAISS
        dimension = embeddings.shape[1]

        # Index IVF pour de grandes collections
        if len(documents) > 10000:
            nlist = int(np.sqrt(len(documents)))  # Nombre de clusters
            quantizer = faiss.IndexFlatL2(dimension)
            self.index = faiss.IndexIVFFlat(quantizer, dimension, nlist)

            # Entraînement de l'index
            self.index.train(embeddings)
            self.index.add(embeddings)
            self.index.nprobe = min(10, nlist)  # Recherche dans 10 clusters
        else:
            # Index plat pour petites collections
            self.index = faiss.IndexFlatL2(dimension)
            self.index.add(embeddings)

        if use_gpu and faiss.get_num_gpus() > 0:
            self.index = faiss.index_cpu_to_gpu(
                faiss.StandardGpuResources(), 0, self.index
            )

        print(f"Index built with {len(documents)} documents")

    def retrieve(self, query, top_k=20, rerank_top_n=5):
        """
        Récupération en deux étapes:
        1. Dense retrieval: top_k candidats
        2. Reranking: top_n résultats finaux
        """
        # Étape 1: Dense retrieval
        query_embedding = self.embedding_model.encode(
            [query],
            convert_to_numpy=True
        )

        distances, indices = self.index.search(query_embedding, top_k)

        candidates = [
            {
                'text': self.documents[idx],
                'score': float(dist),
                'rank': rank
            }
            for rank, (idx, dist) in enumerate(zip(indices[0], distances[0]))
        ]

        # Étape 2: Reranking avec cross-encoder
        pairs = [[query, cand['text']] for cand in candidates]
        rerank_scores = self.reranker.predict(pairs)

        # Fusion des scores (moyenne pondérée)
        for cand, rerank_score in zip(candidates, rerank_scores):
            cand['rerank_score'] = float(rerank_score)
            cand['final_score'] = 0.3 * (1 / (1 + cand['score'])) + 0.7 * rerank_score

        # Tri par score final
        candidates.sort(key=lambda x: x['final_score'], reverse=True)

        return candidates[:rerank_top_n]

    def generate(self, query, context_docs, max_tokens=500):
        """Génération de la réponse avec le LLM"""
        # Construction du context
        context = "\n\n".join([
            f"[Document {i+1}]\n{doc['text']}"
            for i, doc in enumerate(context_docs)
        ])

        # Prompt engineering
        prompt = f"""Tu es un assistant intelligent qui répond aux questions en te basant sur les documents fournis.

Documents de référence:
{context}

Question: {query}

Réponds à la question en citant les documents pertinents. Si l'information n'est pas dans les documents, indique-le clairement.

Réponse:"""

        # Appel au LLM (ici simplifié, utiliser OpenAI API ou autre)
        from openai import OpenAI
        client = OpenAI()

        response = client.chat.completions.create(
            model=self.llm_model,
            messages=[
                {"role": "system", "content": "Tu es un assistant précis qui base ses réponses sur les sources fournies."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=max_tokens,
            temperature=0.7
        )

        return {
            'answer': response.choices[0].message.content,
            'sources': context_docs,
            'metadata': {
                'tokens_used': response.usage.total_tokens,
                'model': self.llm_model
            }
        }

    def evaluate_retrieval(self, test_queries, ground_truth_docs):
        """
        Évaluation de la qualité de récupération
        Métriques: Recall@K, MRR, MAP
        """
        results = {
            'recall_at_5': [],
            'mrr': [],
            'map': []
        }

        for query, relevant_docs in zip(test_queries, ground_truth_docs):
            retrieved = self.retrieve(query, top_k=20, rerank_top_n=5)
            retrieved_texts = [doc['text'] for doc in retrieved]

            # Recall@5
            relevant_in_top5 = len(set(retrieved_texts[:5]) & set(relevant_docs))
            recall = relevant_in_top5 / len(relevant_docs)
            results['recall_at_5'].append(recall)

            # MRR (Mean Reciprocal Rank)
            for rank, doc in enumerate(retrieved_texts, 1):
                if doc in relevant_docs:
                    results['mrr'].append(1.0 / rank)
                    break
            else:
                results['mrr'].append(0.0)

            # MAP (Mean Average Precision): precision at each relevant hit,
            # averaged over all relevant documents (unretrieved ones count as 0)
            precisions = []
            num_relevant = 0
            for rank, doc in enumerate(retrieved_texts, 1):
                if doc in relevant_docs:
                    num_relevant += 1
                    precisions.append(num_relevant / rank)

            results['map'].append(sum(precisions) / len(relevant_docs))

        return {
            'recall@5': np.mean(results['recall_at_5']),
            'mrr': np.mean(results['mrr']),
            'map': np.mean(results['map'])
        }

# Usage example
documents = [
    "Transformers use the attention mechanism to capture long-range dependencies.",
    "BERT is an encoder-only model pre-trained with masked language modeling.",
    "GPT uses a decoder-only architecture and generates text autoregressively.",
    "Fine-tuning adapts a pre-trained model to a specific task.",
    "RAG combines retrieval and generation to improve the factuality of answers."
]

rag_system = AdvancedRAGSystem()
rag_system.build_index(documents)

query = "How does BERT work?"
retrieved_docs = rag_system.retrieve(query, top_k=10, rerank_top_n=3)

print("Retrieved documents:")
for i, doc in enumerate(retrieved_docs, 1):
    print(f"\n{i}. Score: {doc['final_score']:.4f}")
    print(f"   {doc['text']}")

# Generate the answer
result = rag_system.generate(query, retrieved_docs)
print(f"\nAnswer: {result['answer']}")

Papers to study (Week 4)

  1. Lewis et al. (2020) - “Retrieval-Augmented Generation for Knowledge-Intensive NLP Tasks”

  2. Guu et al. (2020) - “REALM: Retrieval-Augmented Language Model Pre-Training”

  3. Izacard & Grave (2021) - “Leveraging Passage Retrieval with Generative Models”

Fine-tuning and PEFT

Parameter-Efficient Fine-Tuning (PEFT)

LoRA implementation from scratch

import numpy as np
import torch
import torch.nn as nn

class LoRALayer(nn.Module):
    """
    Low-Rank Adaptation (LoRA) layer

    Instead of fine-tuning all the weights W, we learn a low-rank update:
    W' = W + (α / r) · BA
    where B ∈ R^(d×r) and A ∈ R^(r×k) with r << min(d, k)
    """

    def __init__(self, in_features, out_features, rank=8, alpha=16, dropout=0.1):
        super().__init__()

        self.in_features = in_features
        self.out_features = out_features
        self.rank = rank
        self.alpha = alpha

        # Original weights (frozen); random here for the demo, in practice
        # they are copied from the pretrained layer being adapted
        self.weight = nn.Parameter(torch.randn(out_features, in_features))
        self.weight.requires_grad = False

        # LoRA matrices (trainable); B = 0 so that W' = W at initialization
        self.lora_A = nn.Parameter(torch.randn(rank, in_features))
        self.lora_B = nn.Parameter(torch.zeros(out_features, rank))

        self.dropout = nn.Dropout(dropout)
        self.scaling = alpha / rank

        # Initialization of A (same scheme as nn.Linear)
        nn.init.kaiming_uniform_(self.lora_A, a=np.sqrt(5))

    def forward(self, x):
        # Standard forward pass through the frozen weights
        result = torch.matmul(x, self.weight.t())

        # Add the LoRA correction (dropout only on the input of the LoRA path)
        lora_correction = torch.matmul(
            torch.matmul(self.dropout(x), self.lora_A.t()),
            self.lora_B.t()
        ) * self.scaling

        return result + lora_correction

    def merge_weights(self):
        """Fusionne les poids LoRA avec les poids originaux"""
        merged_weight = self.weight + (self.lora_B @ self.lora_A) * self.scaling
        return merged_weight


class LoRAAttention(nn.Module):
    """
    Multi-Head Attention with LoRA applied to the Q, K, V and output (O) projections
    """

    def __init__(self, d_model, num_heads, lora_rank=8):
        super().__init__()
        assert d_model % num_heads == 0

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        # Projections with LoRA
        self.W_q = LoRALayer(d_model, d_model, rank=lora_rank)
        self.W_k = LoRALayer(d_model, d_model, rank=lora_rank)
        self.W_v = LoRALayer(d_model, d_model, rank=lora_rank)
        self.W_o = LoRALayer(d_model, d_model, rank=lora_rank)

    def forward(self, x):
        batch_size = x.size(0)

        # Projections with LoRA
        Q = self.W_q(x).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        K = self.W_k(x).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        V = self.W_v(x).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        # Attention
        scores = torch.matmul(Q, K.transpose(-2, -1)) / np.sqrt(self.d_k)
        attn_weights = torch.softmax(scores, dim=-1)
        attn_output = torch.matmul(attn_weights, V)

        # Concatenate heads
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)

        # Output projection
        return self.W_o(attn_output)


# Trainable-parameter savings with LoRA
def analyze_lora_efficiency(d_model=768, num_layers=12, rank=8):
    """
    Compare the number of trainable parameters: full fine-tuning vs LoRA
    """
    # Weights per standard Transformer layer (embeddings, biases and LayerNorm excluded)
    attn_params = 4 * d_model * d_model  # Q, K, V, O
    ff_params = 2 * d_model * (4 * d_model)  # Feed-forward (4x expansion)
    params_per_layer = attn_params + ff_params

    total_params_full_ft = num_layers * params_per_layer

    # With LoRA (rank r)
    # For each projection: r * d_model (A) + d_model * r (B) = 2 * r * d_model
    lora_params_per_proj = 2 * rank * d_model
    lora_params_per_layer = 4 * lora_params_per_proj  # Q, K, V, O
    total_params_lora = num_layers * lora_params_per_layer

    print(f"Configuration: d_model={d_model}, layers={num_layers}, rank={rank}")
    print(f"\nFull fine-tuning: {total_params_full_ft:,} parameters")
    print(f"LoRA: {total_params_lora:,} parameters")
    print(f"Reduction: {(1 - total_params_lora/total_params_full_ft)*100:.2f}%")
    print(f"Factor: {total_params_full_ft/total_params_lora:.1f}x fewer parameters")

    # GPU memory estimate for the gradients only (Adam's two optimizer
    # states would add about twice as much again per trainable parameter)
    bytes_per_param = 4  # float32
    gb_full_ft = (total_params_full_ft * bytes_per_param) / 1e9
    gb_lora = (total_params_lora * bytes_per_param) / 1e9

    print(f"\nGradient memory:")
    print(f"  Full fine-tuning: {gb_full_ft:.2f} GB")
    print(f"  LoRA: {gb_lora:.2f} GB")
    print(f"  Savings: {gb_full_ft - gb_lora:.2f} GB")

# Examples for different models
print("=== BERT-base ===")
analyze_lora_efficiency(d_model=768, num_layers=12, rank=8)

print("\n=== LLaMA-2 7B (approximation) ===")
analyze_lora_efficiency(d_model=4096, num_layers=32, rank=16)

print("\n=== LLaMA-2 70B (approximation) ===")
analyze_lora_efficiency(d_model=8192, num_layers=80, rank=64)
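`merge_weights` relies on the identity (W + s·BA)x = Wx + s·B(Ax), with s = α/r. This is why a merged LoRA model has no extra inference latency. The pure-Python check below uses small random matrices and no dropout, since `LoRALayer` only applies dropout during training; the helper names are illustrative. It also shows why initializing B to zero leaves the pretrained layer unchanged when fine-tuning starts.

```python
import random


def matvec(M, x):
    return [sum(m * v for m, v in zip(row, x)) for row in M]


def matmul(A, B):
    return [[sum(a * b for a, b in zip(row, col)) for col in zip(*B)] for row in A]


rng = random.Random(42)
d_out, d_in, r, alpha = 6, 5, 2, 4
s = alpha / r  # LoRA scaling factor

W = [[rng.gauss(0, 1) for _ in range(d_in)] for _ in range(d_out)]  # frozen weight
A = [[rng.gauss(0, 1) for _ in range(d_in)] for _ in range(r)]      # r x d_in
B = [[rng.gauss(0, 1) for _ in range(r)] for _ in range(d_out)]     # d_out x r
x = [rng.gauss(0, 1) for _ in range(d_in)]

# Unmerged forward: W x + s * B (A x); the LoRA path costs O(d_in * r + r * d_out)
unmerged = [w + s * b for w, b in zip(matvec(W, x), matvec(B, matvec(A, x)))]

# Merged weight: W' = W + s * B A, then a single matrix-vector product
BA = matmul(B, A)
W_merged = [[w + s * ba for w, ba in zip(rw, rba)] for rw, rba in zip(W, BA)]
merged = matvec(W_merged, x)

print(max(abs(u - m) for u, m in zip(unmerged, merged)) < 1e-9)  # True

# With B = 0 (LoRA's initialization), the LoRA correction vanishes
B_zero = [[0.0] * r for _ in range(d_out)]
print(matvec(B_zero, matvec(A, x)) == [0.0] * d_out)  # True
```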

Other PEFT methods

class PrefixTuning(nn.Module):
    """
    Prefix Tuning: Ajoute des "virtual tokens" apprenables au début de chaque layer
    """

    def __init__(self, prefix_length=20, d_model=768, num_layers=12):
        super().__init__()

        self.prefix_length = prefix_length
        self.d_model = d_model

        # Prefix embeddings for each layer
        self.prefix_embeddings = nn.Parameter(
            torch.randn(num_layers, prefix_length, d_model)
        )

    def forward(self, layer_idx):
        """Retourne les embeddings de préfixe pour une layer donnée"""
        return self.prefix_embeddings[layer_idx]


class AdapterLayer(nn.Module):
    """
    Adapter (Houlsby et al., 2019): inserts a small bottleneck layer after the attention and feed-forward sublayers
    """

    def __init__(self, d_model=768, bottleneck_dim=64):
        super().__init__()

        self.down_project = nn.Linear(d_model, bottleneck_dim)
        self.up_project = nn.Linear(bottleneck_dim, d_model)
        self.activation = nn.ReLU()

    def forward(self, x):
        # Projection down → activation → projection up
        h = self.activation(self.down_project(x))
        return self.up_project(h) + x  # Residual connection


class IA3Layer(nn.Module):
    """
    IA³ (Infused Adapter by Inhibiting and Amplifying Inner Activations)
    Learns multiplicative scaling vectors
    """

    def __init__(self, d_model=768, d_ff=3072):
        super().__init__()

        # Learnable scaling vectors, initialised to 1 (identity at the start of training)
        self.scale_k = nn.Parameter(torch.ones(d_model))
        self.scale_v = nn.Parameter(torch.ones(d_model))
        # The feed-forward vector rescales the intermediate activations (size d_ff)
        self.scale_ff = nn.Parameter(torch.ones(d_ff))

    def forward(self, x, position='k'):
        """
        Applies the scaling at the given position:
        - 'k': attention keys
        - 'v': attention values
        - 'ff': intermediate feed-forward activations (after the non-linearity)
        """
        if position == 'k':
            return x * self.scale_k
        if position == 'v':
            return x * self.scale_v
        if position == 'ff':
            return x * self.scale_ff
        raise ValueError(f"Unknown position: {position!r} (expected 'k', 'v' or 'ff')")


# Comparing PEFT methods
def compare_peft_methods(d_model=768, num_layers=12):
    """Compares the number of trainable parameters of several PEFT methods"""

    d_ff = 4 * d_model  # standard Transformer feed-forward width
    results = {}

    # LoRA on the 4 attention projections (Q, K, V, O): A (r x d) + B (d x r) each
    rank = 8
    lora_params = num_layers * 4 * 2 * rank * d_model
    results['LoRA (r=8)'] = lora_params

    # Prefix Tuning: one prefix per layer, as in the PrefixTuning class above
    # (separate key and value prefixes would double this count)
    prefix_length = 20
    prefix_params = num_layers * prefix_length * d_model
    results['Prefix Tuning (L=20)'] = prefix_params

    # Adapters: 2 per layer (after attention and after FF), biases ignored
    bottleneck = 64
    adapter_params = num_layers * 2 * (d_model * bottleneck + bottleneck * d_model)
    results['Adapters (d=64)'] = adapter_params

    # IA³: l_k and l_v (size d_model) + l_ff (size d_ff) per layer
    ia3_params = num_layers * (2 * d_model + d_ff)
    results['IA³'] = ia3_params

    # Full fine-tuning (reference): attention 4·d² + feed-forward 2·d·d_ff
    # (embeddings and biases ignored)
    full_ft_params = num_layers * (4 * d_model**2 + 2 * d_model * d_ff)
    results['Full FT'] = full_ft_params

    print(f"PEFT method comparison (d_model={d_model}, layers={num_layers})\n")
    print(f"{'Method':<25} {'Parameters':>15} {'% of full FT':>12} {'Memory (MB)':>15}")
    print("-" * 70)

    for method, params in sorted(results.items(), key=lambda x: x[1]):
        pct = (params / full_ft_params) * 100
        memory_mb = (params * 4) / 1e6  # float32
        print(f"{method:<25} {params:>15,} {pct:>11.3f}% {memory_mb:>14.1f}")

compare_peft_methods()
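The `PrefixTuning` class above only stores the prefix tensors. In the original method, the prefixes act as extra keys and values that every query of the frozen model can attend to. The NumPy sketch below shows this for a single attention head. It assumes separate learned key and value prefixes (`prefix_k` and `prefix_v` are hypothetical names). The output keeps the length of the real sequence; only the attention context grows.

```python
import numpy as np


def softmax(x, axis=-1):
    x = x - x.max(axis=axis, keepdims=True)   # numerical stability
    e = np.exp(x)
    return e / e.sum(axis=axis, keepdims=True)


def attention_with_prefix(q, k, v, prefix_k, prefix_v):
    """Single-head attention where learned prefixes are prepended to K and V.

    q, k, v: (seq_len, d); prefix_k, prefix_v: (prefix_len, d)
    """
    k_full = np.concatenate([prefix_k, k], axis=0)   # (prefix_len + seq_len, d)
    v_full = np.concatenate([prefix_v, v], axis=0)
    scores = q @ k_full.T / np.sqrt(q.shape[-1])     # (seq_len, prefix_len + seq_len)
    weights = softmax(scores, axis=-1)
    return weights @ v_full, weights                 # output: (seq_len, d)


rng = np.random.default_rng(0)
seq_len, prefix_len, d = 5, 3, 8
q, k, v = (rng.standard_normal((seq_len, d)) for _ in range(3))
prefix_k = rng.standard_normal((prefix_len, d))   # trainable in prefix tuning
prefix_v = rng.standard_normal((prefix_len, d))   # trainable in prefix tuning

out, weights = attention_with_prefix(q, k, v, prefix_k, prefix_v)
print(out.shape, weights.shape)   # (5, 8) (5, 8)
```

Only `prefix_k` and `prefix_v` would receive gradients. The projections producing `q`, `k` and `v` belong to the frozen model.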

Papers to study (Weeks 5-6)

  1. Hu et al. (2021) - “LoRA: Low-Rank Adaptation of Large Language Models”

  2. Li & Liang (2021) - “Prefix-Tuning: Optimizing Continuous Prompts for Generation”

  3. Houlsby et al. (2019) - “Parameter-Efficient Transfer Learning for NLP”

  4. Liu et al. (2022) - “Few-Shot Parameter-Efficient Fine-Tuning”
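One practical property of IA³, introduced in the last paper above, is easy to verify numerically. The learned vector only rescales the output of a linear projection, so after training it can be folded into the frozen weights and inference costs nothing extra. Below is a NumPy sketch, assuming a bias-free key projection `W_k` stored as (out, in) like `nn.Linear.weight`; the shapes are illustrative.

```python
import numpy as np

rng = np.random.default_rng(0)
d_model = 16
W_k = rng.standard_normal((d_model, d_model))      # frozen key projection (out x in)
l_k = 1.0 + 0.1 * rng.standard_normal(d_model)     # learned IA³ scaling vector
x = rng.standard_normal((4, d_model))              # 4 token representations

# During fine-tuning: scale the keys element-wise after the projection
keys_scaled = (x @ W_k.T) * l_k

# After training: fold the scaling into the weights, i.e. diag(l_k) @ W_k
W_k_merged = l_k[:, None] * W_k
keys_merged = x @ W_k_merged.T

print(np.allclose(keys_scaled, keys_merged))   # True
```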

Optimization and quantization

Quantization techniques

Post-training quantization implementation

import numpy as np
import torch
import torch.nn as nn

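class Quantizer:
    """
    Symmetric model quantization: FP32 → INT8 (one scale per tensor)
    """

    @staticmethod
    def quantize_tensor(tensor, num_bits=8):
        """
        Quantizes a tensor with symmetric quantization

        Q = clamp(round(R / scale), -q_max, q_max)
        where q_max = 2^(num_bits-1) - 1 and scale = max(|R|) / q_max
        """
        q_max = 2 ** (num_bits - 1) - 1

        # Scale (the clamp avoids a division by zero for an all-zero tensor)
        max_val = tensor.abs().max().clamp(min=1e-12)
        scale = max_val / q_max

        # Quantization, then clipping to the representable range
        quantized = torch.round(tensor / scale)
        quantized = torch.clamp(quantized, -q_max, q_max)

        # int8 only holds 8-bit values: use a wider integer type beyond that
        if num_bits <= 8:
            dtype = torch.int8
        elif num_bits <= 16:
            dtype = torch.int16
        else:
            dtype = torch.int32
        return quantized.to(dtype), scale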

    @staticmethod
    def dequantize_tensor(quantized_tensor, scale):
        """Dequantizes a tensor: R ≈ Q · scale"""
        return quantized_tensor.float() * scale

    @staticmethod
    def quantize_model(model, num_bits=8):
        """
        Quantizes every weight matrix of a model.
        Returns a state dict (quantized matrices, other parameters unchanged)
        and the scale of each quantized tensor
        """
        quantized_state_dict = {}
        scales = {}

        for name, param in model.named_parameters():
            if 'weight' in name and param.dim() > 1:  # Matrices only: biases and norm weights stay in FP32
                quant_param, scale = Quantizer.quantize_tensor(param.data, num_bits)
                quantized_state_dict[name] = quant_param
                scales[name] = scale
            else:
                quantized_state_dict[name] = param.data

        return quantized_state_dict, scales


class QuantizedLinear(nn.Module):
    """
    INT8-quantized Linear layer (integer arithmetic simulated in float32, no bias)
    """

    def __init__(self, in_features, out_features):
        super().__init__()

        self.in_features = in_features
        self.out_features = out_features

        # Quantized weights (INT8), set by quantize_weights()
        self.weight_quantized = None
        self.weight_scale = None

        # Activation scale (computed dynamically or calibrated statically)
        self.input_scale = None

    def quantize_weights(self, weight):
        """Quantizes the layer weights, shape (out_features, in_features)"""
        self.weight_quantized, self.weight_scale = Quantizer.quantize_tensor(weight)

    def forward(self, x):
        """
        INT8 forward pass

        Y ≈ (X_int8 · W_int8^T) · (scale_x · scale_w)
        """
        if self.weight_quantized is None:
            raise RuntimeError("Call quantize_weights() before forward()")

        # Dynamic quantization of the input
        x_quantized, x_scale = Quantizer.quantize_tensor(x)

        # INT8 matmul, simulated in float32 (real kernels accumulate in INT32)
        output_quantized = torch.matmul(
            x_quantized.float(),
            self.weight_quantized.float().t()
        )

        # Rescale back to real values
        output_scale = x_scale * self.weight_scale
        output = output_quantized * output_scale

        return output


# Impact of quantization on the reconstruction error
def analyze_quantization_error(tensor, num_bits_list=(8, 4, 2)):
    """
    Measures the quantization error for several bit widths
    """
    import matplotlib.pyplot as plt

    results = []

    for num_bits in num_bits_list:
        # Quantize, then dequantize
        quant_tensor, scale = Quantizer.quantize_tensor(tensor, num_bits)
        dequant_tensor = Quantizer.dequantize_tensor(quant_tensor, scale)

        # Error metrics
        mse = ((tensor - dequant_tensor) ** 2).mean().item()
        mae = (tensor - dequant_tensor).abs().mean().item()
        signal_power = (tensor ** 2).mean().item()
        snr = 10 * np.log10(signal_power / mse)  # signal-to-noise ratio in dB

        results.append({
            'bits': num_bits,
            'mse': mse,
            'mae': mae,
            'snr_db': snr
        })

        print(f"\n{num_bits}-bit quantization:")
        print(f"  MSE: {mse:.6f}")
        print(f"  MAE: {mae:.6f}")
        print(f"  SNR: {snr:.2f} dB")

    # Visualization
    fig, axes = plt.subplots(1, 3, figsize=(15, 4))

    bits = [r['bits'] for r in results]

    axes[0].plot(bits, [r['mse'] for r in results], 'o-')
    axes[0].set_xlabel('Bits')
    axes[0].set_ylabel('MSE')
    axes[0].set_title('Mean Squared Error')
    axes[0].grid(True)

    axes[1].plot(bits, [r['mae'] for r in results], 'o-')
    axes[1].set_xlabel('Bits')
    axes[1].set_ylabel('MAE')
    axes[1].set_title('Mean Absolute Error')
    axes[1].grid(True)

    axes[2].plot(bits, [r['snr_db'] for r in results], 'o-')
    axes[2].set_xlabel('Bits')
    axes[2].set_ylabel('SNR (dB)')
    axes[2].set_title('Signal-to-Noise Ratio')
    axes[2].grid(True)

    plt.tight_layout()
    plt.show()

# Test on weights with the default nn.Linear initialization
model = nn.Linear(768, 768)
weight = model.weight.data

analyze_quantization_error(weight, num_bits_list=[16, 8, 4, 2])

# Memory savings
def compute_memory_savings(model_size_params, bits_original=32, bits_quantized=8):
    """
    Computes the memory saved by quantizing the weights
    """
    memory_original = (model_size_params * bits_original) / 8 / 1e9  # GB
    memory_quantized = (model_size_params * bits_quantized) / 8 / 1e9  # GB

    savings = memory_original - memory_quantized
    savings_pct = (savings / memory_original) * 100

    print(f"\nModel: {model_size_params/1e9:.1f}B parameters")
    print(f"{bits_original}-bit memory: {memory_original:.2f} GB")
    print(f"{bits_quantized}-bit memory: {memory_quantized:.2f} GB")
    print(f"Savings: {savings:.2f} GB ({savings_pct:.1f}%)")

compute_memory_savings(7e9, bits_original=32, bits_quantized=8)  # LLaMA-2 7B
compute_memory_savings(70e9, bits_original=32, bits_quantized=8)  # LLaMA-2 70B
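The same arithmetic extends to other storage formats. The sketch below counts only the weights, not activations, the KV cache or the stored quantization scales, so real checkpoints are slightly larger. The list of formats is illustrative.

```python
def weight_memory_gb(num_params: float, bits: int) -> float:
    """Memory taken by the weights alone, in GB (1 GB = 1e9 bytes)."""
    return num_params * bits / 8 / 1e9


formats = {"FP32": 32, "FP16/BF16": 16, "INT8": 8, "INT4": 4}

for name, n_params in [("7B", 7e9), ("70B", 70e9)]:
    row = ", ".join(f"{fmt}: {weight_memory_gb(n_params, b):.1f} GB" for fmt, b in formats.items())
    print(f"{name} -> {row}")
# 7B -> FP32: 28.0 GB, FP16/BF16: 14.0 GB, INT8: 7.0 GB, INT4: 3.5 GB
# 70B -> FP32: 280.0 GB, FP16/BF16: 140.0 GB, INT8: 70.0 GB, INT4: 35.0 GB
```

LLMs are often served in FP16/BF16, so that column is frequently the more relevant baseline. Against it, INT8 halves the weight memory instead of dividing it by four.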

Papers to study (Week 7)

  1. Dettmers et al. (2022) - “LLM.int8(): 8-bit Matrix Multiplication for Transformers at Scale”

  2. Xiao et al. (2023) - “SmoothQuant: Accurate and Efficient Post-Training Quantization for Large Language Models”

  3. Frantar et al. (2023) - “GPTQ: Accurate Post-Training Quantization for GPT”
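The `Quantizer` above uses one scale per tensor, so a few large-magnitude rows inflate the step size for every other row. The papers above refine this in different ways; LLM.int8(), for instance, relies on vector-wise scaling combined with outlier handling. The simplest refinement, per-channel (per-row) scaling, is sketched below in NumPy under the same symmetric scheme. The matrix is synthetic, with a few rows scaled up to make the effect visible.

```python
import numpy as np


def quantize_symmetric(w, num_bits=8, axis=None):
    """Symmetric quantize/dequantize round trip.

    axis=None -> one scale for the whole tensor (per-tensor)
    axis=1    -> one scale per row (per-channel)
    """
    q_max = 2 ** (num_bits - 1) - 1
    max_abs = np.abs(w).max(axis=axis, keepdims=axis is not None)
    scale = np.maximum(max_abs, 1e-12) / q_max
    q = np.clip(np.round(w / scale), -q_max, q_max)
    return q * scale                                   # dequantized values


rng = np.random.default_rng(0)
w = rng.standard_normal((64, 128))
w[:8] *= 50.0                                          # a few rows with large magnitude

mse_tensor = np.mean((w - quantize_symmetric(w, axis=None)) ** 2)
mse_channel = np.mean((w - quantize_symmetric(w, axis=1)) ** 2)
print(f"per-tensor MSE:  {mse_tensor:.2e}")
print(f"per-channel MSE: {mse_channel:.2e}")
```

Per-channel scales cost one extra float per row. This is negligible next to the matrix itself, which is why weight quantization commonly uses them.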


Agents and orchestration (Weeks 8-10)

Objective: Master multi-agent systems and complex orchestration

Weeks 8-9: Autonomous agents

Implementing an agent with ReAct

import json
import re
from typing import Any, Dict, List

class ReActAgent:
    """
    Agent based on ReAct (Reasoning + Acting)

    Cycle: Thought → Action → Observation → ... → Final Answer
    """

    def __init__(self, llm, tools, max_iterations=10):
        self.llm = llm
        self.tools = {tool.name: tool for tool in tools}
        self.max_iterations = max_iterations

    def parse_action(self, text):
        """
        Parses the LLM output to extract the action

        Expected format:
        Action: tool_name
        Action Input: {"param": "value"}
        """
        action_match = re.search(r'Action:\s*(.+?)(?:\n|$)', text)
        action_input_match = re.search(r'Action Input:\s*(.+?)(?:\n|$)', text)

        if action_match and action_input_match:
            action = action_match.group(1).strip()
            action_input = action_input_match.group(1).strip()

            # Decode JSON when possible, otherwise keep the raw string
            try:
                action_input = json.loads(action_input)
            except json.JSONDecodeError:
                pass

            return {'action': action, 'action_input': action_input}

        return None

    def run(self, query: str) -> Dict[str, Any]:
        """
        Runs the agent on a query
        """
        # History of the steps
        trajectory = []

        # Initial ReAct prompt
        prompt = self._build_initial_prompt(query)

        for iteration in range(self.max_iterations):
            # LLM step: reasoning and, possibly, an action
            response = self.llm(prompt)
            trajectory.append({'type': 'thought', 'content': response})

            # Final answer reached?
            if 'Final Answer:' in response:
                final_answer = response.split('Final Answer:')[1].strip()
                return {
                    'answer': final_answer,
                    'trajectory': trajectory,
                    'iterations': iteration + 1
                }

            # Observations must come from the tools: drop any the LLM invented
            response = response.split('Observation:')[0].rstrip()
            # Keep the LLM's own reasoning and action in the context of the next call
            prompt += f" {response}"

            # Parse the action
            parsed_action = self.parse_action(response)

            if parsed_action is None:
                prompt += "\n\nInvalid action format. Please use:\nAction: tool_name\nAction Input: input\n\nThought:"
                continue

            # Execute the action
            tool_name = parsed_action['action']

            if tool_name not in self.tools:
                observation = f"Error: Tool '{tool_name}' not found. Available tools: {list(self.tools.keys())}"
            else:
                try:
                    tool = self.tools[tool_name]
                    observation = tool.run(parsed_action['action_input'])
                except Exception as e:
                    observation = f"Error executing tool: {str(e)}"

            trajectory.append({
                'type': 'action',
                'tool': tool_name,
                'input': parsed_action['action_input'],
                'observation': observation
            })

            # Append the observation to the prompt, then ask for the next thought
            prompt += f"\nObservation: {observation}\nThought:"

        return {
            'answer': "Max iterations reached without finding answer",
            'trajectory': trajectory,
            'iterations': self.max_iterations
        }

    def _build_initial_prompt(self, query):
        """Builds the initial ReAct prompt"""
        tools_desc = "\n".join([
            f"- {name}: {tool.description}"
            for name, tool in self.tools.items()
        ])

        return f"""Answer the following question as best you can. You have access to the following tools:

{tools_desc}

Use the following format:

Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{', '.join(self.tools.keys())}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

Begin!

Question: {query}
Thought:"""


# Example tools
class WikipediaTool:
    name = "wikipedia"
    description = "Search Wikipedia for information about a topic"

    def run(self, query):
        # Simulated output (plug in the real Wikipedia API here)
        return f"Wikipedia search results for '{query}': [Simulated content about {query}]"


class CalculatorTool:
    name = "calculator"
    description = "Perform mathematical calculations"

    def run(self, expression):
        try:
            # WARNING: eval() runs arbitrary Python code produced by the LLM.
            # Emptying __builtins__ reduces the risk but does not remove it:
            # use a dedicated arithmetic parser in production!
            result = eval(str(expression), {"__builtins__": {}}, {})
            return f"Result: {result}"
        except Exception as e:
            return f"Error: {str(e)}"


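# Testing the agent
tools = [WikipediaTool(), CalculatorTool()]

def dummy_llm(prompt):
    """Simulated LLM for demonstration (in production, call a real model: OpenAI API or other)"""
    # Always returns the same action, so this demo stops at max_iterations
    # with "Max iterations reached without finding answer"
    return """Thought: I need to search for information about Paris
Action: wikipedia
Action Input: Paris France"""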

agent = ReActAgent(llm=dummy_llm, tools=tools)
result = agent.run("What is the population of Paris?")

print("Answer:", result['answer'])
print(f"\nTrajectory ({result['iterations']} iterations):")
for step in result['trajectory']:
    print(f"\n{step['type'].upper()}:")
    if step['type'] == 'thought':
        print(f"  {step['content'][:200]}...")
    else:
        print(f"  Tool: {step['tool']}")
        print(f"  Input: {step['input']}")
        print(f"  Observation: {step['observation'][:200]}...")
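The `CalculatorTool` above relies on `eval`, which executes whatever Python code the LLM emits. The sketch below is a safer replacement, assuming the tool only needs arithmetic. The expression is parsed with the standard `ast` module, and only whitelisted numeric constants and operators are evaluated. `safe_eval` and `SafeCalculatorTool` are hypothetical names. The tool keeps the same `name` / `description` / `run` interface as the other tools, so it can be passed to `ReActAgent`.

```python
import ast
import operator

# Whitelisted arithmetic operators: any other node type is rejected
_BIN_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod,
    ast.Pow: operator.pow,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_eval(expression: str):
    """Evaluates a purely arithmetic expression without calling eval()."""

    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if (isinstance(node, ast.Constant) and isinstance(node.value, (int, float))
                and not isinstance(node.value, bool)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
            return _BIN_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        raise ValueError(f"Unsupported expression: {ast.dump(node)}")

    return _eval(ast.parse(expression, mode="eval"))


class SafeCalculatorTool:
    name = "calculator"
    description = "Perform mathematical calculations"

    def run(self, expression):
        try:
            return f"Result: {safe_eval(str(expression))}"
        except (ValueError, SyntaxError, ArithmeticError) as e:
            return f"Error: {e}"


print(SafeCalculatorTool().run("2 + 3 * 4"))          # Result: 14
print(SafeCalculatorTool().run("__import__('os')"))   # Error: Unsupported expression: ...
```

Function calls, attribute access and names are all rejected because they are not in the whitelist. Very large exponents can still be slow to compute. Capping the operands of `ast.Pow` is left out of this sketch.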

LangGraph and orchestration

Implementing a multi-agent system

import operator
from typing import Annotated, Any, Dict, List, TypedDict

from langgraph.graph import END, StateGraph

class AgentState(TypedDict):
    """State shared by the agents"""
    # The operator.add reducer appends the messages returned by each node
    messages: Annotated[List[Dict], operator.add]
    next_agent: str
    final_answer: str

class MultiAgentSystem:
    """
    Multi-agent system orchestrated with LangGraph

    Agents:
    1. Planner: breaks the task down into subtasks
    2. Researcher: gathers information
    3. Analyst: analyzes the data
    4. Writer: drafts the final answer
    """

    def __init__(self, llm):
        self.llm = llm
        self.graph = self._build_graph()

    def planner_agent(self, state: AgentState) -> dict:
        """Planning agent"""
        query = state['messages'][0]['content']

        prompt = f"""You are a planning agent. Break down this query into subtasks:

Query: {query}

Provide a step-by-step plan."""

        response = self.llm(prompt)

        # Return only the fields to update: LangGraph appends 'messages'
        # through the operator.add reducer and overwrites the other keys
        return {
            'messages': [{'agent': 'planner', 'content': response}],
            'next_agent': 'researcher'
        }

    def researcher_agent(self, state: AgentState) -> dict:
        """Research agent"""
        plan = state['messages'][-1]['content']

        prompt = f"""You are a research agent. Based on this plan, gather information:

Plan: {plan}

Provide research findings."""

        response = self.llm(prompt)

        return {
            'messages': [{'agent': 'researcher', 'content': response}],
            'next_agent': 'analyst'
        }

    def analyst_agent(self, state: AgentState) -> dict:
        """Analyst agent"""
        research = state['messages'][-1]['content']

        prompt = f"""You are an analyst. Analyze this research:

Research: {research}

Provide key insights."""

        response = self.llm(prompt)

        return {
            'messages': [{'agent': 'analyst', 'content': response}],
            'next_agent': 'writer'
        }

    def writer_agent(self, state: AgentState) -> dict:
        """Writer agent"""
        analysis = state['messages'][-1]['content']
        original_query = state['messages'][0]['content']

        prompt = f"""You are a writer. Based on this analysis, write a final answer to the query:

Query: {original_query}
Analysis: {analysis}

Write a comprehensive answer."""

        response = self.llm(prompt)

        return {
            'messages': [{'agent': 'writer', 'content': response}],
            'final_answer': response,
            'next_agent': 'END'
        }

    def router(self, state: AgentState) -> str:
        """Routes to the next agent"""
        return state.get('next_agent', 'END')

    def _build_graph(self):
        """Builds the orchestration graph"""
        workflow = StateGraph(AgentState)

        # Add the nodes
        workflow.add_node("planner", self.planner_agent)
        workflow.add_node("researcher", self.researcher_agent)
        workflow.add_node("analyst", self.analyst_agent)
        workflow.add_node("writer", self.writer_agent)

        # Define the transitions
        workflow.set_entry_point("planner")

        workflow.add_conditional_edges(
            "planner",
            self.router,
            {
                "researcher": "researcher",
                "END": END
            }
        )

        workflow.add_conditional_edges(
            "researcher",
            self.router,
            {
                "analyst": "analyst",
                "END": END
            }
        )

        workflow.add_conditional_edges(
            "analyst",
            self.router,
            {
                "writer": "writer",
                "END": END
            }
        )

        workflow.add_conditional_edges(
            "writer",
            self.router,
            {
                "END": END
            }
        )

        return workflow.compile()

    def run(self, query: str) -> Dict[str, Any]:
        """Runs the multi-agent system"""
        initial_state = {
            'messages': [{'agent': 'user', 'content': query}],
            'next_agent': 'planner',
            'final_answer': ''
        }

        final_state = self.graph.invoke(initial_state)

        return {
            'answer': final_state['final_answer'],
            'trajectory': final_state['messages']
        }
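Nodes should return only the fields they change. With the `Annotated[..., operator.add]` annotation, LangGraph combines the value a node returns with the current value using `operator.add`, while fields without a reducer are simply overwritten. The dependency-free sketch below imitates that merge rule. It is a simplified model, not LangGraph's implementation, and `apply_update` and `REDUCERS` are hypothetical names. It shows what goes wrong when a node returns the whole message list it has already extended.

```python
import operator

# Simplified model of the state channels: field -> reducer (None = overwrite)
REDUCERS = {"messages": operator.add, "next_agent": None, "final_answer": None}


def apply_update(state: dict, update: dict) -> dict:
    """Merges a node's return value into the state, field by field."""
    new_state = dict(state)
    for key, value in update.items():
        reducer = REDUCERS.get(key)
        new_state[key] = reducer(state[key], value) if reducer else value
    return new_state


state = {"messages": [{"agent": "user", "content": "query"}],
         "next_agent": "planner", "final_answer": ""}

# Correct: the node returns only the new message
good = apply_update(state, {"messages": [{"agent": "planner", "content": "plan"}],
                            "next_agent": "researcher"})
print(len(good["messages"]))   # 2

# Incorrect: the node returns the full history it has already appended to
full_history = state["messages"] + [{"agent": "planner", "content": "plan"}]
bad = apply_update(state, {"messages": full_history, "next_agent": "researcher"})
print(len(bad["messages"]))    # 3 (the user message is duplicated)
```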

Papers to study (Weeks 8-10)

  1. Yao et al. (2022) - “ReAct: Synergizing Reasoning and Acting in Language Models”

  2. Wang et al. (2023) - “Plan-and-Solve Prompting”

  3. Wu et al. (2023) - “AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation”


Evaluation and research (Weeks 11-14)

Objective: Master evaluation methodologies and contribute to research

Benchmarking and metrics

Implementing a complete evaluation suite

import numpy as np
from typing import Any, Dict, List
import sacrebleu
from rouge_score import rouge_scorer

class LLMEvaluator:
    """
    Complete evaluation suite for LLMs
    """

    def __init__(self):
        self.rouge_scorer = rouge_scorer.RougeScorer(
            ['rouge1', 'rouge2', 'rougeL'],
            use_stemmer=True
        )

    def evaluate_generation(self,
                           predictions: List[str],
                           references: List[str],
                           metrics: List[str] = None) -> Dict[str, float]:
        """
        Evaluates generation quality with reference-based metrics
        ('bleu', 'rouge', 'bertscore'). Perplexity needs the model itself:
        use compute_perplexity.
        """
        if metrics is None:
            metrics = ['bleu', 'rouge']

        results = {}

        if 'bleu' in metrics:
            results.update(self.compute_bleu(predictions, references))

        if 'rouge' in metrics:
            results.update(self.compute_rouge(predictions, references))

        if 'bertscore' in metrics:
            results.update(self.compute_bertscore(predictions, references))

        return results

    def compute_bleu(self, predictions, references):
        """Computes the corpus-level BLEU score (sacrebleu, 0-100 scale)"""
        bleu = sacrebleu.corpus_bleu(predictions, [references])

        return {
            'bleu': bleu.score,
            'bleu_precisions': bleu.precisions
        }

    def compute_rouge(self, predictions, references):
        """Computes ROUGE scores averaged over the examples"""
        scores = {
            'rouge1_f': [],
            'rouge1_p': [],
            'rouge1_r': [],
            'rouge2_f': [],
            'rougeL_f': []
        }

        for pred, ref in zip(predictions, references):
            rouge_scores = self.rouge_scorer.score(ref, pred)

            scores['rouge1_f'].append(rouge_scores['rouge1'].fmeasure)
            scores['rouge1_p'].append(rouge_scores['rouge1'].precision)
            scores['rouge1_r'].append(rouge_scores['rouge1'].recall)
            scores['rouge2_f'].append(rouge_scores['rouge2'].fmeasure)
            scores['rougeL_f'].append(rouge_scores['rougeL'].fmeasure)

        return {k: np.mean(v) for k, v in scores.items()}

    def compute_bertscore(self, predictions, references):
        """Computes BERTScore (requires the bert-score package)"""
        from bert_score import score

        P, R, F1 = score(predictions, references, lang='en', verbose=False)

        return {
            'bertscore_precision': P.mean().item(),
            'bertscore_recall': R.mean().item(),
            'bertscore_f1': F1.mean().item()
        }

    def evaluate_classification(self,
                                predictions: List[int],
                                labels: List[int],
                                num_classes: int) -> Dict[str, Any]:
        """
        Evaluates a classification task
        """
        from sklearn.metrics import (
            accuracy_score,
            precision_recall_fscore_support,
            confusion_matrix,
            classification_report
        )

        accuracy = accuracy_score(labels, predictions)
        precision, recall, f1, _ = precision_recall_fscore_support(
            labels, predictions, average='macro', zero_division=0
        )

        # Fixed label set: the matrix stays num_classes x num_classes
        # even if a class is absent from this sample
        cm = confusion_matrix(labels, predictions, labels=list(range(num_classes)))

        return {
            'accuracy': accuracy,
            'precision_macro': precision,
            'recall_macro': recall,
            'f1_macro': f1,
            'confusion_matrix': cm.tolist(),
            'classification_report': classification_report(labels, predictions)
        }

    def compute_perplexity(self, model, tokenizer, texts: List[str]) -> float:
        """
        Computes the perplexity over a set of texts (causal language model)

        PPL = exp(average negative log-likelihood per predicted token)
        """
        import torch

        total_loss = 0.0
        total_tokens = 0

        model.eval()
        with torch.no_grad():
            for text in texts:
                inputs = tokenizer(text, return_tensors='pt')
                outputs = model(**inputs, labels=inputs['input_ids'])

                # With labels=input_ids, Hugging Face causal LMs shift the labels
                # internally: the loss is a mean over seq_len - 1 predicted tokens
                num_predicted = inputs['input_ids'].size(1) - 1
                if num_predicted < 1:
                    continue

                total_loss += outputs.loss.item() * num_predicted
                total_tokens += num_predicted

        avg_loss = total_loss / total_tokens
        perplexity = np.exp(avg_loss)

        return perplexity

    def evaluate_retrieval(self,
                          retrieved_docs: List[List[str]],
                          relevant_docs: List[List[str]],
                          k_values: List[int] = [1, 5, 10]) -> Dict[str, float]:
        """
        Evaluates retrieval quality (for RAG)
        """
        metrics = {}

        # Recall@K: fraction of the relevant documents found in the top K
        for k in k_values:
            recalls = []
            for retrieved, relevant in zip(retrieved_docs, relevant_docs):
                retrieved_k = set(retrieved[:k])
                relevant_set = set(relevant)
                if not relevant_set:
                    continue  # recall is undefined for a query without relevant documents

                recall = len(retrieved_k & relevant_set) / len(relevant_set)
                recalls.append(recall)

            metrics[f'recall@{k}'] = np.mean(recalls)

        # MRR (Mean Reciprocal Rank): 1 / rank of the first relevant document
        mrrs = []
        for retrieved, relevant in zip(retrieved_docs, relevant_docs):
            relevant_set = set(relevant)

            for rank, doc in enumerate(retrieved, 1):
                if doc in relevant_set:
                    mrrs.append(1.0 / rank)
                    break
            else:
                mrrs.append(0.0)

        metrics['mrr'] = np.mean(mrrs)

        # MAP (Mean Average Precision): for each query, the precisions at the
        # relevant hits are summed and divided by the TOTAL number of relevant
        # documents, so relevant documents that are never retrieved count as 0
        maps = []
        for retrieved, relevant in zip(retrieved_docs, relevant_docs):
            relevant_set = set(relevant)

            precisions = []
            num_relevant = 0

            for rank, doc in enumerate(retrieved, 1):
                if doc in relevant_set:
                    num_relevant += 1
                    precisions.append(num_relevant / rank)

            maps.append(sum(precisions) / len(relevant_set) if relevant_set else 0.0)

        metrics['map'] = np.mean(maps)

        return metrics


# Full usage example
evaluator = LLMEvaluator()

# 1. Generation evaluation
predictions = [
    "The cat is sitting on the mat",
    "Machine learning is a subset of AI"
]
references = [
    "A cat is on the mat",
    "Machine learning is part of artificial intelligence"
]

gen_metrics = evaluator.evaluate_generation(predictions, references)
print("Generation metrics:", gen_metrics)

# 2. Classification evaluation
pred_labels = [0, 1, 1, 0, 1]
true_labels = [0, 1, 0, 0, 1]

class_metrics = evaluator.evaluate_classification(pred_labels, true_labels, num_classes=2)
print("\nClassification metrics:", class_metrics)

# 3. Retrieval evaluation
retrieved = [
    ["doc1", "doc2", "doc3"],
    ["doc4", "doc5", "doc6"]
]
relevant = [
    ["doc1", "doc4"],
    ["doc5", "doc7"]
]

retrieval_metrics = evaluator.evaluate_retrieval(retrieved, relevant)
print("\nRetrieval metrics:", retrieval_metrics)
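To sanity-check the retrieval metrics, it helps to compute them by hand on the example above. Standard average precision divides the sum of the precisions at each relevant hit by the total number of relevant documents, so relevant documents that are never retrieved count as zero. MRR only looks at the first hit. Below is a self-contained sketch; the function names are illustrative.

```python
from typing import Sequence


def average_precision(retrieved: Sequence[str], relevant: Sequence[str]) -> float:
    relevant_set = set(relevant)
    if not relevant_set:
        return 0.0
    hits, precision_sum = 0, 0.0
    for rank, doc in enumerate(retrieved, start=1):
        if doc in relevant_set:
            hits += 1
            precision_sum += hits / rank   # precision at this relevant hit
    return precision_sum / len(relevant_set)


def reciprocal_rank(retrieved: Sequence[str], relevant: Sequence[str]) -> float:
    relevant_set = set(relevant)
    for rank, doc in enumerate(retrieved, start=1):
        if doc in relevant_set:
            return 1.0 / rank
    return 0.0


retrieved = [["doc1", "doc2", "doc3"], ["doc4", "doc5", "doc6"]]
relevant = [["doc1", "doc4"], ["doc5", "doc7"]]

aps = [average_precision(r, g) for r, g in zip(retrieved, relevant)]
rrs = [reciprocal_rank(r, g) for r, g in zip(retrieved, relevant)]
print(aps, sum(aps) / len(aps))   # [0.5, 0.25] 0.375
print(rrs, sum(rrs) / len(rrs))   # [1.0, 0.5] 0.75
```

Query 1 retrieves only one of its two relevant documents, so its AP is 0.5 rather than 1.0. Averaging the precisions over the retrieved hits only would report a MAP of 0.75 on this example instead of 0.375.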

Experimentation and reproduction

A reproducible experimentation framework

import hashlib
import json
import random
from datetime import datetime
from pathlib import Path
from typing import Any, Dict

import numpy as np
import torch

class ExperimentTracker:
    """
    Experiment tracking system for reproducible research
    """

    def __init__(self, experiment_name: str, base_dir: str = "./experiments"):
        self.experiment_name = experiment_name
        self.base_dir = Path(base_dir)
        self.experiment_dir = self.base_dir / experiment_name / datetime.now().strftime("%Y%m%d_%H%M%S")
        self.experiment_dir.mkdir(parents=True, exist_ok=True)

        self.config = {}
        self.metrics = {}
        self.artifacts = {}

    def set_seed(self, seed: int):
        """Sets every random seed for reproducibility"""
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)

        # Deterministic cuDNN kernels (may reduce performance)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

        self.config['seed'] = seed

    def log_config(self, config: Dict[str, Any]):
        """Records the experiment configuration"""
        self.config.update(config)

        # Compute config hash for uniqueness
        config_str = json.dumps(config, sort_keys=True)
        config_hash = hashlib.md5(config_str.encode()).hexdigest()[:8]
        self.config['config_hash'] = config_hash

        # Save config
        with open(self.experiment_dir / "config.json", 'w') as f:
            json.dump(self.config, f, indent=2)

    def log_metrics(self, metrics: Dict[str, float], step: int = None):
        """Records metrics (per step if step is given, otherwise at the top level)"""
        if step is not None:
            if step not in self.metrics:
                self.metrics[step] = {}
            self.metrics[step].update(metrics)
        else:
            self.metrics.update(metrics)

        # Save metrics
        with open(self.experiment_dir / "metrics.json", 'w') as f:
            json.dump(self.metrics, f, indent=2)

    def log_artifact(self, name: str, artifact: Any):
        """Records an artifact (model, plot data, etc.)"""
        artifact_path = self.experiment_dir / name

        if isinstance(artifact, torch.nn.Module):
            artifact_path = artifact_path.with_suffix('.pt')
            torch.save(artifact.state_dict(), artifact_path)
        elif isinstance(artifact, dict):
            artifact_path = artifact_path.with_suffix('.json')
            with open(artifact_path, 'w') as f:
                json.dump(artifact, f, indent=2)
        else:
            # Generic fallback: pickle
            import pickle
            artifact_path = artifact_path.with_suffix('.pkl')
            with open(artifact_path, 'wb') as f:
                pickle.dump(artifact, f)

        # Record the path that was actually written (with its extension)
        self.artifacts[name] = str(artifact_path)

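    def log_code_snapshot(self):
        """Records the state of the code (git commit, branch, uncommitted diff)"""
        try:
            import git  # GitPython
        except ImportError:
            print("Warning: GitPython not installed, skipping code snapshot")
            return

        try:
            repo = git.Repo(search_parent_directories=True)
        except git.InvalidGitRepositoryError:
            print("Warning: Not a git repository, skipping code snapshot")
            return

        self.config['git'] = {
            'commit': repo.head.object.hexsha,
            # active_branch raises TypeError on a detached HEAD (common in CI)
            'branch': None if repo.head.is_detached else repo.active_branch.name,
            'is_dirty': repo.is_dirty(),
            'remote_url': next((r.url for r in repo.remotes if r.name == 'origin'), None)
        }

        # Save the diff if the working tree has uncommitted changes
        if repo.is_dirty():
            with open(self.experiment_dir / "git_diff.patch", 'w') as f:
                f.write(repo.git.diff())

        # Re-write config.json so that the git information is persisted
        with open(self.experiment_dir / "config.json", 'w') as f:
            json.dump(self.config, f, indent=2)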

    def compare_experiments(self, other_exp_dir: str, metric: str):
        """Compares this experiment with another one (top-level metrics only)"""
        with open(Path(other_exp_dir) / "metrics.json") as f:
            other_metrics = json.load(f)

        with open(Path(other_exp_dir) / "config.json") as f:
            other_config = json.load(f)

        print(f"\nComparison on {metric}:")
        print(f"  Current experiment: {self.metrics.get(metric, 'N/A')}")
        print(f"  Other experiment: {other_metrics.get(metric, 'N/A')}")

        print("\nConfiguration differences:")
        for key in sorted(set(self.config) | set(other_config)):
            if self.config.get(key) != other_config.get(key):
                print(f"  {key}: {self.config.get(key)} vs {other_config.get(key)}")


# Usage example: a tracked fine-tuning experiment
def run_finetuning_experiment():
    """
    End-to-end example of a tracked fine-tuning experiment (the training is simulated)
    """
    # Setup experiment
    exp = ExperimentTracker("bert_classification_experiment")
    exp.set_seed(42)

    # Configuration
    config = {
        'model': 'bert-base-uncased',
        'learning_rate': 2e-5,
        'batch_size': 16,
        'epochs': 3,
        'max_length': 128,
        'optimizer': 'AdamW',
        'scheduler': 'linear',
        'warmup_steps': 500
    }
    exp.log_config(config)
    exp.log_code_snapshot()

    # Simulated training: random values, for illustration only
    for epoch in range(config['epochs']):
        train_loss = np.random.random()
        val_loss = np.random.random()
        val_acc = 0.7 + epoch * 0.05 + np.random.random() * 0.05

        metrics = {
            'train_loss': train_loss,
            'val_loss': val_loss,
            'val_accuracy': val_acc
        }

        exp.log_metrics(metrics, step=epoch)
        print(f"Epoch {epoch}: {metrics}")

    # Final metrics (placeholder values, not actual results)
    final_metrics = {
        'final_accuracy': 0.85,
        'final_f1': 0.83,
        'best_epoch': 2
    }
    exp.log_metrics(final_metrics)

    # Save the model (simulated with a dict)
    model = {"weights": "dummy"}
    exp.log_artifact("final_model", model)

    print(f"\nExperiment saved in: {exp.experiment_dir}")

if __name__ == "__main__":
    run_finetuning_experiment()
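The tracker records the configuration and a code snapshot, but reproducing a run also requires the software environment. A minimal standard-library sketch for capturing interpreter and package versions alongside an experiment; `capture_environment` and its default package list are hypothetical, and the output complements rather than replaces a pinned `requirements.txt` / `environment.yml`.

```python
import importlib.metadata
import json
import platform
import sys


def capture_environment(packages=("numpy", "torch", "transformers")):
    """Collect interpreter and package versions for a reproducibility log.

    Packages that are not installed are recorded as None instead of raising.
    The default package list is only an illustrative assumption.
    """
    env = {
        "python": platform.python_version(),
        "platform": platform.platform(),
        "executable": sys.executable,
        "packages": {},
    }
    for name in packages:
        try:
            env["packages"][name] = importlib.metadata.version(name)
        except importlib.metadata.PackageNotFoundError:
            env["packages"][name] = None
    return env


if __name__ == "__main__":
    # Could be written to an environment.json file next to metrics.json
    print(json.dumps(capture_environment(), indent=2))
```

Recording `None` for missing packages keeps the log usable on machines that do not have every dependency installed.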

Contributing to research

Publication process

LaTeX paper template

\documentclass{article}

% Essential packages
\usepackage[utf8]{inputenc}
\usepackage{amsmath, amssymb, amsfonts}
\usepackage{algorithm, algpseudocode}
\usepackage{graphicx}
\usepackage{booktabs}
\usepackage{hyperref} % conventionally loaded last

\title{[Title of your contribution]}

\author{
  Your Name \\
  Institution \\
  \texttt{[Your email]}
}

\begin{document}

\maketitle

\begin{abstract}
Concise summary (150-250 words) presenting:
\begin{itemize}
    \item The problem addressed
    \item Your approach/contribution
    \item Main results
    \item Impact
\end{itemize}
\end{abstract}

\section{Introduction}
\subsection{Motivation}
\subsection{Contributions}

\section{Related Work}
\subsection{[Topic 1]}
\subsection{[Topic 2]}

\section{Method}
\subsection{Problem Formulation}
\subsection{Proposed Approach}

\begin{algorithm}
\caption{[Algorithm name]}
\begin{algorithmic}[1]
\Require $x$: input
\Ensure $y$: output
\State $y \leftarrow \text{function}(x)$
\end{algorithmic}
\end{algorithm}

\section{Experiments}
\subsection{Experimental Setup}
\subsection{Results}

\begin{table}[h]
\centering
\caption{[Table description]}
\begin{tabular}{lcc}
\toprule
Method & Metric 1 & Metric 2 \\
\midrule
Baseline & 0.75 & 0.80 \\
Ours & \textbf{0.82} & \textbf{0.85} \\
\bottomrule
\end{tabular}
\end{table}

\section{Discussion}
\subsection{Analysis}
\subsection{Limitations}

\section{Conclusion}

\bibliographystyle{plain}
\bibliography{references}

\end{document}
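Copying results into the table by hand invites transcription errors. A minimal sketch that renders a results dictionary in the same booktabs layout as the template and bolds the best value in each column; `to_booktabs` is a hypothetical helper, it assumes higher is better for every metric, and the numbers are the template's own placeholders, not real results.

```python
def to_booktabs(rows, metric_names, caption):
    """Render {method: [one score per metric]} as a booktabs LaTeX table.

    The best score in each column is bolded (ties are all bolded).
    Assumes higher is better for every metric.
    """
    best = [max(scores[j] for scores in rows.values()) for j in range(len(metric_names))]
    lines = [
        r"\begin{table}[h]",
        r"\centering",
        rf"\caption{{{caption}}}",
        r"\begin{tabular}{l" + "c" * len(metric_names) + "}",
        r"\toprule",
        "Method & " + " & ".join(metric_names) + r" \\",
        r"\midrule",
    ]
    for method, scores in rows.items():
        cells = [rf"\textbf{{{s:.2f}}}" if s == best[j] else f"{s:.2f}" for j, s in enumerate(scores)]
        lines.append(method + " & " + " & ".join(cells) + r" \\")
    lines += [r"\bottomrule", r"\end{tabular}", r"\end{table}"]
    return "\n".join(lines)


# Placeholder values taken from the template above
results = {"Baseline": [0.75, 0.80], "Ours": [0.82, 0.85]}
print(to_booktabs(results, ["Metric 1", "Metric 2"], "[Table description]"))
```

Generating the table from the same data that produced the logged metrics keeps the paper and the experiment logs consistent.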

Submission checklist

## Before submitting to a conference/journal

### Code and reproducibility
- Code published on GitHub with a complete README
- requirements.txt / environment.yml provided
- Scripts to reproduce the experiments
- Data, or instructions for obtaining it
- Model checkpoints (if applicable)
- Seeds fixed and documented

### Paper
- Follows the conference/journal template
- Within the page limit
- High-resolution figures
- Complete, consistently formatted references
- Concise, informative abstract
- Clearly stated contributions
- Limitations discussed honestly

### Experiments
- Experiments over several seeds (≥3)
- Error bars / standard deviations reported
- Comparisons against relevant baselines
- Ablation studies
- Statistical significance tests

### Ethics and impact
- Ethics statement, if required
- Discussion of potential biases
- Societal impact considered
- Limitations of the approach discussed

### Supplementary
- Supplementary material prepared
- Demo video (if applicable)
- Project website
- Anticipated responses to reviewers
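The checklist asks for several seeds, error bars and a significance test. A minimal standard-library sketch that reports mean ± sample standard deviation per method and, as a swapped-in choice of test, runs an exact two-sided permutation test on the difference of means (Welch's t-test or a bootstrap are common alternatives). The per-seed scores are hypothetical placeholders.

```python
import itertools
import statistics


def summarize(scores):
    """Mean and sample standard deviation of one metric across seeds."""
    return statistics.mean(scores), statistics.stdev(scores)


def permutation_test(scores_a, scores_b):
    """Exact two-sided permutation test on the difference of means.

    Enumerates every split of the pooled scores into groups of the original
    sizes, so it is only practical for a small number of seeds.
    """
    pooled = list(scores_a) + list(scores_b)
    n_a = len(scores_a)
    observed = abs(statistics.mean(scores_a) - statistics.mean(scores_b))
    extreme = total = 0
    for idx in itertools.combinations(range(len(pooled)), n_a):
        chosen = set(idx)
        group_a = [pooled[i] for i in idx]
        group_b = [pooled[i] for i in range(len(pooled)) if i not in chosen]
        diff = abs(statistics.mean(group_a) - statistics.mean(group_b))
        # Small tolerance so floating-point noise does not hide ties with the observed split
        if diff >= observed - 1e-12:
            extreme += 1
        total += 1
    return extreme / total


# Hypothetical per-seed accuracies (placeholders, not real results)
baseline = [0.74, 0.76, 0.75]
ours = [0.81, 0.83, 0.82]
for name, scores in (("Baseline", baseline), ("Ours", ours)):
    mean, std = summarize(scores)
    print(f"{name}: {mean:.3f} ± {std:.3f}")
print(f"p = {permutation_test(baseline, ours):.2f}")
```

Even with a perfect separation this prints `p = 0.10`: with 3 seeds per method there are only C(6,3) = 20 possible splits, so the smallest achievable two-sided p-value is 2/20. Three seeds are enough to report variance, but a significance claim at p < 0.05 with this test needs more runs.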

Final research project

Goal: carry out a complete mini research project (weeks 15-16)

Example projects

  1. Improving RAG

    • Hypothesis: multi-hop embeddings improve retrieval
    • Experiments: compare different retrieval strategies
    • Metrics: Recall@K, MRR, generation quality (BLEU, ROUGE)
  2. PEFT for specialized domains

    • Hypothesis: LoRA with a rank adapted per layer improves on a fixed rank
    • Experiments: fine-tuning on a medical/legal corpus
    • Metrics: accuracy, F1, compute cost
  3. Multi-task agents

    • Hypothesis: a modular architecture outperforms a monolithic one
    • Experiments: compare different agent architectures
    • Metrics: task success rate, efficiency (number of steps), robustness
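The retrieval metrics of the RAG project are easy to get subtly wrong, so here is a minimal sketch of Recall@K and MRR over ranked lists of document IDs. The document IDs are hypothetical, and the definitions are assumptions worth stating in a report: Recall@K here is the share of relevant documents found in the top k (some papers instead report a hit rate, 1 if any relevant document appears), and a query with no relevant document retrieved contributes 0 to MRR. BLEU and ROUGE are better taken from an established implementation than reimplemented.

```python
def recall_at_k(retrieved, relevant, k):
    """Recall@K for one query: share of the relevant documents found in the top-k results."""
    relevant = set(relevant)
    if not relevant:
        return 0.0
    return len(set(retrieved[:k]) & relevant) / len(relevant)


def mean_reciprocal_rank(retrieved_lists, relevant_lists):
    """MRR over queries: mean of 1/rank of the first relevant document (0 when none is retrieved)."""
    total = 0.0
    for retrieved, relevant in zip(retrieved_lists, relevant_lists):
        relevant = set(relevant)
        for rank, doc_id in enumerate(retrieved, start=1):
            if doc_id in relevant:
                total += 1.0 / rank
                break
    return total / len(retrieved_lists)


# Hypothetical ranked results and gold labels for two queries
retrieved = [["d3", "d1", "d7"], ["d2", "d9", "d4"]]
relevant = [["d1"], ["d4", "d5"]]
print([recall_at_k(r, g, k=2) for r, g in zip(retrieved, relevant)])  # [1.0, 0.0]
print(mean_reciprocal_rank(retrieved, relevant))  # (1/2 + 1/3) / 2
```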

Report structure

# [Project title]

## 1. Introduction (2 pages)
- Context and motivation
- Research question
- Contributions

## 2. Background (3 pages)
- State of the art
- Theoretical foundations
- Identified gaps

## 3. Method (4 pages)
- Proposed approach
- Detailed architecture/algorithm
- Computational complexity

## 4. Experiments (5 pages)
- Experimental setup
- Datasets and metrics
- Baselines
- Main results
- Ablation studies

## 5. Discussion (2 pages)
- Interpretation of the results
- Limitations
- Future work

## 6. Conclusion (1 page)

## Appendices
- Hyperparameters
- Additional results
- Code snippets

Additional resources

Advanced courses and tutorials

Recommended MOOCs

  • Stanford CS224N: Natural Language Processing with Deep Learning
  • Stanford CS25: Transformers United
  • Fast.ai: Practical Deep Learning for Coders (Part 2)
  • Hugging Face Course (Advanced)

YouTube channels

  • Yannic Kilcher (paper reviews)
  • AI Coffee Break with Letitia
  • Two Minute Papers
  • Stanford HAI

Major conferences

Tier 1 (Top venues)

  • NeurIPS (Neural Information Processing Systems)
  • ICML (International Conference on Machine Learning)
  • ICLR (International Conference on Learning Representations)
  • ACL (Association for Computational Linguistics)
  • EMNLP (Empirical Methods in NLP)
  • CVPR (Computer Vision and Pattern Recognition)

Tier 2 (Strong venues)

  • NAACL, EACL (regional ACL chapters)
  • AAAI, IJCAI (General AI)
  • AISTATS (Statistics and ML)

Journals

  • JMLR (Journal of Machine Learning Research)
  • TACL (Transactions of the ACL)
  • Nature Machine Intelligence
  • IEEE TPAMI

Research tools

Keeping up with the literature

  • ArXiv Sanity (Karpathy)
  • Papers with Code
  • Connected Papers
  • Semantic Scholar

Experimentation

  • Weights & Biases
  • MLflow
  • Neptune.ai
  • Comet.ml

Compute

  • Google Colab Pro
  • Paperspace Gradient
  • Lambda Labs
  • Academic cluster (request access)

Next steps

Congratulations on completing this advanced track!

You are now equipped to:

  • Understand and implement state-of-the-art architectures
  • Run rigorous, reproducible experiments
  • Contribute to AI research
  • Publish at conferences and in journals

Continuing your research career

PhD

  • Identify labs working on your research topics
  • Contact professors (with a research proposal)
  • Build a portfolio (papers, code, projects)

Postdoc / Research Scientist

  • Publish regularly
  • Develop niche expertise
  • Collaborate with the community
  • Review for conferences

Industry R&D

  • Research labs (OpenAI, Anthropic, Google DeepMind, Meta AI, etc.)
  • Innovative startups
  • Balancing fundamental research and applications

Staying up to date

Weekly reading

  • 5-10 papers per week (quick skim)
  • 1-2 papers per week (in-depth reading)
  • Implement 1 paper per month

Community involvement

  • Contribute to open source
  • Present at conferences
  • Organize reading groups
  • Mentor junior researchers