1. TensorFlow Cheat Sheet
- 1. TensorFlow Cheat Sheet
- 1.1 Quick Reference
- 1.2 Getting Started
- 1.3 Tensors
- 1.4 Variables
- 1.5 Automatic Differentiation (Autograd)
- 1.6 Keras API
- 1.6.1 Model Building
- 1.6.2 Model Building Approaches
- 1.6.3 ResNet-style Skip Connection Example
- 1.6.4 Layers
- 1.6.5 Activation Functions
- 1.6.6 Loss Functions
- 1.6.7 Optimizers
- 1.6.8 Metrics
- 1.6.9 Model Compilation
- 1.6.10 Training
- 1.6.11 Training Pipeline
- 1.6.12 Evaluation
- 1.6.13 Prediction
- 1.6.14 Saving and Loading Models
- 1.6.15 Model Persistence Flow
- 1.6.16 SavedModel Format (Recommended)
- 1.6.17 HDF5 Format
- 1.6.18 Weights Only (Checkpoint Format)
- 1.6.19 Model Architecture Only
- 1.6.20 Custom Objects
- 1.6.21 Checkpoint Management
- 1.6.22 Callbacks
- 1.6.23 Callback Execution Flow
- 1.6.24 Common Callbacks
- 1.6.25 Custom Callback
- 1.6.26 Viewing TensorBoard
- 1.6.27 Regularization
- 1.6.28 Custom Layers
- 1.6.29 Custom Loss Functions
- 1.6.30 Custom Metrics
- 1.6.31 Custom Training Loops
- 1.6.32 Custom Training Flow
- 1.6.33 Basic Custom Training Loop
- 1.6.34 Advanced Custom Training with Regularization
- 1.6.35 Custom Training with Learning Rate Scheduling
- 1.7 Data Input Pipelines (tf.data)
- 1.8 Distributed Training
- 1.9 Data Augmentation
- 1.10 TensorFlow Hub
- 1.11 TensorFlow Lite
- 1.12 TensorFlow Serving
- 1.13 TensorFlow Extended (TFX)
- 1.14 TensorFlow Probability
- 1.15 TensorFlow Datasets (TFDS)
- 1.16 TensorFlow Addons
- 1.17 Eager Execution
- 1.18 tf.function
- 1.19 Custom Training with GradientTape
- 1.20 Complete End-to-End Example
- 1.21 Custom Callbacks
- 1.22 Mixed Precision Training
- 1.23 Profiling
- 1.24 Common Deep Learning Architectures
- 1.25 Best Practices
- 1.26 Common Issues and Debugging
This cheat sheet provides a comprehensive overview of TensorFlow 2.x, covering essential concepts, validated examples, and best practices for building, training, evaluating, and deploying deep learning models efficiently.
1.1 Quick Reference
| Task | Code | Use Case |
|---|---|---|
| Create Tensor | tf.constant([[1, 2]]) | Fixed values |
| Create Variable | tf.Variable([1.0, 2.0]) | Trainable parameters |
| Matrix Multiply | tf.matmul(a, b) | Linear transformations |
| Reduce Sum | tf.reduce_sum(tensor, axis=1) | Aggregate along dimension |
| Gradient | tape.gradient(loss, vars) | Backpropagation |
| Dense Layer | Dense(128, activation='relu') | Fully connected layer |
| Conv2D Layer | Conv2D(32, (3,3), activation='relu') | Image feature extraction |
| LSTM Layer | LSTM(64, return_sequences=True) | Sequential data processing |
| Compile Model | model.compile(optimizer='adam', loss='mse') | Setup training |
| Train Model | model.fit(X, y, epochs=10) | Train on data |
| Save Model | model.save('model.h5') | Persist trained model |
| Load Model | tf.keras.models.load_model('model.h5') | Load saved model |
| Predict | model.predict(X_test) | Inference |
| Create Dataset | tf.data.Dataset.from_tensor_slices((X, y)) | Efficient data pipeline |
| Batch Dataset | dataset.batch(32).prefetch(tf.data.AUTOTUNE) | Optimize throughput |
1.1.1 Activation Functions Quick Reference
relu: f(x) = max(0, x) → Most common, fast
sigmoid: f(x) = 1/(1+e^(-x)) → Binary classification output
tanh: f(x) = (e^x-e^(-x))/(e^x+e^(-x)) → Range [-1,1]
softmax: f(x_i) = e^(x_i) / Σ_j e^(x_j) → Multi-class classification
elu: f(x) = x if x>0 else α(e^x-1) → Smooth approximation
swish: f(x) = x * sigmoid(x) → Self-gated, smooth
1.1.2 Loss Functions Quick Reference
Binary Classification: binary_crossentropy
Multi-class (one-hot): categorical_crossentropy
Multi-class (integers): sparse_categorical_crossentropy
Regression: mean_squared_error, mean_absolute_error
Ranking/Metric Learning: hinge, triplet loss (via TensorFlow Addons)
1.1.3 Optimizer Quick Reference
Adam: Adaptive learning rates, momentum → Default choice
SGD: Simple, stable → Fine-tuning, convergence
RMSprop: Adaptive learning rates → RNNs, non-stationary objectives
Adagrad: Per-parameter adaptive rates → Sparse data
1.2 Getting Started
1.2.1 Installation
# Install TensorFlow (CPU and GPU support included)
pip install tensorflow
# Verify installation
python -c "import tensorflow as tf; print(tf.__version__)"
1.2.2 Importing TensorFlow
import tensorflow as tf
import numpy as np
1.2.3 Environment Setup
# Check TensorFlow version
print(f"TensorFlow version: {tf.__version__}")
# Check GPU availability
gpus = tf.config.list_physical_devices('GPU')
print(f"Num GPUs Available: {len(gpus)}")
# Enable memory growth for GPUs (prevents OOM errors)
if gpus:
try:
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
print(f"Memory growth enabled for {len(gpus)} GPU(s)")
except RuntimeError as e:
print(e)
# Set random seeds for reproducibility
tf.random.set_seed(42)
np.random.seed(42)
1.2.4 TensorFlow Execution Flow
┌──────────────────┐
│    Load Data     │
└────────┬─────────┘
         ▼
┌──────────────────┐
│   Preprocess     │◄── tf.data pipeline
│   & Augment      │    (map, batch, prefetch)
└────────┬─────────┘
         ▼
┌──────────────────┐
│   Build Model    │◄── Sequential, Functional,
│                  │    or Subclassing API
└────────┬─────────┘
         ▼
┌──────────────────┐
│  Compile Model   │◄── optimizer, loss, metrics
└────────┬─────────┘
         ▼
┌──────────────────┐
│   Train Model    │◄── model.fit() or custom loop
└────────┬─────────┘
         ▼
┌──────────────────┐
│    Evaluate      │◄── model.evaluate()
└────────┬─────────┘
         ▼
┌──────────────────┐
│     Predict      │◄── model.predict()
└────────┬─────────┘
         ▼
┌──────────────────┐
│   Save Model     │◄── SavedModel or HDF5
└──────────────────┘
1.3 Tensors
1.3.1 Tensor Hierarchy
       TensorFlow Tensors
               │
       ┌───────┴───────┐
       │               │
   Constant         Variable
  (Immutable)       (Mutable)
       │               │
       └───────┬───────┘
               ▼
        GPU/CPU Memory
1.3.2 Creating Tensors
# Constant tensors
a = tf.constant([[1, 2], [3, 4]]) # From list
b = tf.zeros((2, 3)) # All zeros
c = tf.ones((3, 2)) # All ones
d = tf.eye(3) # Identity matrix
e = tf.fill((2, 3), 9) # Fill with specific value
# Random tensors
f = tf.random.normal((2, 2), mean=0.0, stddev=1.0) # Normal distribution
g = tf.random.uniform((2, 2), minval=0, maxval=10) # Uniform distribution
h = tf.random.truncated_normal((2, 2)) # Truncated normal
# From NumPy arrays
arr = np.array([[1, 2], [3, 4]])
tensor_from_np = tf.convert_to_tensor(arr)
# Sequences
range_tensor = tf.range(start=0, limit=10, delta=2) # [0, 2, 4, 6, 8]
linspace_tensor = tf.linspace(0.0, 1.0, num=5) # [0.0, 0.25, 0.5, 0.75, 1.0]
# One-hot encoding
labels = tf.constant([0, 1, 2, 0])
one_hot = tf.one_hot(labels, depth=3) # Shape: (4, 3)
1.3.3 Tensor Attributes
tensor = tf.constant([[1, 2], [3, 4]])
print(tensor.shape) # Shape of the tensor
print(tensor.dtype) # Data type of the tensor
print(tensor.device) # Device where the tensor is stored (CPU or GPU)
print(tensor.numpy()) # Convert to a NumPy array
1.3.4 Tensor Operations
a = tf.constant([[1.0, 2.0], [3.0, 4.0]])
b = tf.constant([[5.0, 6.0], [7.0, 8.0]])
# Element-wise operations
add_result = a + b # [[6, 8], [10, 12]]
sub_result = a - b # [[-4, -4], [-4, -4]]
mul_result = a * b # [[5, 12], [21, 32]]
div_result = a / b # [[0.2, 0.33], [0.43, 0.5]]
pow_result = tf.pow(a, 2) # [[1, 4], [9, 16]]
# Matrix operations
matmul_result = tf.matmul(a, b) # [[19, 22], [43, 50]]
transpose_result = tf.transpose(a) # [[1, 3], [2, 4]]
determinant = tf.linalg.det(a) # -2.0
inverse = tf.linalg.inv(a) # [[-2, 1], [1.5, -0.5]]
# Shape operations
reshape_result = tf.reshape(a, [1, 4]) # [[1, 2, 3, 4]]
squeeze_result = tf.squeeze([[[1], [2]]]) # [1, 2]
expand_result = tf.expand_dims([1, 2], axis=0) # [[1, 2]]
# Concatenation and stacking
concat_result = tf.concat([a, b], axis=0) # Shape: (4, 2)
stack_result = tf.stack([a, b], axis=0) # Shape: (2, 2, 2)
# Reduction operations
sum_all = tf.reduce_sum(a) # 10.0
sum_axis0 = tf.reduce_sum(a, axis=0) # [4, 6]
mean_all = tf.reduce_mean(a) # 2.5
max_val = tf.reduce_max(a) # 4.0
min_val = tf.reduce_min(a) # 1.0
# Argmax and Argmin
argmax_axis1 = tf.argmax(a, axis=1) # [1, 1] (indices)
argmin_axis0 = tf.argmin(a, axis=0) # [0, 0] (indices)
# Comparison operations
greater = tf.greater(a, 2.0) # [[False, False], [True, True]]
equal = tf.equal(a, b) # [[False, False], [False, False]]
# Clipping
clipped = tf.clip_by_value(a, 2.0, 3.0) # [[2, 2], [3, 3]]
# Casting
float_tensor = tf.cast(tf.constant([1, 2, 3]), tf.float32)
int_tensor = tf.cast(tf.constant([1.5, 2.8]), tf.int32) # [1, 2]
1.3.5 Tensor Operation Flow
     Input Tensors (a, b)
              │
      ┌───────┴────────┐
      │                │
 Element-wise      Matrix Ops
 Operations        (matmul,
 (+, -, *, /,      transpose,
  pow, sqrt,       inverse)
  log, exp)            │
      └───────┬────────┘
              ▼
       Reduction Ops
      (sum, mean, max)
              │
              ▼
       Output Tensor
1.3.6 Indexing and Slicing
tensor = tf.constant([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
print(tensor[0]) # First row
print(tensor[:, 1]) # Second column
print(tensor[0:2, 1:3]) # Slicing
1.3.7 Data Types
- tf.float16, tf.float32, tf.float64: Floating-point numbers.
- tf.int8, tf.int16, tf.int32, tf.int64: Signed integers.
- tf.uint8, tf.uint16, tf.uint32, tf.uint64: Unsigned integers.
- tf.bool: Boolean.
- tf.string: String.
- tf.complex64, tf.complex128: Complex numbers.
- tf.qint8, tf.qint32, tf.quint8: Quantized integers.
1.4 Variables
Variables are mutable tensors used to store model parameters (weights and biases).
# Create a variable
var = tf.Variable([1.0, 2.0], name='my_variable')
# Assign new values
var.assign([3.0, 4.0]) # Replace entire value
var.assign_add([1.0, 1.0]) # Add to current value: [4.0, 5.0]
var.assign_sub([0.5, 0.5]) # Subtract from current value: [3.5, 4.5]
# Accessing properties
print(var.numpy()) # Convert to NumPy array
print(var.shape) # Shape of the variable
print(var.dtype) # Data type
print(var.trainable) # Whether variable is trainable
# Non-trainable variables (e.g., for batch norm moving averages)
non_trainable_var = tf.Variable([1.0, 2.0], trainable=False)
# Initialize variables
initial_value = tf.random.normal((3, 3))
weight_var = tf.Variable(initial_value, name='weights')
1.4.1 Variable Lifecycle
 Create Variable
        │
        ▼
 ┌──────────────┐
 │  Initialize  │
 └──────┬───────┘
        ▼
 ┌──────────────┐
 │   Training   │◄──┐
 │   Updates    │   │
 └──────┬───────┘   │
        ├───────────┘ (multiple epochs)
        ▼
 ┌──────────────┐
 │     Save     │
 │  Checkpoint  │
 └──────────────┘
1.5 Automatic Differentiation (Autograd)
GradientTape records operations for automatic differentiation (computing gradients).
1.5.1 Gradient Computation Flow
  Forward Pass
       │
  ┌────▼────┐
  │ Record  │◄── GradientTape
  │  Ops    │    (stores computation graph)
  └────┬────┘
       ▼
   Loss Value
       │
       ▼
 ┌────────────┐
 │  Compute   │◄── tape.gradient()
 │ Gradients  │
 └────┬───────┘
      ▼
 Update Weights
1.5.2 Basic Gradient Computation
# Simple gradient
x = tf.Variable(3.0)
with tf.GradientTape() as tape:
y = x**2 # y = 9.0
dy_dx = tape.gradient(y, x) # dy/dx = 2*x = 6.0
print(f"Gradient: {dy_dx.numpy()}")
1.5.3 Multiple Gradients (Persistent Tape)
x = tf.Variable(3.0)
with tf.GradientTape(persistent=True) as tape:
y = x**2 # y = 9.0
z = y * 2 # z = 18.0
dy_dx = tape.gradient(y, x) # dy/dx = 2*x = 6.0
dz_dx = tape.gradient(z, x) # dz/dx = 4*x = 12.0
print(f"dy/dx: {dy_dx.numpy()}")
print(f"dz/dx: {dz_dx.numpy()}")
del tape # Clean up persistent tape
1.5.4 Watching Non-Variable Tensors
# Constants aren't watched by default
x = tf.constant(3.0)
with tf.GradientTape() as tape:
tape.watch(x) # Explicitly watch the constant
y = x**2
dy_dx = tape.gradient(y, x)
print(f"Gradient: {dy_dx.numpy()}")
1.5.5 Gradients for Multiple Variables
x = tf.Variable(2.0)
y = tf.Variable(3.0)
with tf.GradientTape() as tape:
z = x**2 + y**2 # z = 4 + 9 = 13
# Compute gradients for both variables
gradients = tape.gradient(z, [x, y]) # [dz/dx, dz/dy] = [4.0, 6.0]
print(f"dz/dx: {gradients[0].numpy()}")
print(f"dz/dy: {gradients[1].numpy()}")
1.5.6 Nested GradientTapes (Higher-Order Derivatives)
x = tf.Variable(3.0)
with tf.GradientTape() as outer_tape:
with tf.GradientTape() as inner_tape:
y = x**3 # y = 27
# First derivative: dy/dx = 3*x^2
dy_dx = inner_tape.gradient(y, x)
# Second derivative: dΒ²y/dxΒ² = 6*x
d2y_dx2 = outer_tape.gradient(dy_dx, x)
print(f"First derivative: {dy_dx.numpy()}") # 27.0
print(f"Second derivative: {d2y_dx2.numpy()}") # 18.0
1.6 Keras API
1.6.1 Model Building
TensorFlow provides three ways to build models, each with different levels of flexibility.
1.6.2 Model Building Approaches
              Model Building APIs
                      │
      ┌───────────────┼───────────────┐
      │               │               │
  Sequential    Functional API    Subclassing
  (Simplest)     (Flexible)     (Most Control)
      │               │               │
      ├─ Linear       ├─ Multiple     ├─ Custom
      │  stack        │  inputs/      │  logic
      │               │  outputs      │
      └─ Quick        ├─ Shared       └─ Dynamic
         prototypes   │  layers          graphs
                      └─ Complex
                         topologies
1.6.2.1 Sequential Model
Best for simple, linear stack of layers.
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
# Method 1: Pass layers as list
model = Sequential([
Dense(128, activation='relu', input_shape=(784,)),
Dropout(0.2),
Dense(64, activation='relu'),
Dense(10, activation='softmax')
])
# Method 2: Add layers incrementally
model = Sequential()
model.add(Dense(128, activation='relu', input_shape=(784,)))
model.add(Dropout(0.2))
model.add(Dense(64, activation='relu'))
model.add(Dense(10, activation='softmax'))
# View model architecture
model.summary()
1.6.2.2 Functional API
Best for complex models with multiple inputs/outputs, shared layers, or non-linear topology.
from tensorflow.keras.layers import Input, Dense, Concatenate
from tensorflow.keras.models import Model
# Single input/output
inputs = Input(shape=(784,))
x = Dense(128, activation='relu')(inputs)
x = Dense(64, activation='relu')(x)
outputs = Dense(10, activation='softmax')(x)
model = Model(inputs=inputs, outputs=outputs)
# Multiple inputs
input1 = Input(shape=(64,), name='input1')
input2 = Input(shape=(32,), name='input2')
x1 = Dense(32, activation='relu')(input1)
x2 = Dense(32, activation='relu')(input2)
# Merge branches
merged = Concatenate()([x1, x2])
output = Dense(10, activation='softmax')(merged)
model = Model(inputs=[input1, input2], outputs=output)
1.6.2.3 Model Subclassing
Best for custom models with dynamic behavior.
import tensorflow as tf
class MyModel(tf.keras.Model):
def __init__(self, num_classes=10):
super(MyModel, self).__init__()
self.dense1 = tf.keras.layers.Dense(128, activation='relu')
self.dropout = tf.keras.layers.Dropout(0.2)
self.dense2 = tf.keras.layers.Dense(64, activation='relu')
self.classifier = tf.keras.layers.Dense(num_classes, activation='softmax')
def call(self, inputs, training=None):
x = self.dense1(inputs)
if training:
x = self.dropout(x, training=training)
x = self.dense2(x)
return self.classifier(x)
model = MyModel(num_classes=10)
# Build model by calling it
_ = model(tf.zeros((1, 784)))
model.summary()
1.6.3 ResNet-style Skip Connection Example
from tensorflow.keras.layers import Input, Dense, Add
from tensorflow.keras.models import Model
inputs = Input(shape=(64,))
x = Dense(64, activation='relu')(inputs)
x = Dense(64, activation='relu')(x)
# Skip connection
outputs = Add()([inputs, x])
model = Model(inputs=inputs, outputs=outputs)
1.6.4 Layers
1.6.4.1 Core Layers
from tensorflow.keras.layers import Dense, Activation, Dropout, Flatten
# Dense (Fully Connected) Layer
dense = Dense(units=64, activation='relu', use_bias=True,
kernel_initializer='glorot_uniform',
bias_initializer='zeros')
# Activation Layer
activation = Activation('relu') # or use directly in Dense
# Dropout (regularization)
dropout = Dropout(rate=0.5) # Drops 50% of inputs during training
# Flatten (convert 2D/3D to 1D)
flatten = Flatten() # (batch, height, width, channels) → (batch, features)
1.6.4.2 Convolutional Layers
from tensorflow.keras.layers import Conv2D, MaxPooling2D, AveragePooling2D
# 2D Convolution
conv2d = Conv2D(filters=32, kernel_size=(3, 3), strides=(1, 1),
padding='same', activation='relu')
# Max Pooling
maxpool = MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='valid')
# Average Pooling
avgpool = AveragePooling2D(pool_size=(2, 2))
# Global Pooling (reduces to 1 value per channel)
from tensorflow.keras.layers import GlobalAveragePooling2D, GlobalMaxPooling2D
gap = GlobalAveragePooling2D()
gmp = GlobalMaxPooling2D()
1.6.4.3 CNN Architecture Example
Input Image (224×224×3)
         │
         ▼
 ┌─────────────┐
 │   Conv2D    │ filters=32, kernel=3×3
 │   + ReLU    │
 └──────┬──────┘
        ▼
 ┌─────────────┐
 │  MaxPool2D  │ pool=2×2 → (112×112×32)
 └──────┬──────┘
        ▼
 ┌─────────────┐
 │   Conv2D    │ filters=64, kernel=3×3
 │   + ReLU    │
 └──────┬──────┘
        ▼
 ┌─────────────┐
 │   Flatten   │
 └──────┬──────┘
        ▼
 ┌─────────────┐
 │    Dense    │ units=128
 └──────┬──────┘
        ▼
 ┌─────────────┐
 │   Output    │ units=10 (classes)
 └─────────────┘
1.6.4.4 Recurrent Layers
from tensorflow.keras.layers import LSTM, GRU, SimpleRNN, Bidirectional
# LSTM Layer
lstm = LSTM(units=128, return_sequences=True, # Return full sequence
return_state=False, # Don't return hidden states
dropout=0.2, recurrent_dropout=0.2)
# GRU Layer
gru = GRU(units=64, return_sequences=False)
# Bidirectional wrapper
bilstm = Bidirectional(LSTM(64, return_sequences=True))
# Stacked RNNs
model = Sequential([
LSTM(128, return_sequences=True, input_shape=(timesteps, features)),
LSTM(64, return_sequences=False),
Dense(10, activation='softmax')
])
1.6.4.5 Normalization Layers
from tensorflow.keras.layers import BatchNormalization, LayerNormalization
# Batch Normalization (normalize across batch)
batchnorm = BatchNormalization(momentum=0.99, epsilon=0.001)
# Layer Normalization (normalize across features)
layernorm = LayerNormalization()
# Usage in model
model = Sequential([
Dense(64),
BatchNormalization(),
Activation('relu')
])
1.6.4.6 Embedding Layers
from tensorflow.keras.layers import Embedding
# For text/categorical data
embedding = Embedding(input_dim=10000, # Vocabulary size
output_dim=128, # Embedding dimension
input_length=100) # Sequence length
# Example usage
model = Sequential([
Embedding(10000, 128, input_length=100),
LSTM(64),
Dense(1, activation='sigmoid')
])
1.6.4.7 Merge Layers
from tensorflow.keras.layers import Add, Multiply, Concatenate, Average
# Add (element-wise sum); tensor1 and tensor2 are same-shape tensors (e.g. branch outputs)
add_layer = Add()([tensor1, tensor2])
# Multiply (element-wise product)
multiply_layer = Multiply()([tensor1, tensor2])
# Concatenate (along axis)
concat_layer = Concatenate(axis=-1)([tensor1, tensor2])
# Average
avg_layer = Average()([tensor1, tensor2])
1.6.5 Activation Functions
- 'relu': Rectified Linear Unit.
- 'sigmoid': Sigmoid function.
- 'tanh': Hyperbolic tangent function.
- 'softmax': Softmax function.
- 'elu': Exponential Linear Unit.
- 'selu': Scaled Exponential Linear Unit.
- 'linear': Linear (identity) activation.
- LeakyReLU: Leaky Rectified Linear Unit (layer, not a string name).
- PReLU: Parametric Rectified Linear Unit (layer, not a string name).
- 'gelu': Gaussian Error Linear Unit.
- 'swish': Swish activation function.
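Activations can be given as strings, as function references, or as standalone layers; parameterized activations such as LeakyReLU and PReLU are layers rather than string names. A minimal sketch of the three equivalent styles:
import tensorflow as tf
from tensorflow.keras.layers import Dense, LeakyReLU
# 1. By name (string shortcut)
dense_a = Dense(64, activation='relu')
# 2. By function reference
dense_b = Dense(64, activation=tf.keras.activations.relu)
# 3. As a separate layer (needed for parameterized activations)
dense_c = Dense(64)
leaky = LeakyReLU(alpha=0.1)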
1.6.6 Loss Functions
- tf.keras.losses.CategoricalCrossentropy: Categorical cross-entropy.
- tf.keras.losses.SparseCategoricalCrossentropy: Sparse categorical cross-entropy.
- tf.keras.losses.BinaryCrossentropy: Binary cross-entropy.
- tf.keras.losses.MeanSquaredError: Mean squared error.
- tf.keras.losses.MeanAbsoluteError: Mean absolute error.
- tf.keras.losses.Hinge: Hinge loss.
- tf.keras.losses.KLDivergence: Kullback-Leibler divergence loss.
- tf.keras.losses.Huber: Huber loss.
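Loss classes can be configured at construction (e.g. from_logits=True) and called directly on tensors, which is handy for sanity checks; a small sketch:
bce = tf.keras.losses.BinaryCrossentropy(from_logits=True)
y_true = tf.constant([[1.0], [0.0]])
y_pred = tf.constant([[2.0], [-1.0]])  # raw logits
print(bce(y_true, y_pred).numpy())  # scalar mean loss over the batch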
1.6.7 Optimizers
- tf.keras.optimizers.SGD: Stochastic Gradient Descent.
- tf.keras.optimizers.Adam: Adaptive Moment Estimation.
- tf.keras.optimizers.RMSprop: Root Mean Square Propagation.
- tf.keras.optimizers.Adagrad: Adaptive Gradient Algorithm.
- tf.keras.optimizers.Adadelta: Adaptive Delta.
- tf.keras.optimizers.Adamax: Adamax optimizer.
- tf.keras.optimizers.Nadam: Nesterov Adam optimizer.
- tf.keras.optimizers.Ftrl: Follow The Regularized Leader optimizer.
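When you need control over hyperparameters, instantiate the optimizer instead of passing a string; for example:
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3, beta_1=0.9, beta_2=0.999)
sgd = tf.keras.optimizers.SGD(learning_rate=0.01, momentum=0.9, nesterov=True)
model.compile(optimizer=optimizer, loss='mse')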
1.6.8 Metrics
- tf.keras.metrics.Accuracy: Accuracy.
- tf.keras.metrics.BinaryAccuracy: Binary accuracy.
- tf.keras.metrics.CategoricalAccuracy: Categorical accuracy.
- tf.keras.metrics.SparseCategoricalAccuracy: Sparse categorical accuracy.
- tf.keras.metrics.TopKCategoricalAccuracy: Top-K categorical accuracy.
- tf.keras.metrics.MeanSquaredError: Mean squared error.
- tf.keras.metrics.MeanAbsoluteError: Mean absolute error.
- tf.keras.metrics.Precision: Precision.
- tf.keras.metrics.Recall: Recall.
- tf.keras.metrics.AUC: Area Under the Curve.
- tf.keras.metrics.F1Score: F1 score.
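Metrics are stateful objects and can also be used standalone outside compile(); a minimal sketch:
precision = tf.keras.metrics.Precision()
precision.update_state([0, 1, 1, 1], [1, 0, 1, 1])  # accumulate over batches
print(precision.result().numpy())  # 0.667 (2 true positives / 3 predicted positives)
precision.reset_state()  # clear accumulated state between epochs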
1.6.9 Model Compilation
model.compile(optimizer='adam',
loss='categorical_crossentropy',
metrics=['accuracy'])
1.6.10 Training
1.6.11 Training Pipeline
   Data Loading
        │
        ▼
  Preprocessing
        │
        ▼
 ┌────────────┐
 │  Epoch 1   │
 └─────┬──────┘
       ▼
 ┌──────────────────┐
 │  Forward Pass    │◄── Compute predictions
 └────────┬─────────┘
          ▼
 ┌──────────────────┐
 │  Compute Loss    │◄── Loss function
 └────────┬─────────┘
          ▼
 ┌──────────────────┐
 │  Backward Pass   │◄── Compute gradients
 └────────┬─────────┘
          ▼
 ┌──────────────────┐
 │  Update Weights  │◄── Optimizer step
 └────────┬─────────┘
          │
          └── Repeat for all batches
# Prepare data
data = np.random.random((1000, 784))
labels = np.random.randint(10, size=(1000,))
one_hot_labels = tf.keras.utils.to_categorical(labels, num_classes=10)
# Split into train and validation
from sklearn.model_selection import train_test_split
X_train, X_val, y_train, y_val = train_test_split(
data, one_hot_labels, test_size=0.2, random_state=42
)
# Train the model
history = model.fit(
X_train, y_train,
batch_size=32,
epochs=10,
validation_data=(X_val, y_val),
verbose=1 # 0=silent, 1=progress bar, 2=one line per epoch
)
# Access training history
print(f"Training accuracy: {history.history['accuracy']}")
print(f"Validation loss: {history.history['val_loss']}")
# Train with validation split (alternative)
history = model.fit(
data, one_hot_labels,
batch_size=32,
epochs=10,
validation_split=0.2, # Use 20% of data for validation
shuffle=True
)
1.6.12 Evaluation
# Evaluate on test data
test_loss, test_accuracy = model.evaluate(X_val, y_val, verbose=0)
print(f'Test Loss: {test_loss:.4f}')
print(f'Test Accuracy: {test_accuracy:.4f}')
# Evaluate with multiple metrics (one value returned per compiled metric,
# e.g. compile(..., metrics=['accuracy', tf.keras.metrics.Precision()]))
test_loss, test_acc, test_precision = model.evaluate(
    X_val, y_val, verbose=0
)
# Batch-wise evaluation
for batch_data, batch_labels in test_dataset:
loss, acc = model.evaluate(batch_data, batch_labels, verbose=0)
1.6.13 Prediction
# Make predictions
predictions = model.predict(X_val) # Returns probabilities
print(f"Prediction shape: {predictions.shape}") # (samples, classes)
# Get predicted classes
predicted_classes = np.argmax(predictions, axis=1)
print(f"Predicted classes: {predicted_classes[:10]}")
# Get prediction confidence
confidence = np.max(predictions, axis=1)
print(f"Confidence scores: {confidence[:10]}")
# Predict single sample
single_sample = X_val[0:1] # Keep batch dimension
prediction = model.predict(single_sample, verbose=0)
predicted_class = np.argmax(prediction)
# Batch prediction
batch_predictions = model.predict(X_val[:100], batch_size=32)
1.6.14 Saving and Loading Models
1.6.15 Model Persistence Flow
       Trained Model
             │
      ┌──────┴───────┐
      │              │
  SavedModel       HDF5
  (TF native)      (.h5)
      │              │
      ├─ Better      ├─ Keras only
      │  for         ├─ Single file
      │  serving     └─ Legacy
      │              │
      └──────┬───────┘
             ▼
        Load & Use
       (Inference/
       Fine-tuning)
1.6.16 SavedModel Format (Recommended)
# Save entire model (architecture + weights + optimizer state)
model.save('saved_model/my_model')
# Load model
loaded_model = tf.keras.models.load_model('saved_model/my_model')
# Verify loaded model
predictions = loaded_model.predict(X_test[:5])
# SavedModel contains:
# - saved_model.pb (architecture + graph)
# - variables/ (weights)
# - assets/ (additional files)
1.6.17 HDF5 Format
# Save entire model to HDF5
model.save('my_model.h5')
# Load from HDF5
loaded_model = tf.keras.models.load_model('my_model.h5')
# Save only weights
model.save_weights('weights.h5')
# Load only weights (model architecture must exist)
model.load_weights('weights.h5')
1.6.18 Weights Only (Checkpoint Format)
# Save weights in TensorFlow checkpoint format
model.save_weights('./checkpoints/model_checkpoint')
# Creates:
# - model_checkpoint.index
# - model_checkpoint.data-00000-of-00001
# Load weights
model.load_weights('./checkpoints/model_checkpoint')
# Transfer learning: reuse a pretrained backbone without its classification head
pretrained_model = tf.keras.applications.VGG16(weights='imagenet',
                                               include_top=False, pooling='avg')
pretrained_model.trainable = False  # Freeze backbone weights
new_model = tf.keras.Sequential([
    pretrained_model,
    Dense(10, activation='softmax')
])
1.6.19 Model Architecture Only
# Save architecture to JSON
model_json = model.to_json()
with open('model_architecture.json', 'w') as json_file:
json_file.write(model_json)
# Load architecture
with open('model_architecture.json', 'r') as json_file:
loaded_model_json = json_file.read()
loaded_model = tf.keras.models.model_from_json(loaded_model_json)
# Then load weights separately
loaded_model.load_weights('weights.h5')
loaded_model.compile(optimizer='adam', loss='categorical_crossentropy')
1.6.20 Custom Objects
# If model has custom layers/functions
class CustomLayer(tf.keras.layers.Layer):
pass
# Save with custom objects
model.save('custom_model.h5')
# Load with custom objects
loaded_model = tf.keras.models.load_model(
'custom_model.h5',
custom_objects={'CustomLayer': CustomLayer}
)
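Alternatively, custom classes can be registered for serialization so that load_model() resolves them without a custom_objects dictionary; a sketch using tf.keras.utils.register_keras_serializable (the package name here is arbitrary):
@tf.keras.utils.register_keras_serializable(package='my_package')
class RegisteredLayer(tf.keras.layers.Layer):
    def call(self, inputs):
        return inputs
# After registration, tf.keras.models.load_model() can deserialize the class directly.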
1.6.21 Checkpoint Management
import os
# Create checkpoint manager
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(
optimizer=optimizer,
model=model
)
manager = tf.train.CheckpointManager(
checkpoint,
checkpoint_dir,
max_to_keep=3 # Keep only 3 latest checkpoints
)
# Save checkpoint during training
for epoch in range(10):
# ... training code ...
save_path = manager.save()
print(f"Saved checkpoint: {save_path}")
# Restore latest checkpoint
checkpoint.restore(manager.latest_checkpoint)
if manager.latest_checkpoint:
print(f"Restored from {manager.latest_checkpoint}")
1.6.22 Callbacks
Callbacks are functions called at specific points during training for monitoring and automation.
1.6.23 Callback Execution Flow
Training Start
      │
      ▼
on_train_begin()
      │
 ┌────▼─────────┐
 │   Epoch      │◄───────────┐
 │   Start      │            │
 └────┬─────────┘            │
      ▼                      │
on_epoch_begin()             │
      │                      │
 ┌────▼─────────┐            │
 │   Batch      │◄──────┐    │
 │   Start      │       │    │
 └────┬─────────┘       │    │
      ▼                 │    │
on_batch_begin()        │    │
      │                 │    │
Forward + Backward      │    │
      │                 │    │
on_batch_end()          │    │
      │                 │    │
      ├─────────────────┘    │
      │  (more batches)      │
      ▼                      │
on_epoch_end()               │
      │                      │
      ├── Check metrics      │
      ├── Save checkpoint    │
      ├── Adjust LR          │
      │                      │
      ├──────────────────────┘
      │ (more epochs or early stop)
      ▼
on_train_end()
1.6.24 Common Callbacks
from tensorflow.keras.callbacks import (
ModelCheckpoint, EarlyStopping, TensorBoard,
ReduceLROnPlateau, CSVLogger, LearningRateScheduler
)
# ModelCheckpoint: Save model during training
checkpoint_callback = ModelCheckpoint(
filepath='./checkpoints/model-{epoch:02d}-{val_loss:.2f}.h5',
save_best_only=True, # Only save when val_loss improves
monitor='val_loss', # Metric to monitor
mode='min', # 'min' for loss, 'max' for accuracy
verbose=1,
save_weights_only=False # Save entire model
)
# EarlyStopping: Stop training when metric stops improving
early_stopping_callback = EarlyStopping(
monitor='val_loss',
patience=5, # Stop after 5 epochs without improvement
restore_best_weights=True, # Restore weights from best epoch
verbose=1,
min_delta=0.001 # Minimum change to qualify as improvement
)
# TensorBoard: Visualize training metrics
tensorboard_callback = TensorBoard(
log_dir='./logs',
histogram_freq=1, # Frequency for weight histograms
write_graph=True,
update_freq='epoch', # Update frequency
profile_batch='10,20' # Profile batches 10-20
)
# ReduceLROnPlateau: Reduce learning rate when metric plateaus
reduce_lr_callback = ReduceLROnPlateau(
monitor='val_loss',
factor=0.5, # Reduce LR by half
patience=3, # Wait 3 epochs before reducing
min_lr=1e-7, # Minimum learning rate
verbose=1
)
# CSVLogger: Log training metrics to CSV
csv_logger = CSVLogger('training_log.csv', append=True)
# LearningRateScheduler: Custom learning rate schedule
def scheduler(epoch, lr):
if epoch < 10:
return lr
else:
return lr * tf.math.exp(-0.1)
lr_scheduler = LearningRateScheduler(scheduler, verbose=1)
# Use callbacks in training
history = model.fit(
X_train, y_train,
epochs=50,
batch_size=32,
validation_data=(X_val, y_val),
callbacks=[
checkpoint_callback,
early_stopping_callback,
tensorboard_callback,
reduce_lr_callback,
csv_logger
]
)
1.6.25 Custom Callback
import time
class CustomCallback(tf.keras.callbacks.Callback):
def on_train_begin(self, logs=None):
print("Starting training...")
self.train_start_time = time.time()
def on_epoch_begin(self, epoch, logs=None):
print(f"\nStarting epoch {epoch + 1}")
self.epoch_start_time = time.time()
def on_epoch_end(self, epoch, logs=None):
epoch_time = time.time() - self.epoch_start_time
print(f"Epoch {epoch + 1} completed in {epoch_time:.2f}s")
print(f"Loss: {logs['loss']:.4f}, Accuracy: {logs['accuracy']:.4f}")
# Custom logic: Save model if accuracy > threshold
if logs['accuracy'] > 0.95:
print("High accuracy achieved! Saving model...")
self.model.save(f'high_acc_model_epoch_{epoch + 1}.h5')
def on_train_batch_end(self, batch, logs=None):
# Print every 100 batches
if batch % 100 == 0:
print(f"Batch {batch}, Loss: {logs['loss']:.4f}")
def on_train_end(self, logs=None):
total_time = time.time() - self.train_start_time
print(f"\nTraining completed in {total_time:.2f}s")
# Use custom callback
model.fit(
X_train, y_train,
epochs=10,
callbacks=[CustomCallback()]
)
1.6.26 Viewing TensorBoard
# Start TensorBoard server
tensorboard --logdir=./logs --port=6006
# Open in browser: http://localhost:6006
1.6.27 Regularization
- tf.keras.regularizers.l1(0.01): L1 regularization.
- tf.keras.regularizers.l2(0.01): L2 regularization.
- tf.keras.regularizers.l1_l2(l1=0.01, l2=0.01): Combined L1 and L2 regularization.
from tensorflow.keras import regularizers
from tensorflow.keras.layers import Dense
model = Sequential([
Dense(128, activation='relu', input_shape=(784,),
kernel_regularizer=regularizers.l1(0.01), # L1 regularization
bias_regularizer=regularizers.l2(0.01)), # L2 regularization
Dense(10, activation='softmax')
])
1.6.28 Custom Layers
import tensorflow as tf
class MyCustomLayer(tf.keras.layers.Layer):
def __init__(self, units=32):
super(MyCustomLayer, self).__init__()
self.units = units
def build(self, input_shape):
self.w = self.add_weight(shape=(input_shape[-1], self.units),
initializer='random_normal',
trainable=True)
self.b = self.add_weight(shape=(self.units,),
initializer='zeros',
trainable=True)
def call(self, inputs):
return tf.matmul(inputs, self.w) + self.b
1.6.29 Custom Loss Functions
import tensorflow as tf
def my_custom_loss(y_true, y_pred):
squared_difference = tf.square(y_true - y_pred)
return tf.reduce_mean(squared_difference, axis=-1)
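A custom loss function is passed directly to compile(); if it needs configuration or serialization, subclass tf.keras.losses.Loss instead. A brief sketch of both:
model.compile(optimizer='adam', loss=my_custom_loss)
# Equivalent class-based version (supports constructor arguments)
class MyCustomLoss(tf.keras.losses.Loss):
    def call(self, y_true, y_pred):
        return tf.reduce_mean(tf.square(y_true - y_pred), axis=-1)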
1.6.30 Custom Metrics
import tensorflow as tf
class MyCustomMetric(tf.keras.metrics.Metric):
def __init__(self, name='my_custom_metric', **kwargs):
super(MyCustomMetric, self).__init__(name=name, **kwargs)
self.sum = self.add_weight(name='sum', initializer='zeros')
self.count = self.add_weight(name='count', initializer='zeros')
def update_state(self, y_true, y_pred, sample_weight=None):
values = tf.abs(y_true - y_pred)
if sample_weight is not None:
sample_weight = tf.cast(sample_weight, self.dtype)
values = tf.multiply(values, sample_weight)
self.sum.assign_add(tf.reduce_sum(values))
self.count.assign_add(tf.cast(tf.size(y_true), self.dtype))
def result(self):
return self.sum / self.count
def reset_state(self):
self.sum.assign(0.0)
self.count.assign(0.0)
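The custom metric plugs into compile() like any built-in metric, or can be driven manually; for example:
model.compile(optimizer='adam', loss='mse', metrics=[MyCustomMetric()])
# Standalone usage
metric = MyCustomMetric()
metric.update_state(tf.constant([1.0, 2.0]), tf.constant([1.5, 2.5]))
print(metric.result().numpy())  # 0.5 (mean absolute error)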
1.6.31 Custom Training Loops
Custom training loops provide fine-grained control over the training process.
1.6.32 Custom Training Flow
        Initialize
 (optimizer, loss, metrics)
            │
            ▼
      ┌──────────┐
      │  Epoch   │◄────────────┐
      └────┬─────┘             │
           ▼                   │
      ┌──────────┐             │
      │Get Batch │◄───────┐    │
      └────┬─────┘        │    │
           ▼              │    │
 ┌──────────────────┐     │    │
 │  Forward Pass    │     │    │
 │  (GradientTape)  │     │    │
 └────────┬─────────┘     │    │
          ▼               │    │
 ┌──────────────────┐     │    │
 │  Compute Loss    │     │    │
 └────────┬─────────┘     │    │
          ▼               │    │
 ┌──────────────────┐     │    │
 │    Compute       │     │    │
 │   Gradients      │     │    │
 └────────┬─────────┘     │    │
          ▼               │    │
 ┌──────────────────┐     │    │
 │     Apply        │     │    │
 │   Gradients      │     │    │
 └────────┬─────────┘     │    │
          ▼               │    │
 ┌──────────────────┐     │    │
 │ Update Metrics   │     │    │
 └────────┬─────────┘     │    │
          ├───────────────┘    │
          │ (more batches)     │
          ▼                    │
 ┌──────────────────┐          │
 │   Log Results    │          │
 └────────┬─────────┘          │
          ├────────────────────┘
          (more epochs)
1.6.33 Basic Custom Training Loop
import tensorflow as tf
# Initialize components
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy()
train_acc_metric = tf.keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = tf.keras.metrics.SparseCategoricalAccuracy()
@tf.function
def train_step(images, labels):
with tf.GradientTape() as tape:
predictions = model(images, training=True)
loss = loss_fn(labels, predictions)
# Compute gradients
gradients = tape.gradient(loss, model.trainable_variables)
# Update weights
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
# Update metrics
train_acc_metric.update_state(labels, predictions)
return loss
@tf.function
def val_step(images, labels):
predictions = model(images, training=False)
loss = loss_fn(labels, predictions)
val_acc_metric.update_state(labels, predictions)
return loss
# Training loop
epochs = 10
for epoch in range(epochs):
print(f"\nEpoch {epoch + 1}/{epochs}")
# Training phase
train_losses = []
for batch, (images, labels) in enumerate(train_dataset):
loss = train_step(images, labels)
train_losses.append(loss.numpy())
if batch % 100 == 0:
print(f"Batch {batch}, Loss: {loss.numpy():.4f}")
# Validation phase
val_losses = []
for images, labels in val_dataset:
val_loss = val_step(images, labels)
val_losses.append(val_loss.numpy())
# Print epoch results
train_acc = train_acc_metric.result()
val_acc = val_acc_metric.result()
print(f"Train Loss: {np.mean(train_losses):.4f}, Train Acc: {train_acc:.4f}")
print(f"Val Loss: {np.mean(val_losses):.4f}, Val Acc: {val_acc:.4f}")
# Reset metrics
train_acc_metric.reset_states()
val_acc_metric.reset_states()
1.6.34 Advanced Custom Training with Regularization
@tf.function
def train_step_with_regularization(images, labels):
with tf.GradientTape() as tape:
predictions = model(images, training=True)
# Main loss
loss = loss_fn(labels, predictions)
# Add L2 regularization
l2_loss = tf.add_n([tf.nn.l2_loss(v) for v in model.trainable_variables
if 'bias' not in v.name])
total_loss = loss + 0.001 * l2_loss
gradients = tape.gradient(total_loss, model.trainable_variables)
# Gradient clipping
gradients, _ = tf.clip_by_global_norm(gradients, 1.0)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
train_acc_metric.update_state(labels, predictions)
return loss, total_loss
# Training with logging
for epoch in range(epochs):
for images, labels in train_dataset:
loss, total_loss = train_step_with_regularization(images, labels)
1.6.35 Custom Training with Learning Rate Scheduling
# Define learning rate schedule
initial_lr = 0.001
decay_steps = 1000
decay_rate = 0.96
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
initial_learning_rate=initial_lr,
decay_steps=decay_steps,
decay_rate=decay_rate
)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
# Or manual learning rate adjustment
for epoch in range(epochs):
# Adjust learning rate manually
if epoch > 0 and epoch % 5 == 0:
new_lr = optimizer.learning_rate * 0.5
optimizer.learning_rate.assign(new_lr)
print(f"Learning rate adjusted to: {new_lr.numpy()}")
for images, labels in train_dataset:
loss = train_step(images, labels)
1.7 Data Input Pipelines (tf.data)
1.7.1 tf.data Pipeline Flow
   Raw Data
       │
       ▼
 ┌───────────┐
 │  Create   │◄── from_tensor_slices
 │  Dataset  │    from_generator
 └─────┬─────┘    list_files
       ▼
 ┌───────────┐
 │  Shuffle  │◄── Randomize order
 └─────┬─────┘
       ▼
 ┌───────────┐
 │    Map    │◄── Preprocess
 │           │    Augment
 └─────┬─────┘
       ▼
 ┌───────────┐
 │   Cache   │◄── Store in memory/disk
 └─────┬─────┘
       ▼
 ┌───────────┐
 │   Batch   │◄── Create batches
 └─────┬─────┘
       ▼
 ┌───────────┐
 │ Prefetch  │◄── Prepare next batch
 └─────┬─────┘
       ▼
    Training
1.7.2 Creating Datasets
import tensorflow as tf
import numpy as np
# From NumPy arrays
X = np.random.random((1000, 28, 28, 1))
y = np.random.randint(10, size=(1000,))
dataset = tf.data.Dataset.from_tensor_slices((X, y))
# From tensors
tensor_x = tf.constant([[1, 2], [3, 4]])
tensor_y = tf.constant([0, 1])
dataset = tf.data.Dataset.from_tensor_slices((tensor_x, tensor_y))
# From list of files
file_dataset = tf.data.Dataset.list_files("path/to/data/*.jpg")
# From CSV
dataset = tf.data.experimental.make_csv_dataset(
"data.csv",
batch_size=32,
label_name="target",
num_epochs=1
)
# From generator
def data_generator():
for i in range(1000):
yield (i, i**2)
dataset = tf.data.Dataset.from_generator(
data_generator,
output_signature=(
tf.TensorSpec(shape=(), dtype=tf.int32),
tf.TensorSpec(shape=(), dtype=tf.int32)
)
)
# Range dataset
range_dataset = tf.data.Dataset.range(100)
1.7.3 Dataset Transformations
# Create a dataset
dataset = tf.data.Dataset.from_tensor_slices((X, y))
# Shuffle: Randomize order (buffer_size should be >= dataset size for full shuffle)
dataset = dataset.shuffle(buffer_size=1000, seed=42)
# Map: Apply preprocessing function
def preprocess(image, label):
image = tf.cast(image, tf.float32) / 255.0 # Normalize
return image, label
dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
# Filter: Keep only certain elements
dataset = dataset.filter(lambda x, y: y < 5) # Keep only classes 0-4
# Batch: Create batches
dataset = dataset.batch(32, drop_remainder=False)
# Repeat: Repeat dataset (for multiple epochs)
dataset = dataset.repeat(count=10) # Repeat 10 times
# Cache: Store dataset in memory or disk
dataset = dataset.cache() # In-memory
# dataset = dataset.cache("/path/to/cache") # On disk
# Prefetch: Prepare next batches while training current batch
dataset = dataset.prefetch(buffer_size=tf.data.AUTOTUNE)
# Take: Get first N elements
small_dataset = dataset.take(100)
# Skip: Skip first N elements
remaining_dataset = dataset.skip(100)
1.7.4 Optimized Pipeline
def create_dataset(X, y, batch_size=32, shuffle=True, augment=False):
"""Create an optimized tf.data pipeline"""
dataset = tf.data.Dataset.from_tensor_slices((X, y))
if shuffle:
dataset = dataset.shuffle(buffer_size=len(X))
# Preprocessing
def preprocess(image, label):
image = tf.cast(image, tf.float32) / 255.0
if augment:
# Data augmentation
image = tf.image.random_flip_left_right(image)
image = tf.image.random_brightness(image, max_delta=0.1)
return image, label
dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
dataset = dataset.cache() # Cache after preprocessing
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(tf.data.AUTOTUNE)
return dataset
# Usage
train_dataset = create_dataset(X_train, y_train, batch_size=64, augment=True)
val_dataset = create_dataset(X_val, y_val, batch_size=64, shuffle=False)
1.7.5 Advanced Transformations
# Zip: Combine multiple datasets
dataset1 = tf.data.Dataset.range(5)
dataset2 = tf.data.Dataset.range(5, 10)
zipped = tf.data.Dataset.zip((dataset1, dataset2))
# FlatMap: Map and flatten
dataset = tf.data.Dataset.from_tensor_slices([1, 2, 3])
dataset = dataset.flat_map(
lambda x: tf.data.Dataset.from_tensor_slices([x, x*2, x*3])
)
# Interleave: Parallel processing of multiple files
files = tf.data.Dataset.list_files("data/*.csv")
dataset = files.interleave(
lambda x: tf.data.TextLineDataset(x),
cycle_length=4, # Process 4 files in parallel
num_parallel_calls=tf.data.AUTOTUNE
)
# Window: Create sliding windows
dataset = tf.data.Dataset.range(10)
dataset = dataset.window(size=3, shift=1, drop_remainder=True)
dataset = dataset.flat_map(lambda x: x.batch(3))
1.7.6 Reading TFRecord Files
raw_dataset = tf.data.TFRecordDataset("my_data.tfrecord")
# Define a feature description
feature_description = {
'feature0': tf.io.FixedLenFeature([], tf.int64),
'feature1': tf.io.FixedLenFeature([], tf.string),
'feature2': tf.io.FixedLenFeature([10], tf.float32),
}
def _parse_function(example_proto):
# Parse the input tf.train.Example proto using the feature description.
return tf.io.parse_single_example(example_proto, feature_description)
parsed_dataset = raw_dataset.map(_parse_function)
1.8 Distributed Training
1.8.1 Distribution Strategies Overview
            Distribution Strategies
                      │
    ┌─────────┬───────┴──────┬──────────────┐
    │         │              │              │
 Mirrored  MultiWorker   Parameter        TPU
 Strategy   Mirrored      Server        Strategy
    │       Strategy     Strategy          │
    │         │              │             │
 Single    Multiple      Multiple        Cloud
 Machine   Machines      Machines        TPUs
 Multiple  Multiple      PS + Workers
 GPUs      GPUs
1.8.2 MirroredStrategy (Single Machine, Multiple GPUs)
Synchronous training on multiple GPUs on a single machine.
import tensorflow as tf
# Create strategy
strategy = tf.distribute.MirroredStrategy()
print(f"Number of devices: {strategy.num_replicas_in_sync}")
# Build and compile model inside strategy scope
with strategy.scope():
model = tf.keras.Sequential([
tf.keras.layers.Dense(128, activation='relu', input_shape=(784,)),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.Dense(10, activation='softmax')
])
model.compile(
optimizer=tf.keras.optimizers.Adam(0.001),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
# Prepare dataset
BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
train_dataset = train_dataset.shuffle(10000).batch(GLOBAL_BATCH_SIZE)
# Train (distributed automatically)
model.fit(train_dataset, epochs=10)
1.8.3 Custom Training Loop with MirroredStrategy
with strategy.scope():
model = create_model()
optimizer = tf.keras.optimizers.Adam()
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(
reduction=tf.keras.losses.Reduction.NONE
)
def compute_loss(labels, predictions):
per_example_loss = loss_fn(labels, predictions)
return tf.nn.compute_average_loss(per_example_loss,
global_batch_size=GLOBAL_BATCH_SIZE)
@tf.function
def train_step(inputs):
images, labels = inputs
with tf.GradientTape() as tape:
predictions = model(images, training=True)
loss = compute_loss(labels, predictions)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
return loss
@tf.function
def distributed_train_step(dataset_inputs):
per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
# Training loop
for epoch in range(10):
total_loss = 0.0
num_batches = 0
for batch in train_dataset:
loss = distributed_train_step(batch)
total_loss += loss
num_batches += 1
print(f"Epoch {epoch + 1}, Loss: {total_loss / num_batches:.4f}")
1.8.4 MultiWorkerMirroredStrategy
import os, json
os.environ['TF_CONFIG'] = json.dumps({
'cluster': {
'worker': ["localhost:12345", "localhost:23456"]
},
'task': {'type': 'worker', 'index': 0}
})
strategy = tf.distribute.MultiWorkerMirroredStrategy()
with strategy.scope():
# ... build and compile model ...
1.8.5 ParameterServerStrategy
import os, json
os.environ['TF_CONFIG'] = json.dumps({
'cluster': {
'worker': ["localhost:12345", "localhost:23456"],
'ps': ["localhost:34567"]
},
'task': {'type': 'worker', 'index': 0}
})
strategy = tf.distribute.experimental.ParameterServerStrategy(
    tf.distribute.cluster_resolver.TFConfigClusterResolver())
with strategy.scope():
# ... build and compile model ...
1.8.6 TPUStrategy
resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
tf.config.experimental_connect_to_cluster(resolver)
tf.tpu.experimental.initialize_tpu_system(resolver)
strategy = tf.distribute.TPUStrategy(resolver)
with strategy.scope():
# ... build and compile model ...
1.9 Data Augmentation
Data augmentation artificially increases training data by applying transformations.
1.9.1 Image Augmentation Techniques
              Original Image
                    │
   ┌────────┬───────┼────────┬──────────┐
   │        │       │        │          │
  Flip   Rotation  Zoom  Brightness  Contrast
   │        │       │        │          │
   └────────┴───────┼────────┴──────────┘
                    ▼
             Augmented Images
          (More training data)
1.9.2 Using tf.keras.layers for Augmentation
# Create augmentation layers (applied during training)
data_augmentation = tf.keras.Sequential([
tf.keras.layers.RandomFlip("horizontal"),
    tf.keras.layers.RandomRotation(0.2), # ±20% of a full turn = ±72 degrees
    tf.keras.layers.RandomZoom(0.2), # ±20% zoom
tf.keras.layers.RandomTranslation(0.1, 0.1), # 10% shift
tf.keras.layers.RandomContrast(0.2),
])
# Build model with augmentation
model = tf.keras.Sequential([
data_augmentation, # Only active during training
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Conv2D(64, 3, activation='relu'),
tf.keras.layers.GlobalAveragePooling2D(),
tf.keras.layers.Dense(10, activation='softmax')
])
1.9.3 Using tf.image for Augmentation
def augment_image(image, label):
"""Custom augmentation function"""
# Random flip
image = tf.image.random_flip_left_right(image)
image = tf.image.random_flip_up_down(image)
# Random brightness
image = tf.image.random_brightness(image, max_delta=0.2)
# Random contrast
image = tf.image.random_contrast(image, lower=0.8, upper=1.2)
# Random saturation (for color images)
image = tf.image.random_saturation(image, lower=0.8, upper=1.2)
# Random hue (for color images)
image = tf.image.random_hue(image, max_delta=0.1)
    # Random crop (the input image must be at least 224×224×3)
    image = tf.image.random_crop(image, size=[224, 224, 3])
# Clip values to [0, 1]
image = tf.clip_by_value(image, 0.0, 1.0)
return image, label
# Apply to dataset
train_dataset = train_dataset.map(augment_image,
num_parallel_calls=tf.data.AUTOTUNE)
1.9.4 Advanced Augmentation (MixUp, CutMix)
def mixup(image1, label1, image2, label2, alpha=0.2):
"""MixUp augmentation"""
    # Sample mixing coefficient (simplified; canonical MixUp draws lambda from Beta(alpha, alpha))
    lambda_val = tf.random.uniform([], 0, 1)
    lambda_val = tf.maximum(lambda_val, 1 - lambda_val)
# Mix images
mixed_image = lambda_val * image1 + (1 - lambda_val) * image2
# Mix labels
mixed_label = lambda_val * label1 + (1 - lambda_val) * label2
return mixed_image, mixed_label
def cutmix(image1, label1, image2, label2, alpha=1.0):
"""CutMix augmentation"""
# Sample cutting area
lambda_val = tf.random.uniform([], 0, 1)
# Get image size
h, w = tf.shape(image1)[0], tf.shape(image1)[1]
# Calculate cut size
cut_h = tf.cast(tf.cast(h, tf.float32) * tf.sqrt(1 - lambda_val), tf.int32)
cut_w = tf.cast(tf.cast(w, tf.float32) * tf.sqrt(1 - lambda_val), tf.int32)
# Random position
cx = tf.random.uniform([], 0, w, dtype=tf.int32)
cy = tf.random.uniform([], 0, h, dtype=tf.int32)
# Calculate boundaries
x1 = tf.clip_by_value(cx - cut_w // 2, 0, w)
y1 = tf.clip_by_value(cy - cut_h // 2, 0, h)
x2 = tf.clip_by_value(cx + cut_w // 2, 0, w)
y2 = tf.clip_by_value(cy + cut_h // 2, 0, h)
    # Create mask: 0 inside the cut box, 1 elsewhere (broadcasts over channels)
    ys = tf.range(h)[:, tf.newaxis]
    xs = tf.range(w)[tf.newaxis, :]
    inside_box = (ys >= y1) & (ys < y2) & (xs >= x1) & (xs < x2)
    mask = 1.0 - tf.cast(inside_box[..., tf.newaxis], tf.float32)
    # Apply cutmix
    mixed_image = image1 * mask + image2 * (1 - mask)
    # Mix labels in proportion to the area actually cut
    lambda_val = 1.0 - tf.cast((x2 - x1) * (y2 - y1), tf.float32) / tf.cast(h * w, tf.float32)
    mixed_label = lambda_val * label1 + (1 - lambda_val) * label2
return mixed_image, mixed_label
1.9.5 Image Preprocessing
# Normalization
def normalize_image(image):
"""Normalize image to [0, 1] or [-1, 1]"""
# To [0, 1]
image = tf.cast(image, tf.float32) / 255.0
# Or to [-1, 1]
# image = (tf.cast(image, tf.float32) - 127.5) / 127.5
return image
# Standardization (ImageNet mean/std)
def standardize_image(image):
"""Standardize using ImageNet statistics"""
mean = tf.constant([0.485, 0.456, 0.406])
std = tf.constant([0.229, 0.224, 0.225])
image = tf.cast(image, tf.float32) / 255.0
image = (image - mean) / std
return image
# Resize and crop
def resize_and_crop(image, size=224):
"""Resize and center crop"""
# Resize to slightly larger
image = tf.image.resize(image, [size + 32, size + 32])
# Center crop
image = tf.image.resize_with_crop_or_pad(image, size, size)
return image
# Combined preprocessing
def preprocess_pipeline(image, label, training=False):
"""Complete preprocessing pipeline"""
# Decode if needed
if image.dtype == tf.string:
image = tf.image.decode_jpeg(image, channels=3)
# Resize
image = tf.image.resize(image, [224, 224])
if training:
# Augmentation for training
image = tf.image.random_flip_left_right(image)
image = tf.image.random_brightness(image, max_delta=0.1)
image = tf.image.random_contrast(image, lower=0.9, upper=1.1)
# Normalize
image = tf.cast(image, tf.float32) / 255.0
return image, label
1.10 TensorFlow Hub
Pre-trained models for transfer learning and feature extraction.
1.10.1 Using Pre-trained Models
import tensorflow_hub as hub
# Method 1: As a layer
model = tf.keras.Sequential([
hub.KerasLayer(
"https://tfhub.dev/google/imagenet/mobilenet_v2_100_224/feature_vector/4",
trainable=False, # Freeze weights
input_shape=(224, 224, 3)
),
tf.keras.layers.Dense(10, activation='softmax')
])
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
# Method 2: Load as a model
embedding_model = hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")
# Use for embeddings
sentences = ["Hello world", "Machine learning is great"]
embeddings = embedding_model(sentences)
# Method 3: KerasLayer with trainable=True for fine-tuning
feature_extractor = hub.KerasLayer(
"https://tfhub.dev/google/imagenet/mobilenet_v2_100_224/feature_vector/4",
trainable=True, # Fine-tune
input_shape=(224, 224, 3)
)
model = tf.keras.Sequential([
feature_extractor,
tf.keras.layers.Dense(256, activation='relu'),
tf.keras.layers.Dropout(0.5),
tf.keras.layers.Dense(10, activation='softmax')
])
1.11 TensorFlow Lite
TensorFlow Lite enables on-device machine learning for mobile and IoT devices.
1.11.1 TensorFlow Lite Workflow
 Trained Model
       │
       ▼
 ┌──────────────┐
 │   Convert    │◄── TFLite Converter
 │  to .tflite  │
 └──────┬───────┘
        ▼
 ┌──────────────┐
 │   Optimize   │◄── Quantization
 │  (Optional)  │    Pruning
 └──────┬───────┘
        ▼
  .tflite Model
 (Smaller, Faster)
        │
        ▼
 ┌──────────────┐
 │  Deploy to   │◄── Mobile (Android/iOS)
 │   Device     │    IoT (Raspberry Pi)
 └──────────────┘    Microcontrollers
1.11.2 Converting to TensorFlow Lite
# Convert from Keras model
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()
# Save to file
with open('model.tflite', 'wb') as f:
f.write(tflite_model)
# Convert from SavedModel
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model/my_model')
tflite_model = converter.convert()
# Convert from concrete function
converter = tf.lite.TFLiteConverter.from_concrete_functions([concrete_func])
tflite_model = converter.convert()
1.11.3 Post-Training Quantization
# Dynamic range quantization (weights only)
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_quant_model = converter.convert()
# Full integer quantization (weights + activations)
def representative_dataset():
for i in range(100):
# Use representative data samples
yield [np.random.random((1, 224, 224, 3)).astype(np.float32)]
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8
tflite_quant_model = converter.convert()
# Float16 quantization
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.float16]
tflite_float16_model = converter.convert()
1.11.4 Inference with TensorFlow Lite
# Load TFLite model
interpreter = tf.lite.Interpreter(model_path='model.tflite')
# Or from model content
# interpreter = tf.lite.Interpreter(model_content=tflite_model)
# Allocate tensors
interpreter.allocate_tensors()
# Get input and output details
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
print(f"Input shape: {input_details[0]['shape']}")
print(f"Input dtype: {input_details[0]['dtype']}")
# Prepare input data
input_shape = input_details[0]['shape']
input_data = np.array(np.random.random_sample(input_shape), dtype=np.float32)
# Set input tensor
interpreter.set_tensor(input_details[0]['index'], input_data)
# Run inference
interpreter.invoke()
# Get output
output_data = interpreter.get_tensor(output_details[0]['index'])
print(f"Prediction: {output_data}")
1.11.5 Batch Inference
# Resize input tensor for batch processing
interpreter = tf.lite.Interpreter(model_path='model.tflite')
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
interpreter.resize_tensor_input(input_details[0]['index'], [10, 224, 224, 3])
interpreter.allocate_tensors()
# Now can process batch of 10 images
batch_data = np.random.random((10, 224, 224, 3)).astype(np.float32)
interpreter.set_tensor(input_details[0]['index'], batch_data)
interpreter.invoke()
output = interpreter.get_tensor(output_details[0]['index'])
1.11.6 Model Analysis
# Analyze model size
import os
model_size = os.path.getsize('model.tflite')
quant_model_size = os.path.getsize('model_quant.tflite')
print(f"Original model: {model_size / 1024:.2f} KB")
print(f"Quantized model: {quant_model_size / 1024:.2f} KB")
print(f"Size reduction: {(1 - quant_model_size/model_size) * 100:.1f}%")
1.12 TensorFlow Serving
1.12.1 Exporting a SavedModel
tf.saved_model.save(model, "path/to/saved_model")
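Before serving, the exported signatures can be inspected with the saved_model_cli tool that ships with TensorFlow:
# Show inputs/outputs of the default serving signature
saved_model_cli show --dir path/to/saved_model --tag_set serve --signature_def serving_default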
1.12.2 Serving with TensorFlow Serving
- Install TensorFlow Serving (see the TensorFlow Serving installation guide for details).
- Start the server:
tensorflow_model_server --port=8500 --rest_api_port=8501 --model_name=my_model --model_base_path=/path/to/saved_model
- Send requests (using the requests library in Python):
import requests
import json
data = json.dumps({"instances": [[1.0, 2.0, ...]]}) # Example input data
headers = {"content-type": "application/json"}
json_response = requests.post('http://localhost:8501/v1/models/my_model:predict', data=data, headers=headers)
predictions = json.loads(json_response.text)['predictions']
1.13 TensorFlow Extended (TFX)
TFX is a platform for building and deploying production ML pipelines. It includes components for:
- Data validation (tensorflow_data_validation)
- Data transformation (tensorflow_transform)
- Model training (tensorflow)
- Model analysis (tensorflow_model_analysis)
- Model serving (tensorflow_serving)
- Pipeline orchestration (Apache Beam, Apache Airflow, Kubeflow Pipelines)
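A minimal local pipeline sketch using the TFX v1 API (the ./data directory of CSVs and the trainer.py module file are hypothetical placeholders):
from tfx import v1 as tfx
# Ingest CSV files as training examples
example_gen = tfx.components.CsvExampleGen(input_base='./data')
# Train a model defined by a user-provided run_fn in trainer.py
trainer = tfx.components.Trainer(
    module_file='trainer.py',
    examples=example_gen.outputs['examples'],
    train_args=tfx.proto.TrainArgs(num_steps=100),
    eval_args=tfx.proto.EvalArgs(num_steps=10))
pipeline = tfx.dsl.Pipeline(
    pipeline_name='demo_pipeline',
    pipeline_root='./pipeline_root',
    components=[example_gen, trainer])
# Run locally (Beam/Airflow/Kubeflow runners exist for production)
tfx.orchestration.LocalDagRunner().run(pipeline)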
1.14 TensorFlow Probability
1.14.1 Installation
pip install tensorflow-probability
1.14.2 Distributions
import tensorflow_probability as tfp
tfd = tfp.distributions
# Normal distribution
normal_dist = tfd.Normal(loc=0., scale=1.)
samples = normal_dist.sample(10)
log_prob = normal_dist.log_prob(0.)
# Bernoulli distribution
bernoulli_dist = tfd.Bernoulli(probs=0.7)
samples = bernoulli_dist.sample(10)
# Categorical distribution
categorical_dist = tfd.Categorical(probs=[0.2, 0.3, 0.5])
samples = categorical_dist.sample(10)
1.14.3 Bijectors
import tensorflow_probability as tfp
tfb = tfp.bijectors
# Affine transform (tfb.Affine was removed in newer TFP; compose Shift and Scale instead)
affine_bijector = tfb.Chain([tfb.Shift(2.), tfb.ScaleMatvecDiag(scale_diag=[3., 4.])])
transformed_tensor = affine_bijector.forward(tf.constant([[1., 2.]])) # [[5., 10.]]
# Exp bijector
exp_bijector = tfb.Exp()
transformed_tensor = exp_bijector.forward(tf.constant([0., 1., 2.]))
1.14.4 Markov Chain Monte Carlo (MCMC)
import tensorflow_probability as tfp
tfd = tfp.distributions
tfm = tfp.mcmc
# Define a target distribution (e.g., a normal distribution)
target_log_prob_fn = tfd.Normal(loc=0., scale=1.).log_prob
# Define a kernel (e.g., Hamiltonian Monte Carlo)
kernel = tfm.HamiltonianMonteCarlo(
target_log_prob_fn=target_log_prob_fn,
step_size=0.1,
num_leapfrog_steps=3)
# Run the MCMC sampler
samples, _ = tfm.sample_chain(
num_results=1000,
current_state=0.,
kernel=kernel)
1.15 TensorFlow Datasets (TFDS)
1.15.1 Installation
pip install tensorflow-datasets
1.15.2 Loading Datasets
import tensorflow_datasets as tfds
# Load a dataset
(ds_train, ds_test), ds_info = tfds.load(
'mnist',
split=['train', 'test'],
shuffle_files=True,
as_supervised=True,
with_info=True,
)
# Print dataset information
print(ds_info)
1.15.3 Processing Datasets
def normalize_img(image, label):
"""Normalizes images: `uint8` -> `float32`."""
return tf.cast(image, tf.float32) / 255., label
ds_train = ds_train.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.cache()
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(128)
ds_train = ds_train.prefetch(tf.data.AUTOTUNE)
ds_test = ds_test.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_test = ds_test.batch(128)
ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.AUTOTUNE)
1.16 TensorFlow Addons
1.16.1 Installation
pip install tensorflow-addons
1.16.2 Usage (Example: WeightNormalization)
import tensorflow_addons as tfa
model = tf.keras.Sequential([
tfa.layers.WeightNormalization(tf.keras.layers.Dense(64, activation="relu"), data_init=False),
tf.keras.layers.Dense(10, activation="softmax"),
])
1.17 Eager Execution
Eager execution is enabled by default in TensorFlow 2.x. You can check if it's enabled:
tf.executing_eagerly() # Returns True
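Because operations execute immediately, tensors hold concrete values and can be inspected with ordinary Python:
x = tf.constant([[1.0, 2.0]])
y = x * 3  # evaluated immediately; no graph or session needed
print(y.numpy())  # [[3. 6.]]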
1.18 tf.function
Converts Python functions into TensorFlow graphs for improved performance.
1.18.1 tf.function Benefits
Python Function (Eager Execution)
                │
          @tf.function
                │
        ┌───────┴────────┐
        ▼                ▼
   Graph Mode        AutoGraph
   (faster)          (Python → TF)
   - faster execution    - handles control flow
   - GPU optimized       - converts if/for/while
   - saveable as         - keeps code Pythonic
     SavedModel
1.18.2 Basic Usage
# Regular function (eager execution)
def regular_function(x, y):
return x**2 + y**2
# Decorated function (graph execution)
@tf.function
def graph_function(x, y):
return x**2 + y**2
# Usage
x = tf.constant(3.0)
y = tf.constant(4.0)
result = graph_function(x, y)
print(result) # 25.0
# Performance comparison
import time
# Eager execution
start = time.time()
for _ in range(1000):
_ = regular_function(x, y)
eager_time = time.time() - start
# Graph execution
start = time.time()
for _ in range(1000):
_ = graph_function(x, y)
graph_time = time.time() - start
print(f"Eager: {eager_time:.4f}s, Graph: {graph_time:.4f}s")
1.18.3 Input Signatures
# Specify input types and shapes
@tf.function(input_signature=[
tf.TensorSpec(shape=[None, 28, 28], dtype=tf.float32),
tf.TensorSpec(shape=[None], dtype=tf.int32)
])
def train_step(images, labels):
# Function body
return images, labels
# This ensures the function is only traced once
1.18.4 Control Flow
@tf.function
def conditional_function(x):
if x > 0:
return x * 2
else:
return x * 3
@tf.function
def loop_function(n):
    # State modified inside a tf.range loop must be a Tensor,
    # not a Python int, so the while_loop can carry it
    result = tf.constant(0)
    for i in tf.range(n):
        result += i
    return result
# Using Python loops (unrolled at trace time)
@tf.function
def python_loop():
result = 0
for i in range(10): # Python range
result += i
return result
# Using TensorFlow loops (dynamic)
@tf.function
def tf_loop(n):
result = tf.constant(0)
for i in tf.range(n): # TensorFlow range
result += i
return result
1.18.5 Debugging tf.function
# Disable tf.function for debugging
@tf.function
def buggy_function(x):
tf.print("Debug: x =", x) # Use tf.print instead of print
result = x * 2
return result
# Run in eager mode for debugging
tf.config.run_functions_eagerly(True)
result = buggy_function(tf.constant(5.0))
tf.config.run_functions_eagerly(False)
# Or use tf.py_function to call regular Python code
@tf.function
def with_python_debug(x):
    # Pass the tensor through `inp` so the Python function
    # receives its runtime value, not the symbolic tensor
    tf.py_function(lambda t: print("Python print:", t.numpy()), inp=[x], Tout=[])
    return x * 2
1.18.6 AutoGraph Examples
@tf.function
def fibonacci(n):
    # Loop-carried values must be Tensors inside a tf.range loop
    a, b = tf.constant(0), tf.constant(1)
    for _ in tf.range(n):
        a, b = b, a + b
    return a
@tf.function
def dynamic_rnn(input_sequence):
    # Iterating over a Tensor loops along its first dimension;
    # the running state must also be a Tensor
    state = tf.zeros_like(input_sequence[0])
    for element in input_sequence:
        state = state + element
    return state
# Conditional with variables
@tf.function
def clip_gradients(gradients, max_norm):
clipped = []
for grad in gradients:
if grad is not None:
clipped.append(tf.clip_by_norm(grad, max_norm))
else:
clipped.append(None)
return clipped
1.18.7 Polymorphic Functions
@tf.function
def polymorphic_function(x):
return x + 1
# First call traces with shape (2,)
result1 = polymorphic_function(tf.constant([1, 2]))
# Second call reuses trace with shape (2,)
result2 = polymorphic_function(tf.constant([3, 4]))
# Third call traces with new shape (3,)
result3 = polymorphic_function(tf.constant([5, 6, 7]))
# Check number of traces
print(polymorphic_function.experimental_get_tracing_count())
1.18.8 XLA Compilation
# Enable XLA compilation for additional performance
@tf.function(jit_compile=True)
def xla_optimized(x, y):
return tf.matmul(x, y)
# Or use experimental_compile (deprecated)
@tf.function(experimental_compile=True)
def xla_function(x):
return x**2 + tf.sin(x)
1.19 Custom Training with GradientTape
import tensorflow as tf
# Define the model, optimizer, and loss function
model = tf.keras.Sequential([tf.keras.layers.Dense(10, input_shape=(784,), activation='softmax')])
optimizer = tf.keras.optimizers.Adam()
loss_fn = tf.keras.losses.CategoricalCrossentropy()
# Define a training step
@tf.function
def train_step(images, labels):
with tf.GradientTape() as tape:
        predictions = model(images, training=True)
loss = loss_fn(labels, predictions)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
return loss
# Training loop (`dataset` is assumed to be a tf.data.Dataset
# yielding batches of (images, one-hot labels))
epochs = 10
for epoch in range(epochs):
for images, labels in dataset:
loss = train_step(images, labels)
print(f"Epoch {epoch+1}, Loss: {loss.numpy():.4f}")
1.20 Complete End-to-End Example
1.20.1 Image Classification Pipeline
import tensorflow as tf
import numpy as np
from sklearn.model_selection import train_test_split
# 1. Data Preparation
def load_and_preprocess_data():
"""Load and preprocess image data"""
# Load MNIST dataset
(X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data()
# Normalize pixel values
X_train = X_train.astype('float32') / 255.0
X_test = X_test.astype('float32') / 255.0
# Reshape for CNN (add channel dimension)
X_train = X_train[..., np.newaxis]
X_test = X_test[..., np.newaxis]
# Split training data into train and validation
X_train, X_val, y_train, y_val = train_test_split(
X_train, y_train, test_size=0.2, random_state=42
)
return (X_train, y_train), (X_val, y_val), (X_test, y_test)
# 2. Create Data Pipeline
def create_dataset(X, y, batch_size=64, shuffle=True, augment=False):
"""Create optimized tf.data pipeline"""
dataset = tf.data.Dataset.from_tensor_slices((X, y))
if shuffle:
dataset = dataset.shuffle(buffer_size=10000)
if augment:
def augment_fn(image, label):
image = tf.image.random_brightness(image, max_delta=0.1)
image = tf.clip_by_value(image, 0.0, 1.0)
return image, label
dataset = dataset.map(augment_fn, num_parallel_calls=tf.data.AUTOTUNE)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(tf.data.AUTOTUNE)
return dataset
# 3. Build Model
def create_model(input_shape=(28, 28, 1), num_classes=10):
"""Create CNN model"""
model = tf.keras.Sequential([
# Convolutional layers
tf.keras.layers.Conv2D(32, (3, 3), activation='relu',
input_shape=input_shape, padding='same'),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.MaxPooling2D((2, 2)),
tf.keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.MaxPooling2D((2, 2)),
tf.keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
tf.keras.layers.BatchNormalization(),
# Dense layers
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dropout(0.5),
tf.keras.layers.Dense(num_classes, activation='softmax')
])
return model
# 4. Setup Callbacks
def create_callbacks():
"""Create training callbacks"""
callbacks = [
tf.keras.callbacks.EarlyStopping(
monitor='val_loss',
patience=5,
restore_best_weights=True,
verbose=1
),
tf.keras.callbacks.ModelCheckpoint(
'best_model.h5',
monitor='val_accuracy',
save_best_only=True,
verbose=1
),
tf.keras.callbacks.ReduceLROnPlateau(
monitor='val_loss',
factor=0.5,
patience=3,
min_lr=1e-7,
verbose=1
),
tf.keras.callbacks.TensorBoard(
log_dir='./logs',
histogram_freq=1
)
]
return callbacks
# 5. Main Training Function
def train_model():
"""Complete training pipeline"""
# Set random seeds for reproducibility
tf.random.set_seed(42)
np.random.seed(42)
# Load data
print("Loading data...")
(X_train, y_train), (X_val, y_val), (X_test, y_test) = load_and_preprocess_data()
print(f"Train: {X_train.shape}, Val: {X_val.shape}, Test: {X_test.shape}")
# Create datasets
train_dataset = create_dataset(X_train, y_train, batch_size=128, augment=True)
val_dataset = create_dataset(X_val, y_val, batch_size=128, shuffle=False)
test_dataset = create_dataset(X_test, y_test, batch_size=128, shuffle=False)
# Build model
print("\nBuilding model...")
model = create_model()
model.summary()
# Compile model
model.compile(
optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
# Train model
print("\nTraining model...")
history = model.fit(
train_dataset,
validation_data=val_dataset,
epochs=30,
callbacks=create_callbacks(),
verbose=1
)
# Evaluate on test set
print("\nEvaluating on test set...")
test_loss, test_accuracy = model.evaluate(test_dataset)
print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.4f}")
# Save final model
model.save('final_model.h5')
print("\nModel saved as 'final_model.h5'")
return model, history
# 6. Run Training
if __name__ == '__main__':
model, history = train_model()
# Make predictions
print("\nMaking predictions...")
(_, _), (_, _), (X_test, y_test) = load_and_preprocess_data()
predictions = model.predict(X_test[:10])
predicted_classes = np.argmax(predictions, axis=1)
print(f"Predicted: {predicted_classes}")
print(f"Actual: {y_test[:10]}")
1.20.2 Text Classification Example
def text_classification_pipeline():
"""Complete text classification pipeline"""
# 1. Load and preprocess text data
max_words = 10000
max_len = 200
# Load IMDB dataset
(X_train, y_train), (X_test, y_test) = tf.keras.datasets.imdb.load_data(
num_words=max_words
)
# Pad sequences
X_train = tf.keras.preprocessing.sequence.pad_sequences(
X_train, maxlen=max_len
)
X_test = tf.keras.preprocessing.sequence.pad_sequences(
X_test, maxlen=max_len
)
# 2. Build text model
model = tf.keras.Sequential([
tf.keras.layers.Embedding(max_words, 128, input_length=max_len),
tf.keras.layers.Bidirectional(
tf.keras.layers.LSTM(64, return_sequences=True)
),
tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(32)),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dropout(0.5),
tf.keras.layers.Dense(1, activation='sigmoid')
])
# 3. Compile and train
model.compile(
optimizer='adam',
loss='binary_crossentropy',
metrics=['accuracy']
)
history = model.fit(
X_train, y_train,
epochs=10,
batch_size=128,
validation_split=0.2,
callbacks=[
tf.keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True)
]
)
# 4. Evaluate
test_loss, test_acc = model.evaluate(X_test, y_test)
print(f"Test Accuracy: {test_acc:.4f}")
return model, history
# Run text classification
# model, history = text_classification_pipeline()
1.21 Custom Callbacks
import time
import numpy as np
class CustomCallback(tf.keras.callbacks.Callback):
"""Custom callback with comprehensive monitoring"""
def on_train_begin(self, logs=None):
print("=" * 50)
print("Training Started")
print("=" * 50)
self.train_start_time = time.time()
self.epoch_times = []
def on_epoch_begin(self, epoch, logs=None):
print(f"\n{'='*50}")
print(f"Epoch {epoch + 1} Starting")
print(f"{'='*50}")
self.epoch_start_time = time.time()
def on_epoch_end(self, epoch, logs=None):
epoch_time = time.time() - self.epoch_start_time
self.epoch_times.append(epoch_time)
print(f"\nEpoch {epoch + 1} Results:")
print(f" Time: {epoch_time:.2f}s")
print(f" Loss: {logs.get('loss', 0):.4f}")
print(f" Accuracy: {logs.get('accuracy', 0):.4f}")
print(f" Val Loss: {logs.get('val_loss', 0):.4f}")
print(f" Val Accuracy: {logs.get('val_accuracy', 0):.4f}")
# Save model if validation accuracy > threshold
if logs.get('val_accuracy', 0) > 0.95:
self.model.save(f'high_acc_model_epoch_{epoch + 1}.h5')
print(f" β Saved model (val_acc > 0.95)")
def on_train_batch_end(self, batch, logs=None):
# Print progress every 100 batches
if batch % 100 == 0:
print(f" Batch {batch}: loss={logs.get('loss', 0):.4f}", end='\r')
def on_train_end(self, logs=None):
total_time = time.time() - self.train_start_time
avg_epoch_time = np.mean(self.epoch_times)
print(f"\n{'='*50}")
print("Training Completed")
print(f"{'='*50}")
print(f"Total Time: {total_time:.2f}s")
print(f"Average Epoch Time: {avg_epoch_time:.2f}s")
print(f"Total Epochs: {len(self.epoch_times)}")
# Use custom callback
model.fit(
X_train, y_train,
epochs=10,
validation_data=(X_val, y_val),
callbacks=[CustomCallback()]
)
1.22 Mixed Precision Training
from tensorflow.keras import mixed_precision
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)
# Build model with mixed precision
model = tf.keras.Sequential([
tf.keras.layers.Dense(128, activation='relu', input_shape=(784,)),
tf.keras.layers.Dense(10, activation='softmax', dtype='float32') # Output layer should be float32
])
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
optimizer = mixed_precision.LossScaleOptimizer(optimizer)
@tf.function
def train_step(images, labels):
with tf.GradientTape() as tape:
predictions = model(images)
loss = tf.keras.losses.categorical_crossentropy(labels, predictions)
scaled_loss = optimizer.get_scaled_loss(loss)
scaled_gradients = tape.gradient(scaled_loss, model.trainable_variables)
gradients = optimizer.get_unscaled_gradients(scaled_gradients)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
return loss
1.23 Profiling
import tensorflow as tf
# Profile a few training steps and write trace files to 'logdir'
tf.profiler.experimental.start('logdir')
for step in range(10):
# Your training step here
with tf.profiler.experimental.Trace('train', step_num=step):
# ... your training code ...
pass
tf.profiler.experimental.stop()
Then, use TensorBoard to visualize the profiling results:
tensorboard --logdir logdir
1.24 Common Deep Learning Architectures
1.24.1 Convolutional Neural Network (CNN) for Image Classification
def create_cnn_model(input_shape=(224, 224, 3), num_classes=10):
"""Create a CNN for image classification"""
model = tf.keras.Sequential([
# Convolutional Block 1
tf.keras.layers.Conv2D(32, (3, 3), activation='relu',
input_shape=input_shape, padding='same'),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.Conv2D(32, (3, 3), activation='relu', padding='same'),
tf.keras.layers.MaxPooling2D((2, 2)),
tf.keras.layers.Dropout(0.25),
# Convolutional Block 2
tf.keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
tf.keras.layers.MaxPooling2D((2, 2)),
tf.keras.layers.Dropout(0.25),
# Convolutional Block 3
tf.keras.layers.Conv2D(128, (3, 3), activation='relu', padding='same'),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.Conv2D(128, (3, 3), activation='relu', padding='same'),
tf.keras.layers.MaxPooling2D((2, 2)),
tf.keras.layers.Dropout(0.4),
# Dense Layers
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(512, activation='relu'),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.Dropout(0.5),
tf.keras.layers.Dense(num_classes, activation='softmax')
])
return model
model = create_cnn_model()
model.compile(optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
1.24.2 Recurrent Neural Network (RNN) for Sequence Processing
def create_lstm_model(vocab_size=10000, embedding_dim=128, max_length=100):
"""Create an LSTM for text classification"""
model = tf.keras.Sequential([
tf.keras.layers.Embedding(vocab_size, embedding_dim, input_length=max_length),
tf.keras.layers.SpatialDropout1D(0.2),
tf.keras.layers.LSTM(128, dropout=0.2, recurrent_dropout=0.2,
return_sequences=True),
tf.keras.layers.LSTM(64, dropout=0.2, recurrent_dropout=0.2),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dropout(0.5),
tf.keras.layers.Dense(1, activation='sigmoid')
])
return model
model = create_lstm_model()
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
1.24.3 Transfer Learning with Pre-trained Models
def create_transfer_learning_model(num_classes=10):
"""Create a model using transfer learning"""
# Load pre-trained model without top layer
base_model = tf.keras.applications.MobileNetV2(
input_shape=(224, 224, 3),
include_top=False,
weights='imagenet'
)
# Freeze base model
base_model.trainable = False
# Add custom top layers
model = tf.keras.Sequential([
base_model,
tf.keras.layers.GlobalAveragePooling2D(),
tf.keras.layers.Dense(256, activation='relu'),
tf.keras.layers.Dropout(0.5),
tf.keras.layers.Dense(num_classes, activation='softmax')
])
model.compile(
optimizer=tf.keras.optimizers.Adam(1e-4),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
return model, base_model
model, base_model = create_transfer_learning_model()
# Fine-tune: Unfreeze and train with lower learning rate
# After initial training:
# base_model.trainable = True
# model.compile(optimizer=tf.keras.optimizers.Adam(1e-5), ...)
1.24.4 Autoencoder for Dimensionality Reduction
def create_autoencoder(input_dim=784, encoding_dim=32):
"""Create an autoencoder"""
# Encoder
encoder_input = tf.keras.layers.Input(shape=(input_dim,))
encoded = tf.keras.layers.Dense(128, activation='relu')(encoder_input)
encoded = tf.keras.layers.Dense(64, activation='relu')(encoded)
encoded = tf.keras.layers.Dense(encoding_dim, activation='relu')(encoded)
# Decoder
decoded = tf.keras.layers.Dense(64, activation='relu')(encoded)
decoded = tf.keras.layers.Dense(128, activation='relu')(decoded)
decoder_output = tf.keras.layers.Dense(input_dim, activation='sigmoid')(decoded)
# Autoencoder model
autoencoder = tf.keras.Model(encoder_input, decoder_output)
# Encoder model (for encoding only)
encoder = tf.keras.Model(encoder_input, encoded)
autoencoder.compile(optimizer='adam', loss='mse')
return autoencoder, encoder
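A hedged usage sketch, training the autoencoder on flattened MNIST digits (the dataset choice is illustrative):
autoencoder, encoder = create_autoencoder()
(X_train, _), _ = tf.keras.datasets.mnist.load_data()
X_train = X_train.reshape(-1, 784).astype('float32') / 255.0
# An autoencoder learns to reconstruct its own input
autoencoder.fit(X_train, X_train, epochs=5, batch_size=256, validation_split=0.1)
codes = encoder.predict(X_train[:10])  # 32-dimensional embeddings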
1.25 Best Practices
1.25.1 Development Workflow
Problem Definition
        │
        ▼
Data Collection & Exploration
        │
        ▼
Preprocessing & Augmentation ──▶ normalization, data augmentation
        │
        ▼
Model Selection ──▶ start simple, use transfer learning
        │
        ▼
Training ──▶ monitor metrics, use callbacks
        │
        ▼
Evaluation ──▶ validation set, cross-validation
        │
        ▼
Hyperparameter Tuning ──▶ grid/random search, Bayesian optimization
        │
        ▼
Deployment ──▶ SavedModel / TFLite
1.25.2 Performance Optimization
Data Pipeline (combined in the sketch below):
- Use the tf.data API for efficient data loading
- Apply prefetch() to overlap data preprocessing and training
- Use cache() to cache preprocessed data in memory
- Set num_parallel_calls=tf.data.AUTOTUNE for parallel processing
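Putting those four tips together, a minimal sketch (X, y, and preprocess are placeholders for your data and preprocessing function):
dataset = (tf.data.Dataset.from_tensor_slices((X, y))
           .map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)  # parallel preprocessing
           .cache()                                  # cache after the expensive map
           .shuffle(10_000)
           .batch(64)
           .prefetch(tf.data.AUTOTUNE))              # overlap input with training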
Model Training:
- Use @tf.function to convert Python functions to graphs
- Enable mixed precision training with tf.keras.mixed_precision
- Use distributed training strategies for multiple GPUs (see the sketch below)
- Apply XLA compilation with jit_compile=True
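For the multi-GPU point, a minimal sketch with MirroredStrategy (create_model stands in for any model-building function, e.g. the one in the end-to-end example above):
strategy = tf.distribute.MirroredStrategy()
print(f"Replicas in sync: {strategy.num_replicas_in_sync}")
with strategy.scope():
    # The model and optimizer must be created inside the strategy scope
    model = create_model()
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])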
Memory Management:
- Use appropriate batch sizes
- Enable GPU memory growth: tf.config.experimental.set_memory_growth()
- Use gradient accumulation for large models
- Apply gradient checkpointing to trade computation for memory (see the sketch below)
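For the gradient-checkpointing point, a hedged sketch with tf.recompute_grad, which recomputes a block's activations during backprop instead of storing them (the layer sizes are illustrative):
# Layers are created once, outside the wrapped function, so weights are reused
dense1 = tf.keras.layers.Dense(4096, activation='relu')
dense2 = tf.keras.layers.Dense(4096, activation='relu')

@tf.recompute_grad
def heavy_block(x):
    # Intermediate activations here are recomputed in the backward pass
    return dense2(dense1(x))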
1.25.3 Model Design
Architecture Choices:
- Start with simple models and increase complexity gradually
- Use pre-trained models and transfer learning when possible
- Apply batch normalization after convolutional/dense layers
- Use dropout for regularization (0.2-0.5 in most cases)
Regularization Techniques:
- L1/L2 regularization: kernel_regularizer=tf.keras.regularizers.l2(0.01)
- Dropout: tf.keras.layers.Dropout(0.5)
- Early stopping: EarlyStopping(patience=5, restore_best_weights=True)
- Data augmentation for computer vision tasks
Hyperparameters:
- Learning rate: start with 0.001 and use learning rate schedulers (see the sketch below)
- Batch size: 32-256 (larger for GPUs, depending on memory)
- Optimizer: Adam for most cases, SGD with momentum for fine-tuning
- Epochs: use early stopping rather than a fixed number
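As an example of the learning-rate scheduler tip, a sketch using ExponentialDecay (the decay numbers are illustrative):
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=1e-3,  # the suggested 0.001 starting point
    decay_steps=10_000,
    decay_rate=0.9)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)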
1.25.4 Debugging & Monitoring
Training Monitoring:
- Use TensorBoard for visualization
- Monitor both training and validation metrics
- Watch for overfitting (val_loss rises while train_loss keeps falling)
- Check gradient norms to detect vanishing/exploding gradients

Common Issues:
- NaN losses: reduce the learning rate; check for log(0) or division by zero
- Slow convergence: increase the learning rate; check data preprocessing
- Overfitting: add regularization; increase training data
- Underfitting: increase model capacity; train longer
1.25.5 Code Quality
Organization:
- Separate data preprocessing, model definition, and training code
- Use configuration files for hyperparameters
- Version control your code and model checkpoints
- Document model architecture and training procedures

Reproducibility:
- Set random seeds: tf.random.set_seed(42)
- Save model configurations and hyperparameters
- Track experiments with MLflow or Weights & Biases
- Keep detailed training logs

Testing:
- Validate the model with unit tests
- Test the data pipeline with small batches
- Verify model outputs on known examples
- Benchmark inference speed and memory usage (see the sketch below)
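For the benchmarking point, a rough latency sketch (model and X_test are assumed from the examples above; results vary by hardware):
import time
_ = model.predict(X_test[:1])    # warm-up: builds graphs, allocates memory
start = time.time()
_ = model.predict(X_test[:256], batch_size=256)
elapsed = time.time() - start
print(f"~{elapsed / 256 * 1e3:.2f} ms per sample")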
1.26 Common Issues and Debugging
1.26.1 Debugging Decision Tree
Training Issue?
      │
      ├── Loss is NaN         → check learning rate, gradient clipping, data
      ├── Loss not going down → check model capacity, learning rate, data
      ├── Training is slow    → profile the code; use GPU, tf.data, mixed precision
      └── Shape/type errors   → check tensor dimensions; use tf.shape, cast dtypes
1.26.2 Out of Memory (OOM) Errors
# Enable memory growth
gpus = tf.config.list_physical_devices('GPU')
if gpus:
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
# Reduce batch size
BATCH_SIZE = 16 # Instead of 64
# Use mixed precision
from tensorflow.keras import mixed_precision
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)
# Gradient accumulation for a larger effective batch size
# (gradients must actually be summed across steps, then applied and reset)
accumulation_steps = 4
accumulated = [tf.zeros_like(v) for v in model.trainable_variables]
for step, (images, labels) in enumerate(dataset):
    with tf.GradientTape() as tape:
        predictions = model(images, training=True)
        loss = loss_fn(labels, predictions) / accumulation_steps
    gradients = tape.gradient(loss, model.trainable_variables)
    accumulated = [a + g for a, g in zip(accumulated, gradients)]
    if (step + 1) % accumulation_steps == 0:
        optimizer.apply_gradients(zip(accumulated, model.trainable_variables))
        accumulated = [tf.zeros_like(v) for v in model.trainable_variables]
# Clear session to free memory
tf.keras.backend.clear_session()
1.26.3 NaN (Not a Number) Losses
# Check for NaN in data
def check_for_nan(data, labels):
assert not np.isnan(data).any(), "Found NaN in data"
assert not np.isnan(labels).any(), "Found NaN in labels"
# Use gradient clipping
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001, clipnorm=1.0)
# Or clip gradients manually
@tf.function
def train_step(images, labels):
with tf.GradientTape() as tape:
predictions = model(images, training=True)
loss = loss_fn(labels, predictions)
gradients = tape.gradient(loss, model.trainable_variables)
# Clip gradients
gradients, _ = tf.clip_by_global_norm(gradients, 1.0)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
return loss
# Add numerical stability to loss functions
# Instead of: tf.math.log(y_pred)
# Use: tf.math.log(y_pred + 1e-7)
# Use numerically stable loss implementations
# (from_logits=True expects raw logits: remove the final softmax from the model)
loss_fn = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
# Add assertions
@tf.function
def train_step(images, labels):
with tf.GradientTape() as tape:
predictions = model(images, training=True)
loss = loss_fn(labels, predictions)
tf.debugging.check_numerics(loss, "Loss is NaN or Inf")
return loss
1.26.4 Slow Training
# Profile training
tf.profiler.experimental.start('logdir')
for step in range(100):
with tf.profiler.experimental.Trace('train', step_num=step):
train_step(images, labels)
tf.profiler.experimental.stop()
# Optimize the data pipeline: map first, then cache, with prefetch last
dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
dataset = dataset.cache()  # Cache preprocessed elements in memory
dataset = dataset.prefetch(tf.data.AUTOTUNE)
# Use XLA compilation
@tf.function(jit_compile=True)
def train_step(images, labels):
# ... training code ...
pass
# Mixed precision training
from tensorflow.keras import mixed_precision
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)
1.26.5 Shape Mismatches
# Inspect tensor shapes
x = tf.constant([[1, 2], [3, 4]])
print(f"Shape: {x.shape}") # TensorShape([2, 2])
print(f"Dynamic shape: {tf.shape(x)}") # Tensor([2, 2])
# Debug shape issues
@tf.function
def debug_shapes(x, y):
tf.print("x shape:", tf.shape(x))
tf.print("y shape:", tf.shape(y))
return x + y
# Assert shapes
def model_forward(x):
tf.debugging.assert_rank(x, 4, message="Input must be 4D (NHWC)")
tf.debugging.assert_equal(tf.shape(x)[1:3], [224, 224],
message="Image size must be 224x224")
return model(x)
# Reshape tensors
x = tf.reshape(x, [-1, 28, 28, 1]) # -1 infers batch dimension
1.26.6 Gradient Issues
# Check gradient norms
@tf.function
def train_step_with_gradient_monitoring(images, labels):
with tf.GradientTape() as tape:
predictions = model(images, training=True)
loss = loss_fn(labels, predictions)
gradients = tape.gradient(loss, model.trainable_variables)
    # Monitor the global gradient norm
    gradient_norm = tf.linalg.global_norm([g for g in gradients if g is not None])
    tf.print("Gradient norm:", gradient_norm)
    # Clip by global norm; tf.clip_by_global_norm only rescales when the norm
    # exceeds the threshold, avoiding a Tensor-dependent Python `if`
    gradients, _ = tf.clip_by_global_norm(gradients, 10.0)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
return loss
# Use batch normalization (assumes:
# from tensorflow.keras.layers import Conv2D, BatchNormalization, Add, Activation)
model = tf.keras.Sequential([
Conv2D(32, 3, activation='relu'),
BatchNormalization(),
# ...
])
# Use skip connections (ResNet-style)
def residual_block(x, filters):
shortcut = x
x = Conv2D(filters, 3, padding='same', activation='relu')(x)
x = Conv2D(filters, 3, padding='same')(x)
x = Add()([shortcut, x])
x = Activation('relu')(x)
return x
1.26.7 Overfitting vs Underfitting
# Monitor overfitting
import matplotlib.pyplot as plt
def plot_training_history(history):
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.legend()
plt.title('Loss')
plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.legend()
plt.title('Accuracy')
plt.show()
# Combat overfitting
model = tf.keras.Sequential([
Dense(128, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01)),
Dropout(0.5),
BatchNormalization(),
Dense(10, activation='softmax')
])
# Early stopping
early_stop = tf.keras.callbacks.EarlyStopping(
monitor='val_loss',
patience=10,
restore_best_weights=True
)
# Data augmentation
data_augmentation = tf.keras.Sequential([
tf.keras.layers.RandomFlip("horizontal"),
tf.keras.layers.RandomRotation(0.1),
tf.keras.layers.RandomZoom(0.1),
])
1.26.8 Debugging Tools
# Use tf.print for debugging inside @tf.function
import sys
@tf.function
def debug_function(x):
tf.print("Input:", x, output_stream=sys.stdout)
result = x * 2
tf.print("Output:", result)
return result
# Use assertions
@tf.function
def safe_divide(x, y):
tf.debugging.assert_positive(y, message="Divisor must be positive")
tf.debugging.assert_all_finite(x, message="x contains NaN or Inf")
return x / y
# Enable eager execution for debugging
tf.config.run_functions_eagerly(True)
# ... debug code ...
tf.config.run_functions_eagerly(False)
# Use tf.py_function to call Python code
def python_debug_function(x):
print(f"Python print: {x.numpy()}")
return x
@tf.function
def tf_function_with_python(x):
tf.py_function(python_debug_function, [x], [])
return x * 2
# Check model summary
model.summary()
# Visualize model architecture
tf.keras.utils.plot_model(model, to_file='model.png', show_shapes=True)