My way to 1B digits of pi

A little info: I started calculating on 28/10/2025. Thanks to ChatGPT I made code for calculating pi, which I ran on my server (an IBM SYSTEM X3559 M4) for 10 days. I had 16 GB of RAM + 45 GB of swap; the CPU I used is an Intel Xeon E5-2620. The program I used was made in Python (more info in the code section). Right now I plan to go even further with Y-cruncher. I'm not going for any world records, but it's fun to see how it all works.

digits of pi (953 MB) Log from terminal (358KB)

How I did it


#!/usr/bin/env python3
# pi_parallel_live.py
# Parallel Pi computation with live staged logs:
#  - many small tasks to keep all cores busy
#  - parallel pairwise merge rounds with step logs
#  - digit writing progress
#  - full console dump saved at end

import sys, math, time, multiprocessing as mp
from math import ceil

# ---------------------------
# CONFIG
# ---------------------------
DIGITS = 100_000_000_0        # target digit count (= 1_000_000_000); adjust to what your RAM can handle (e.g. 50_000_000)
TASK_MULTIPLIER = 30       # tasks per core (higher -> more, smaller tasks; improves load balancing)
N_CORES = 12               # number of worker processes to use
CHUNK = 100_000            # digits per chunk (one line) when writing the output file
OUTFILE = "pi_digits.txt"  # where the computed digits are written
LOGFILE = "pi_live_log.txt"  # where the captured console transcript is dumped
DEBUG_TERM_PRINT_EVERY = 10000   # print every Nth term (base-case) to reduce spam
# ---------------------------

# ---------------------------
# Logger to capture console output
# ---------------------------
class LiveLogger:
    """Tee-style replacement for sys.stdout.

    Echoes every message to the real terminal immediately while also
    keeping it in memory, so the whole console session can be dumped
    to a log file at the end of the run.
    """

    def __init__(self):
        # Ordered list of every string passed to write().
        self.buffer = []

    def write(self, msg):
        # Forward to the real stdout first so output stays live,
        # then remember the message for the final dump.
        sys.__stdout__.write(msg)
        self.buffer.append(msg)

    def flush(self):
        # Delegate flushing to the real stream.
        sys.__stdout__.flush()

    def dump_to_file(self, filename):
        """Write the accumulated console transcript to *filename*."""
        with open(filename, "w", encoding="utf-8") as f:
            f.writelines(self.buffer)

# ---------------------------
# gmpy2 if available
# ---------------------------
# Optional acceleration: gmpy2's isqrt is much faster than math.isqrt
# on huge integers.  Whichever branch runs, the module-level name
# `isqrt` ends up bound to a working integer-sqrt function, and GMP
# records which backend is active.
try:
    import gmpy2
    from gmpy2 import mpz, isqrt
    GMP = True
    # precision context is irrelevant for integers but keep default
    print("gmpy2 detected: using faster big-int ops.")
except Exception:
    GMP = False
    print("gmpy2 not available, falling back to Python ints.")
    isqrt = math.isqrt  # alias so later code can call isqrt() unconditionally

def int_isqrt(n):
    """Return the integer square root of a non-negative integer *n*.

    The module-level `isqrt` is already bound to gmpy2.isqrt when gmpy2
    imported successfully and to math.isqrt otherwise, so the previous
    `if GMP` branch was redundant — both arms resolved to the same
    function.  A single unconditional call has identical behavior.
    """
    return isqrt(n)

# ---------------------------
# Binary splitting (top-level for multiprocessing)
# bs returns (P, Q, T) triple for range [a,b)
# We'll make worker return (task_id, (P,Q,T))
# ---------------------------
def bs(a, b, progress_queue=None, depth=0):
    """Binary-splitting evaluation of Chudnovsky terms k in [a, b).

    Returns the (P, Q, T) triple for the half-open range.  When
    *progress_queue* is given, one token is put on it per completed
    base-case term so a monitor process can track progress.
    """
    if b - a > 1:
        # Recursive case: split the range, solve both halves, then
        # combine with the standard binary-splitting recurrences.
        mid = (a + b) // 2
        P1, Q1, T1 = bs(a, mid, progress_queue, depth + 1)
        P2, Q2, T2 = bs(mid, b, progress_queue, depth + 1)
        return (P1 * P2, Q1 * Q2, T1 * Q2 + P1 * T2)

    # Base case: a single Chudnovsky term k = a.
    k = a
    if k == 0:
        P, Q, T = 1, 1, 13591409
    else:
        P = (6*k - 5) * (2*k - 1) * (6*k - 1)
        Q = k * k * k * 640320**3
        T = P * (13591409 + 545140134 * k)
        if k & 1:
            # Terms alternate in sign.
            T = -T
    # Throttled debug output so huge runs don't flood the console.
    if k % DEBUG_TERM_PRINT_EVERY == 0:
        print(f"[TERM] k={k}, depth={depth}")
    if progress_queue is not None:
        progress_queue.put(1)
    return (P, Q, T)

