[Original] Hand-Writing a Large Language Model from Scratch: A Guide for Absolute Beginners

Posted on 2025-1-30 00:09
Since this is a hands-on guide aimed at complete beginners, I'll explain the relevant theory in a separate follow-up article.

Q: I don't know anything about AI. What should I do?

A: No problem, the author is also just "programming with GPT's help"; complete beginners can give it a try ^_^

Q: Why would I want to write my own large model? Today's large models are already very mature, so why write one myself?

A: Partly because of the recent DeepSeek buzz, which made AI feel genuinely interesting. But mainly because writing one myself just seems cool. Everything goes from zero to one: even if the model I write keeps throwing errors and keeps answering questions wrong, the sense of achievement at the moment it's finished is incomparable. With achievement and interest, you can keep digging deeper step by step ^_^

Your Python version needs to be fairly new. I used 3.12; 3.8 does not work.

If you keep running into version errors, just install the packages directly with the long pip install command in the code section below (if nothing errors out, requirements.txt is enough).

The URLs in there have already been deduplicated; the normal workflow is as follows (the exact commands are in the code section below).

The scraped HTML pages are stored in the scraped folder and compressed into archives.

This takes quite a while, so I left it running and went to sleep. I just took it at its word that "done" meant everything went fine.

Change save_parsed_file in extract_text.py to the version shown in the code section below.

This extracts the text content from the HTML and saves it as .txt files.

Note: if it errors out partway and you start over, remember to delete the previously extracted files; the folder is inside scraped.

Modify tokenize_text.py as follows (the full modified script is in the code section below).

The file/directory hierarchy for the next part is as follows (the project tree is reproduced in the code section below).

data/dataset.py handles the dataset.

model/gpt.py implements the GPT model.

model/transformer_block.py implements one of the basic building blocks of the GPT model: the Transformer block (a hedged sketch follows the gpt.py listing below, since its code isn't shown here).

inference.py is the inference script: it loads the trained model and runs inference, i.e. generates text (also sketched at the end of the code section).

Then the run order (shown as screenshots in the original post) boils down to: run the training script under train/ first, then inference.py.

An accurate model needs deep theory and training on massive amounts of data. Mine sounds very robotic and isn't the least bit intelligent, but it's still a first step into unknown territory, isn't it?

pip install numpy tqdm matplotlib
git clone https://github.com/JCPETERSON/OpenwebText.git
cd OpenwebText
pip install -r requirements.txt
pip install beautifulsoup4 certifi chardet cssselect feedfinder2 feedparser htmlmin idna jieba3k lxml newspaper3k nltk numpy pandas pillow python-dateutil pytorch-pretrained-bert pytz pyyaml recordtype requests-file requests singledispatch six soupsieve spacy tinysegmenter tldextract tqdm urllib3 urlparse2 pycurl pebble chardet transformers
Extract URLs
python extract_urls.py --single_file pushshift_dumps/RS_v2_2005-06.xz
 
To extract URLs for a range of years
python extract_urls.py --year_start 2016 --year_end 2018
 
Deduplicate URLs
python deduplicate_urls.py --input_dir url_dumps
python312 download.py D:\Tools\openwebtext\URLs\RS_2011-01.bz2.deduped.txt --n_procs 100 --scraper raw --chunk_size 100000 --compress --timeout 30
pip install --upgrade newspaper3k
def save_parsed_file(filename, text, out_dir):
    # Build the full path for the output file
    file_path = os.path.join(out_dir, filename)

    # Make sure the directory exists, creating it if necessary
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    # Write the file
    with open(file_path, 'w', encoding='utf-8') as handle:
        handle.write(text)
python312 extract_text.py --html_archive scraped/RS_2011-01-1_data.xz --n_procs 100
python -m spacy download en_core_web_sm
import spacy
import io
import argparse
import glob
import os
import tqdm
from multiprocessing import Pool
from functools import partial
import chardet

def detect_encoding(file_path):
    """Detect the actual encoding of a file."""
    with open(file_path, 'rb') as f:
        raw_data = f.read(1024)  # read the first 1 KB of the file
    result = chardet.detect(raw_data)
    return result['encoding'] or 'utf-8'  # fall back to 'utf-8' if detection fails


def save_tokenized_text(output_dir, filename, text):
    # Build the full output path
    text_file = os.path.join(output_dir, filename)

    # Make sure the target directory exists
    os.makedirs(os.path.dirname(text_file), exist_ok=True)

    # Save the file
    with io.open(text_file, 'w', encoding='utf-8') as fo:
        fo.write(text)

def tokenizeSpacy(args):
    nlp = spacy.load("en_core_web_sm")  # load the spaCy model
    extraction_file_paths = glob.glob(args.input_glob)

    for extraction_file_path in extraction_file_paths:
        path, filename = os.path.split(extraction_file_path)
        text_file = os.path.join(
            args.output_dir, filename.replace('.txt', '.tokenized.txt'))

        # Make sure the output directory exists
        os.makedirs(os.path.dirname(text_file), exist_ok=True)

        # Detect the file's encoding
        file_encoding = detect_encoding(extraction_file_path)

        try:
            # Open the input and output files
            with io.open(extraction_file_path, 'r', encoding=file_encoding) as fi, \
                    io.open(text_file, 'w', encoding='utf-8') as fo:

                omitted_line_count = 0
                for line in fi:
                    if len(line.strip()) > 0:  # skip empty lines
                        doc = nlp(line)
                        fo.write(' '.join([x.text for x in doc]) + '\n')
                    else:
                        omitted_line_count += 1

            print(f'Omitted {omitted_line_count} empty lines from {filename}')
        except UnicodeDecodeError:
            print(f"Failed to decode {extraction_file_path} with encoding {file_encoding}. Skipping this file.")

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_glob', type=str, default='*.txt')
    parser.add_argument('--output_dir', type=str, default='tokenized')
    parser.add_argument('--tokenizer', type=str, default='spacy', choices=['spacy', 'gpt2'])
    parser.add_argument('--combine', type=int, default=int(1e8), help="min tokens per file in gpt2 mode")
    parser.add_argument('--file_bs', type=int, default=10000, help="files per batch in gpt2 mode")

    # Parse command-line arguments
    args = parser.parse_args()

    # Make sure the output directory exists
    os.makedirs(args.output_dir, exist_ok=True)

    # Pick which tokenizer to run
    if args.tokenizer == 'spacy':
        tokenizeSpacy(args)
    else:
        print("GPT-2 tokenizer is not implemented in this version.")
python312 tokenize_text.py --input_glob "parsed/RS_2011-01/*.txt" --output_dir tokenized
gpt_project/
├── model/
│   ├── gpt.py
│   ├── transformer_block.py
├── data/
│   ├── dataset.py
│   ├── tokenizer.py   # optional
│   ├── tokenized/     # holds all the tokenized .txt files
├── train/
│   ├── train.py       # training code
├── train_model/
├── inference.py       # generates text
# dataset.py
import torch
import os
from collections import Counter
from transformers import AutoTokenizer

class TextDataset(torch.utils.data.Dataset):
    def __init__(self, directory_path, seq_length, tokenizer):
        self.seq_length = seq_length
        self.tokenizer = tokenizer
        self.data = []
        self.vocab = {}
        self.inverse_vocab = {}

        # Step 1: count the frequency of every word
        word_counter = Counter()

        # Walk every .tokenized.txt file in directory_path
        for filename in os.listdir(directory_path):
            if filename.endswith(".tokenized.txt"):
                file_path = os.path.join(directory_path, filename)
                with open(file_path, "r", encoding="utf-8") as f:
                    words = f.read().split()
                    word_counter.update(words)

        # Step 2: build the vocabulary, assigning an ID to every word
        self.vocab = {word: idx + 1 for idx, (word, _) in enumerate(word_counter.items())}
        self.vocab['<pad>'] = 0  # reserve an ID for padding
        self.vocab['<unk>'] = len(self.vocab)  # reserve an ID for unknown words

        # Build the inverse vocabulary
        self.inverse_vocab = {idx: word for word, idx in self.vocab.items()}

        # Step 3: convert the text into token IDs
        for filename in os.listdir(directory_path):
            if filename.endswith(".tokenized.txt"):
                file_path = os.path.join(directory_path, filename)
                with open(file_path, "r", encoding="utf-8") as f:
                    words = f.read().split()
                    # Map every word to its token ID, falling back to <unk> for out-of-vocabulary words
                    token_ids = [self.vocab.get(word, self.vocab['<unk>']) for word in words]
                    self.data.append(token_ids)

        # Pad/truncate the data into fixed-length training sequences
        self.data = [self.pad_sequence(seq) for seq in self.data]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        input_text = self.data[idx]

        # Encode the input text
        input_ids = torch.tensor(input_text)  # convert to a tensor
        target_ids = input_ids.clone()  # use the input as the target
        return input_ids, target_ids

    def pad_sequence(self, seq):
        """Pad a sequence up to seq_length."""
        if len(seq) < self.seq_length:
            # Pad with the pad token
            seq += [self.vocab['<pad>']] * (self.seq_length - len(seq))
        else:
            # Truncate anything longer than seq_length
            seq = seq[:self.seq_length]
        return seq


    '''
    def __getitem__(self, idx):
        input_ids = torch.tensor(self.data[idx], dtype=torch.long)

        # Pad if the input sequence is shorter than seq_length
        padding_length = self.seq_length - input_ids.size(0)
        if padding_length > 0:
            padding = torch.tensor([self.vocab['<pad>']] * padding_length, dtype=torch.long)
            input_ids = torch.cat([input_ids, padding], dim=0)

        # target_ids is input_ids shifted by one token (the language-modeling objective)
        target_ids = input_ids[1:].clone()
        target_ids = torch.cat([target_ids, torch.tensor([self.vocab['<pad>']], dtype=torch.long)])

        return input_ids, target_ids
    '''
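For reference, here is a minimal usage sketch of TextDataset (this is not from the original post): the data/tokenized path follows the directory tree above, seq_length and batch_size are arbitrary example values, and tokenizer is passed as None because the class never actually uses it.

# Hypothetical usage sketch for TextDataset; path and hyperparameters are illustrative.
from torch.utils.data import DataLoader
from data.dataset import TextDataset

dataset = TextDataset("data/tokenized", seq_length=128, tokenizer=None)
loader = DataLoader(dataset, batch_size=8, shuffle=True)

print("vocab size:", len(dataset.vocab))
for input_ids, target_ids in loader:
    # Each batch is a pair of (batch_size, seq_length) LongTensors
    print(input_ids.shape, target_ids.shape)
    break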
# gpt.py
import torch
import torch.nn as nn
import os
import sys
import torch.nn.functional as F

project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
print("Adding to sys.path:", project_root)
sys.path.append(project_root)

from model.transformer_block import TransformerBlock

class GPT(nn.Module):
    def __init__(self, vocab_size, embed_size, num_heads, num_layers, max_length):
        super(GPT, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.position_embedding = nn.Embedding(max_length, embed_size)
        self.blocks = nn.ModuleList([
            TransformerBlock(embed_size, num_heads, embed_size * 4)
            for _ in range(num_layers)
        ])
        self.fc_out = nn.Linear(embed_size, vocab_size)

    def forward(self, x):
        batch_size, seq_length = x.shape
        positions = torch.arange(0, seq_length, device=x.device).expand(batch_size, seq_length)
        x = self.embedding(x) + self.position_embedding(positions)
        for block in self.blocks:
            x = block(x)
        return self.fc_out(x)

    def generate(self, input_ids, max_length=100, temperature=1.0, top_k=50):
        self.eval()  # switch to evaluation mode

        # Start from the prompt and extend it token by token
        generated_ids = input_ids
        for _ in range(max_length):
            outputs = self(generated_ids)
            logits = outputs  # the model's output is the logits
            logits = logits[:, -1, :]  # only look at the most recent position

            # Temperature scaling
            logits = logits / temperature

            # Top-k sampling
            if top_k > 0:
                top_k_values, top_k_indices = torch.topk(logits, top_k)
                top_k_probs = F.softmax(top_k_values, dim=-1)
                next_token = torch.multinomial(top_k_probs, 1)
                next_token = top_k_indices.gather(-1, next_token)
            else:
                # Plain sampling over the full vocabulary
                probs = F.softmax(logits, dim=-1)
                next_token = torch.multinomial(probs, 1)

            # Append the generated token to the sequence
            generated_ids = torch.cat([generated_ids, next_token], dim=-1)

        return generated_ids
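transformer_block.py itself isn't shown above. Below is a minimal sketch of what such a file might look like; it is only written to match the TransformerBlock(embed_size, num_heads, embed_size * 4) call in gpt.py, and the internals (causal masking, dropout, post-norm residuals) are my assumptions rather than the author's code.

# transformer_block.py -- hypothetical sketch, not the author's original file.
import torch
import torch.nn as nn

class TransformerBlock(nn.Module):
    def __init__(self, embed_size, num_heads, hidden_dim, dropout=0.1):
        super().__init__()
        # Self-attention over the sequence; batch_first matches gpt.py's (batch, seq, embed) tensors
        self.attn = nn.MultiheadAttention(embed_size, num_heads, dropout=dropout, batch_first=True)
        # Position-wise feed-forward network
        self.ff = nn.Sequential(
            nn.Linear(embed_size, hidden_dim),
            nn.GELU(),
            nn.Linear(hidden_dim, embed_size),
        )
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Causal mask: position i may only attend to positions <= i
        seq_len = x.size(1)
        causal_mask = torch.triu(
            torch.ones(seq_len, seq_len, dtype=torch.bool, device=x.device), diagonal=1
        )
        attn_out, _ = self.attn(x, x, x, attn_mask=causal_mask, need_weights=False)
        x = self.norm1(x + self.dropout(attn_out))    # residual + layer norm
        x = self.norm2(x + self.dropout(self.ff(x)))  # residual + layer norm
        return x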
    
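train/train.py is referenced in the directory tree and the run order, but its code isn't shown above either. Here is a minimal sketch of what a training script might look like, using the TextDataset and GPT classes listed above; every hyperparameter, the data/tokenized path, and the checkpoint name train_model/gpt.pt are illustrative assumptions, not the author's choices.

# train.py -- hypothetical sketch, not the author's original file.
import os
import torch
import torch.nn as nn
from torch.utils.data import DataLoader

from data.dataset import TextDataset
from model.gpt import GPT

def main():
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Example values; adjust to your data and hardware
    seq_length, batch_size, epochs = 128, 8, 3
    dataset = TextDataset("data/tokenized", seq_length=seq_length, tokenizer=None)
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    vocab_size = len(dataset.vocab)
    model = GPT(vocab_size, embed_size=256, num_heads=8, num_layers=4,
                max_length=seq_length).to(device)

    criterion = nn.CrossEntropyLoss(ignore_index=dataset.vocab['<pad>'])
    optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)

    model.train()
    for epoch in range(epochs):
        total_loss = 0.0
        for input_ids, target_ids in loader:
            input_ids, target_ids = input_ids.to(device), target_ids.to(device)

            logits = model(input_ids)  # (batch, seq, vocab)
            loss = criterion(logits.reshape(-1, vocab_size), target_ids.reshape(-1))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        print(f"epoch {epoch + 1}: loss {total_loss / len(loader):.4f}")

    # Save the trained weights (checkpoint path is an assumption)
    os.makedirs("train_model", exist_ok=True)
    torch.save(model.state_dict(), "train_model/gpt.pt")

if __name__ == "__main__":
    main()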
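inference.py is described near the top of the post but likewise isn't shown above. A minimal sketch follows, assuming the model was saved to train_model/gpt.pt by a script like the training sketch and that the vocabulary is rebuilt from the same data/tokenized directory; the prompt, hyperparameters, and paths are all illustrative.

# inference.py -- hypothetical sketch, not the author's original file.
import torch
from data.dataset import TextDataset
from model.gpt import GPT

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Rebuild the vocabulary the same way training did (values are illustrative and must match the checkpoint)
seq_length = 128
dataset = TextDataset("data/tokenized", seq_length=seq_length, tokenizer=None)
vocab, inverse_vocab = dataset.vocab, dataset.inverse_vocab

model = GPT(len(vocab), embed_size=256, num_heads=8, num_layers=4,
            max_length=seq_length).to(device)
model.load_state_dict(torch.load("train_model/gpt.pt", map_location=device))

# Encode a prompt with the same whitespace tokenization; unknown words map to <unk>
prompt = "the meaning of life is"
prompt_ids = [vocab.get(w, vocab['<unk>']) for w in prompt.split()]
input_ids = torch.tensor([prompt_ids], dtype=torch.long, device=device)

with torch.no_grad():
    output_ids = model.generate(input_ids, max_length=50, temperature=1.0, top_k=50)

# Decode the generated token IDs back into words
print(' '.join(inverse_vocab.get(i, '<unk>') for i in output_ids[0].tolist()))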

Latest replies (4)

#2 (2025-2-4 17:24)
6

#3 (2025-2-4 19:56)
Learned a lot, thanks.

#4 zylrocket (2025-2-5 03:06)
Haha, I don't even know what to say after reading this, but it's great fun!

#5 (2025-2-6 03:59)
@zylrocket: After studying this a bit over the past few days, I feel it's honestly more convenient to just deploy an existing model locally. Still, building one yourself helps you understand what's going on and get started, and it makes modifying models later much easier.