Logo de fravelz

Notas de Hacking

Fravelz

Threading en Python


1) ¿Qué es threading? {#qué-es-threading}

threading es el módulo estándar de Python para concurrencia mediante hilos (threads). Permite ejecutar varias tareas aparentemente al mismo tiempo dentro del mismo proceso. Es ideal para operaciones I/O-bound (espera de red, disco, etc.). Importarlo:

import threading

2) Conceptos clave {#conceptos-clave}

  • Thread (hilo): unidad de ejecución.
  • Daemon thread: hilo que no impide que el proceso termine; se mata cuando el programa principal acaba.
  • GIL (Global Interpreter Lock): en CPython, impide que varios hilos ejecuten bytecode Python al mismo tiempo — eso limita el paralelismo real para tareas CPU-bound.
  • Race condition: condición de carrera cuando varios hilos acceden/actualizan recursos compartidos sin sincronización.
  • Thread-safe: código que puede ejecutarse en varios hilos sin errores por concurrencia.

3) Crear y usar hilos — ejemplos básicos {#crear-y-usar-hilos-ejemplos-básicos}

a) Usando Thread con target {#a-usando-thread-con-target}

import threading

def worker(n):
print(f"Worker {n} empieza")
time.sleep(1)
print(f"Worker {n} termina")

threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(i,), name=f"worker-{i}")
t.start()
threads.append(t)

for t in threads:
t.join()  # esperar que termine cada hilo

print("Todos terminados")