def worker_bs(args):
    """Pool entry point: evaluate one binary-splitting task.

    *args* is a (task_id, a, b, progress_queue) tuple; the id is
    returned alongside the (P, Q, T) triple so the parent can restore
    deterministic ordering after Pool.map.
    """
    task_id, a, b, progress_queue = args
    # Minimal live status so the operator can see workers picking up tasks.
    print(f"[WORKER] task {task_id} computing range {a}-{b} (length {b-a})")
    triple = bs(a, b, progress_queue, depth=0)
    print(f"[WORKER] task {task_id} done")
    return (task_id, triple)

# ---------------------------
# merge_pair: merge two numeric triples
# Input: (P,Q,T), (P2,Q2,T2) -> returns (Pnew,Qnew,Tnew)
# Implemented top-level for multiprocessing
# ---------------------------
def merge_pair(pair):
    """Combine two binary-splitting triples into one.

    *pair* is ((P1, Q1, T1), (P2, Q2, T2)); the result uses the same
    recurrences as the combine step inside bs().  Kept at module level
    so it can be dispatched through a multiprocessing Pool.
    """
    left, right = pair
    P1, Q1, T1 = left
    P2, Q2, T2 = right
    return (P1 * P2, Q1 * Q2, T1 * Q2 + P1 * T2)

# ---------------------------
# progress monitor (top-level)
# Tracks term completions and merge completions
# ---------------------------
def progress_monitor(total_terms, total_merge_ops, term_queue, merge_queue):
    """Run in a separate process: print live progress percentages.

    Drains *term_queue* (one token per evaluated Chudnovsky term) and
    *merge_queue* (one token per completed pairwise merge) and prints a
    line whenever either percentage changes.  Returns once
    *total_terms* and *total_merge_ops* tokens have been received; the
    parent may also terminate() this process early.

    Fix vs. original: the bare `except:` clauses caught *everything*
    (including SystemExit/KeyboardInterrupt); they now catch only
    queue.Empty, which is what Queue.get_nowait() actually raises when
    it loses the race against the empty() pre-check.
    """
    from queue import Empty  # raised by Queue.get_nowait() on an empty queue

    done_terms = 0
    done_merges = 0
    last_percent_terms = -1
    last_percent_merges = -1
    start_time = time.time()
    while done_terms < total_terms or done_merges < total_merge_ops:
        # Drain term completions.  empty()/get_nowait() can race with
        # producers, so Empty just means "nothing more right now".
        while not term_queue.empty():
            try:
                term_queue.get_nowait()
                done_terms += 1
            except Empty:
                break
        pct_terms = int(done_terms / total_terms * 100) if total_terms > 0 else 100
        if pct_terms != last_percent_terms:
            elapsed = time.time() - start_time
            # Crude linear ETA extrapolated from the fraction done so far.
            eta = int(elapsed / (pct_terms + 1e-6) * (100 - pct_terms)) if pct_terms > 0 else 0
            print(f"[TERMS] {pct_terms}% complete, ETA ~{eta}s")
            last_percent_terms = pct_terms
        # Drain merge completions.
        while not merge_queue.empty():
            try:
                merge_queue.get_nowait()
                done_merges += 1
            except Empty:
                break
        pct_merges = int(done_merges / total_merge_ops * 100) if total_merge_ops > 0 else 100
        if pct_merges != last_percent_merges:
            print(f"[MERGE] {pct_merges}% done ({done_merges}/{total_merge_ops})")
            last_percent_merges = pct_merges
        time.sleep(0.5)
    # final
    print("[PROGRESS] All term and merge notifications received.")

# ---------------------------
# Utilities
# ---------------------------
def pairwise(iterable):
    """Yield consecutive pairs from iterable: (a0,a1),(a2,a3),...

    If the iterable has odd length, the final element is yielded alone
    as a 1-tuple.
    """
    # Local sentinel distinguishes "iterator exhausted" from a None element.
    missing = object()
    it = iter(iterable)
    for first in it:
        second = next(it, missing)
        if second is missing:
            yield (first,)
            return
        yield (first, second)

# ---------------------------
# Main compute: create tasks, run workers, parallel merge rounds with logs
# ---------------------------
def compute_pi_parallel_live(digits, n_cores, task_multiplier):
    """Compute pi * 10**digits as a big integer via the Chudnovsky series.

    Pipeline:
      1. split the term range into many small tasks and evaluate their
         (P, Q, T) triples in a worker pool;
      2. combine the partial triples in parallel pairwise merge rounds;
      3. take the final integer sqrt / division to produce the scaled pi.
    A separate monitor process prints live progress from two queues.

    Args:
        digits: number of decimal digits of pi to produce.
        n_cores: worker process count (used for both evaluation and merging).
        task_multiplier: tasks per core; more tasks -> better load balancing.

    Returns:
        int: pi scaled by 10**digits (render with write_pi_chunks).
    """
    # Each Chudnovsky term yields ~14.18 digits; dividing by 14 (plus
    # slack) over-estimates the term count, which is the safe direction.
    terms = digits // 14 + 10
    print(f"[START] Target digits={digits}, estimated Chudnovsky terms={terms}")
    # create tasks: split [0, terms) into n_tasks contiguous ranges
    n_tasks = n_cores * task_multiplier
    if n_tasks < n_cores:
        n_tasks = n_cores
    step = (terms + n_tasks - 1) // n_tasks  # ceil division
    ranges = []
    task_id = 0
    for a in range(0, terms, step):
        b = min(a + step, terms)
        ranges.append((task_id, a, b))
        task_id += 1
    n_tasks = len(ranges)
    print(f"[TASKS] Created {n_tasks} tasks (step {step} terms each approx) for {n_cores} cores")

    # Manager queues are picklable and shareable across pool workers.
    manager = mp.Manager()
    term_queue = manager.Queue()
    merge_queue = manager.Queue()

    # start progress monitor process
    total_merge_ops_estimate = max(0, n_tasks - 1)  # total merges to combine to one
    monitor = mp.Process(target=progress_monitor, args=(terms, total_merge_ops_estimate, term_queue, merge_queue))
    monitor.start()

    # run worker tasks across the pool; map blocks until all are done
    pool = mp.Pool(n_cores)
    worker_args = [(tid, a, b, term_queue) for (tid, a, b) in ranges]
    results = pool.map(worker_bs, worker_args)
    pool.close()
    pool.join()

    # results: list of (task_id, (P,Q,T))
    # sort by task_id to have deterministic order
    results.sort(key=lambda x: x[0])
    print(f"[DONE] All worker tasks finished. Collected {len(results)} partial results.")

    # Prepare list of numeric tuples for merging (and keep labels for logs)
    labeled = [(str(tid), triple) for (tid, triple) in results]  # labeled list: (label, (P,Q,T))
    # We'll merge in rounds. Each round we will create pairs and run merge_pair in parallel.
    round_index = 1
    # create a separate pool for merging (we'll reuse n_cores)
    merge_pool = mp.Pool(n_cores)

    # flatten to list of (label, triple)
    current = labeled  # list of (label, (P,Q,T))
    # total merges count used by progress monitor:
    total_merges = 0
    # We'll compute actual total merges and notify merge_queue as merges complete below.
    # Do pairwise rounds: each round halves the list (odd leftover is
    # carried forward unmerged) until one triple remains.
    while len(current) > 1:
        pairs = []
        labels_for_pairs = []
        singles = []
        it = iter(current)
        while True:
            try:
                a = next(it)
            except StopIteration:
                break
            try:
                b = next(it)
                # a and b are (label, triple)
                pairs.append((a[1], b[1]))  # numeric pairs for merge_pair
                labels_for_pairs.append((a[0], b[0]))
            except StopIteration:
                singles.append(a)
                break
        # number of merges this round:
        merges_this_round = len(pairs)
        total_merges += merges_this_round
        print(f"[MERGE] Round {round_index}: {merges_this_round} parallel merges (singles carried: {len(singles)})")
        # run merges in parallel: returns list of numeric triples
        if merges_this_round > 0:
            merged_numeric = merge_pool.map(merge_pair, pairs)
        else:
            merged_numeric = []
        # build new 'current' list with new labels (labels only feed the logs)
        new_current = []
        for i, triple in enumerate(merged_numeric):
            idleft, idright = labels_for_pairs[i]
            newlabel = f"({idleft}+{idright})"
            print(f"[MERGE] Round {round_index}: merged {idleft} + {idright} -> {newlabel}")
            # push a merge notification to monitor queue
            merge_queue.put(1)
            new_current.append((newlabel, triple))
        # carry singles forward
        for s in singles:
            new_current.append(s)
        # next round
        current = new_current
        round_index += 1

    merge_pool.close()
    merge_pool.join()
    # Force-kill the monitor; it may still be sleeping between polls.
    monitor.terminate()

    # current should contain single final result
    if not current:
        raise RuntimeError("No merge results found")
    final_label, final_triple = current[0]
    print(f"[MERGE] Final result label: {final_label}")

    # Final sqrt and division to get scaled pi integer:
    # pi = 426880 * sqrt(10005) * Q / T, everything scaled by 10**digits.
    print("[FINAL] Computing integer sqrt(10005 * 10^(2*digits))")
    t0 = time.time()
    scale = 10 ** digits
    sqrt_term = int_isqrt(10005 * scale * scale)
    t1 = time.time()
    print(f"[FINAL] Integer sqrt done in {t1 - t0:.2f}s")

    print("[FINAL] Final multiplication and division to get pi * 10^digits (this may be heavy)")
    P_final, Q_final, T_final = final_triple
    pi_scaled = (426880 * sqrt_term * Q_final) // T_final

    return pi_scaled