b) Subclassing lhread {#b-subclassing-lhread}

import threading, time

class MiHilo(threading.Thread):
def __init__(self, n):
super().__init__(name=f"MiHilo-{n}")
self.n = n

def run(self):
print(f"{self.name} comenzar")
time.sleep(1)
print(f"{self.name} terminar")

h = MiHilo(1)
h.start()
h.join()

4) Daemon threads y uso correcto {#daemon-threads-y-uso-correcto}

t = threading.Thread(target=worker, args=(1,), daemon=True)
t.start()
# Si el main thread termina, el hilo daemon se termina abruptamente.

Usa daemon para tareas de fondo opcionales (logs en tiempo real, watchers). Para trabajo crítico, no uses daemon; asegúrate de join().


5) Sincronización — primitivas importantes {#sincronización-primitivas-importantes}

Lock (mutual exclusion) {#lock-mutual-exclusion}

lock = threading.Lock()
with lock:
# región crítica pass

RLock (reentrant lock) — permite re-entradas desde mismo hilo {#rlock-reentrant-lock-permite-re-entradas-desde-mismo-hilo}

rlock = threading.RLock()

Event — señalización simple (flag) {#event-señalización-simple-flag}

event = threading.Event()

# hilo espera:
event.wait()  # bloquea hasta que event.set()
# hilo que despierta:
event.set()

Condition — esperas con notificación {#condition-esperas-con-notificación}

cond = threading.Condition()
with cond:
cond.wait()  # espera notificación
# otro hilo:
cond.notify()

Semaphore — contador {#semaphore-contador}

sem = threading.Semaphore(3)  # máximo 3 entradas simultáneas
with sem:
# acceso limitado pass

Barrier — sincronizar N hilos en un punto {#barrier-sincronizar-n-hilos-en-un-punto}

bar = threading.Barrier(3)
bar.wait()  # todos los hilos esperan aquí hasta que lleguen los 3

6) Comunicación segura entre hilos: queue.Queue {#comunicación-segura-entre-hilos-queue.queue}

queue.Queue es thread-safe y la forma recomendada para pasar datos entre hilos (producer-consumer).

import threading, queue, time

q = queue.Queue()

def producer():
for i in range(5):
q.put(i)
print("produjo", i)
time.sleep(0.2)
q.put(None)  # sentinel para indicar fin

def consumer():
while True:
item = q.get()
if item is None:
break
print("consumió", item)
q.task_done()

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); q.join()

7) Manejo de excepciones en hilos {#manejo-de-excepciones-en-hilos}

Las excepciones en Thread no se propagan al hilo principal. Opciones:

  • Usar concurrent.futures.ThreadPoolExecutor para obtener Future y atrapar excepciones.
  • Capturar y guardar excepciones en el propio hilo y consultarlas después. Ejemplo con ThreadPoolExecutor:
from concurrent.futures import ThreadPoolExecutor

def trabajo(x):
if x == 3:
raise ValueError("boom")
return x*2

with ThreadPoolExecutor(max_workers=3) as ex:
futures = [ex.submit(trabajo, i) for i in range(5)]
for f in futures:
try:
print(f.result())
except Exception as e:
print("error en hilo:", e)

8) ThreadPoolExecutor (alta abstracción) {#threadpoolexecutor-alta-abstracción}

Más cómodo que crear hilos manualmente.

from concurrent.futures import ThreadPoolExecutor, as_completed

def tarea(n):
return n*n

with ThreadPoolExecutor(max_workers=4) as pool:
futures = [pool.submit(tarea, i) for i in range(10)]
for f in as_completed(futures):
print(f.result())

9) GIL — cuándo usar threading vs multiprocessing {#gil-cuándo-usar-threading-vs-multiprocessing}

  • I/O-bound: usa threading (o asyncio) — hilos dan mejoras reales (espera de I/O libera GIL).
  • CPU-bound: threading no escala por GIL; usa multiprocessing (procesos) o extensiones nativas (numpy, C) que sueltan GIL. Explicación corta: el GIL permite que solo un hilo ejecute bytecode Python simultáneamente; por eso multiples hilos no aceleran cálculos puros en CPython.

10) Operaciones atómicas y seguridad {#operaciones-atómicas-y-seguridad}

  • Algunos objetos y operaciones son atómicas en CPython (por ejemplo, asignación simple de variable, operaciones sobre tipos integrales?) — no confíes en ello.
  • Ejemplo inseguro (race):
# NO usar sin lock
counter = 0
def incr():
global counter
for _ in range(10000):
counter += 1  # no es atómico: leer-modificar-escribir

Siempre protege con Lock sí hay acceso concurrente.


11) Cancelación y parada de hilos {#cancelación-y-parada-de-hilos}

No existe Thread.kill() seguro. Patrones para parar:

  • Usar threading.Event() como bandera de parada:
stop_event = threading.Event()

def worker():
while not stop_event.is_set():
# trabajo pass
# detener:
stop_event.set()
  • Usar sentinels en queue (None).

12) Thread-local storage {#thread-local-storage}

Datos separados por hilo:

import threading
local = threading.local()
def worker(val):
local.x = val
print(local.x)

t1 = threading.Thread(target=worker, args=(1,))
t2 = threading.Thread(target=worker, args=(2,))
t1.start(); t2.start()

Cada hilo ve su propio local.x.


13) Debugging y utilidades {#debugging-y-utilidades}

  • threading.enumerate() → lista hilos activos.
  • threading.active_count() → cuenta.
  • threading.current_thread().name → nombre actual.
  • Poner logs (no prints) con logging y threadName en el formato para seguir hilos.
  • Para debug avanzado: faulthandler.dump_traceback_later() o threading.settrace() (poco común).

Ejemplo de logging:

import logging, threading, time
logging.basicConfig(level=logging.INFO, format="%(threadName)s: %(message)s")
def worker():
logging.info("start")
time.sleep(1)
logging.info("end")

t = threading.Thread(target=worker, name="hilo-1")
t.start(); t.join()

14) Buenas prácticas {#buenas-prácticas}

  • Para I/O concurrency, prefiere ThreadPoolExecutor o asyncio según el caso.
  • Evita variables globales mutables; usa queue.Queue y Locks.
  • No uses daemon=True para tareas que deben terminar correctamente.
  • Mantén regiones críticas lo más cortas posible (reduce contención).
  • Sí necesitas paralelismo real para CPU-bound, usa multiprocessing o librerías que suelten el GIL.
  • Añade timeouts a join() y bloqueos (lock.acquire(timeout=…)) sí corres riesgos de deadlock.

15) Ejemplos prácticos (útiles) {#ejemplos-prácticos-útiles}

a) Producer-consumer con Queue (resumen) {#a-producer-consumer-con-queue-resumen}

Ya mostrado en la sección de Queue. Es el patrón más útil y seguro para comunicación.

b) Pool de threads simple con Thread y Queue {#b-pool-de-threads-simple-con-thread-y-queue}

import threading, queue, time

def worker(q):
while True:
fn, args = q.get()
if fn is None:
break
try:
fn(*args)
finally:
q.task_done()

q = queue.Queue()
threads = []
for _ in range(4):
t = threading.Thread(target=worker, args=(q,))
t.start()
threads.append(t)

# Encolar tareas
for i in range(10):
q.put((print, (f"task {i}",)))

q.join()
# parar hilos
for _ in threads:
q.put((None, None))
for t in threads:
t.join()

c) Uso real con requests (I/O-bound) {#c-uso-real-con-requests-io-bound}

import requests
from concurrent.futures import ThreadPoolExecutor

urls = ["https://example.com"]*10

def fetch(url):
r = requests.get(url)
return len(r.content)

with ThreadPoolExecutor(max_workers=5) as ex:
results = list(ex.map(fetch, urls))
print(results)

16) Limitaciones y alternativas {#limitaciones-y-alternativas}

  • threading no es la mejor opción para CPU-bound por el GIL.
  • Alternativas:
  • multiprocessing — procesos (paralelismo real).
  • asyncio — concurrencia en un solo hilo usando corutinas (muy eficiente para I/O con muchas conexiones).
  • concurrent.futures.ProcessPoolExecutor — API parecida a ThreadPool pero con procesos.
---

17) Resumen {#resumen}

  • Usa threading para I/O-bound.
  • Protege recursos compartidos con Lock, RLock, o usa Queue.
  • Para excepciones y manejo fácil, usa ThreadPoolExecutor.
  • No hay forma segura de matar hilos; implementa mecanismo cooperativo (Event/sentinels).
  • Sí necesitas paralelismo CPU puro, usa multiprocessing.