# ---------------------------
# Write pi to file (chunked) with progress logs
# ---------------------------
def write_pi_chunks(pi_int, digits, chunk_size, out_path=None):
    """Write the scaled-pi integer to a text file in fixed-size chunks.

    *pi_int* is pi * 10**digits as an integer.  The integer part is
    written first, terminated by ".", then the fractional digits follow
    in lines of *chunk_size* digits each.

    Args:
        pi_int: the scaled pi integer produced by compute_pi_parallel_live.
        digits: number of fractional digits contained in pi_int.
        chunk_size: digits per output line.
        out_path: destination file; defaults to the module-level OUTFILE
            (new optional parameter — existing callers are unaffected).
    """
    if out_path is None:
        out_path = OUTFILE
    # Left-pad so there is always at least one integer digit to split off.
    # NOTE(review): assumes digits >= 1 (s[:-digits] would misbehave at 0).
    s = str(pi_int).rjust(digits + 1, "0")
    whole = s[:-digits]
    frac = s[-digits:]
    total_chunks = ceil(len(frac) / chunk_size)
    with open(out_path, "w") as f:
        f.write(whole + ".\n")
        for i in range(total_chunks):
            start = i * chunk_size
            end = min(start + chunk_size, len(frac))
            f.write(frac[start:end] + "\n")
            print(f"[DIGITS] wrote chunk {i+1}/{total_chunks}")
    print(f"[DIGITS] Finished writing {len(frac)} fractional digits to {out_path}")

# ---------------------------
# MAIN
# ---------------------------
def main():
    """Entry point: compute pi, write the digits file, dump the console log.

    Installs LiveLogger as sys.stdout so every print is both shown live
    and captured; restores the real stdout before the final message so
    that line is not part of its own dump.
    """
    # set logger (tee all prints into memory)
    logger = LiveLogger()
    sys.stdout = logger

    # ensure multiprocessing uses fork on Linux for efficiency (optional)
    # RuntimeError means the start method was already set — safe to ignore.
    try:
        mp.set_start_method("fork")
    except RuntimeError:
        pass

    t_start = time.time()
    pi_int = compute_pi_parallel_live(DIGITS, N_CORES, TASK_MULTIPLIER)
    t_mid = time.time()
    print(f"[DONE] Computation done in {t_mid - t_start:.2f}s. Now writing digits to file...")

    write_pi_chunks(pi_int, DIGITS, CHUNK)

    t_end = time.time()
    print(f"[ALL DONE] Total elapsed time: {t_end - t_start:.2f}s")

    # dump full console log
    sys.stdout = sys.__stdout__  # restore stdout so final message prints to console
    logger.dump_to_file(LOGFILE)
    print(f"[LOG] Full live console saved to: {LOGFILE}")

if __name__ == "__main__":
    main()
                

This is the code I used for generating 1B digits of pi. The code was made by ChatGPT and uses the Chudnovsky algorithm. It was made to support only Linux, so you can't use it on a Windows-based PC, because Windows does not support fork. I also want to warn you: if you want to use this code for a large number of digits, I don't recommend it, because the code is unoptimized and can take a few hours for something Y-Cruncher can do in a few minutes. There are also errors in this code — if you compare some verified digits of pi from the internet with my calculation, you will notice that the result is wrong. Yes, the calculation is wrong, so I cannot call it pi, but for me as a beginner it's at least something. If you have any other questions, please contact me at contact@d0mino.cz .