tf.GradientTape Usage
medium.com의 [링크] 내용을 정리함.
The Basics
tf.GradientTape은 TensorFlow 계산을 추적하고 주어진 변수에 관한 기울기(gradient)를 계산할 수 있게 한다.
1.0 - Introduction
아래 예제처럼 tf.GradientTape으로 계산을 추적하고 기울기를 계산할 수 있다.
Console>>
import tensorflow as tf
x = tf.constant(5.0)
with tf.GradientTape() as tape:
tape.watch(x)
y = x**3
print(tape.gradient(y, x).numpy()) # -> 75.0
- 기본적으로, GradientTape은 상수(constant)를 추적하지 않는다. 따라서 *tape.watch(variable)로 지시하여야만 한다.
- 그러면 관찰(watch)하고 있는 변수에서 계산을 수행할 수 있다. 계산은 이를 규빙($x^3$)하는 것에서부터 신경망을 통해 전달하는 것까지 어떤것이라도 될 수 있다.
- tape.gradient(target, source)로 변수와 관한 기울기를 계산한다. tape.gradient는 .numpy()로 ndarray 형식으로 변환할 수 있는 EagerTensor를 반환한다.
어느때라도 계산내에서 여러개의 변수를 사용하려면, tape.gradient에 이들 변수의 list 또는 tuple을 주면 된다. 케라스 모델을 최적화할 때, 변수 목록(list)로 model.trainable_variables를 전달한다.
1.1 - Automatically Watching Variables
만약 $x$가 상수가 아닌 훈련가능한 변수라면, tape에 이 변수를 관찰하라고 명시적으로 표시할 필요가 없다. GradientTape은 자동적으로 모든 훈련가능한 변수를 관찰한다.
x = tf.Variable(6.0, trainable=True)
with tf.GradientTape() as tape:
y = x**3
print(tape.gradient(y, x).numpy()) # -> 108.0
만약 첫번째 라인을 아래와 같이 변경하고 재실행하면,
x = tf.constant(3.0)
or
x = tf.Variable(3.0, trainable=False)
예제는 GradientTape이 $x$를 관찰하고 있지 않으므로 오류를 발생시킬 것이다.
1.2 - watch_accessed_variables=False
만약 GradientTape*이 모든 변수를 자동적으로 관찰하지 않도록 하려면, tape의 *watch_accessed_variables 인자에 False를 설정할 수 있다.
x = tf.Variable(3.0, trainable=True)
with tf.GradientTape(watch_accessed_variables=False) as tape:
y = x**3
print(tape.gradient(y, x)) # -> None
watch_accessed_variables를 비활성화하는 것은 관찰하기를 원하는 변수 전반에 대해 세밀한 제어를 할 수 있게 한다.
만약 너무 많은 훈련 가능한 변수가 있고 한번에 모두 최적화 할 수 없다면, 실수를 방지하기 위해 watch_accessed_variables를 비활성화 할 것이다.
1.3 - High-Order Derivatives(고차미분)
고차미분을 계산하려면, 중첩된 GradientTape을 사용할 수 있다.
x = tf.Variable(3.0, trainable=True)
with tf.GradientTape() as tape1:
with tf.GradientTape() as tape2:
y = x ** 3
order_1 = tape2.gradient(y, x)
order_2 = tape1.gradient(order_1, x)
print(order_2.numpy()) # -> 18.0
고차미분은 일반적으로 GradientTape 내부에서 기울기 계산이 필요할 때 뿐이다. 그렇지 않으면, GradientTape이 기울기에서 모든 완료된 계산을 관찰하여 계산이 느려질 것이다.
1.4 - persistent=True
아래 예제를 실행하면
a = tf.Variable(6.0, trainable=True)
b = tf.Variable(2.0, trainable=True)
with tf.GradientTape() as tape:
y1 = a ** 2
y2 = b ** 3
print(tape.gradient(y1, a).numpy())
print(tape.gradient(y2, b).numpy())
다음의 결과를 예상하게 된다
12.0
12.0
하지만 실제로는 두번째 tape.gradient를 호출하는 것은 오류를 발생시킨다.
이는 tape.gradient를 호출한 이후 즉시, GradientTape은 계산을 위해 내부에 저당된 모든 정보를 해제(release)하기 때문이다.
만약 이를 지나가게 하려면, persistent=True로 설정하면 된다.
a = tf.Variable(6.0, trainable=True)
b = tf.Variable(2.0, trainable=True)
with tf.GradientTape(persistent=True) as tape:
y1 = a ** 2
y2 = b ** 3
print(tape.gradient(y1, a).numpy())
print(tape.gradient(y2, b).numpy())
그려면 아래와 같은 예상했던 결과를 얻게된다.
12.0
12.0
1.5 - stop_recording()
tape.stop_recording()은 임시로 tape 기록을 멈추어 더 나은 계산 속도를 이끌어 낸다.
x = tf.Variable(3.0, trainable=True)
with tf.GradientTape() as tape:
y = x**3
with tape.stop_recording():
print(tape.gradient(y, x).numpy()) # -> 27.0
긴 함수에서는 함수의 중간에서 기울기를 계산하기 위한 stop_recording 블록을 여러번 사용하는 것이 함수의 끝에서 모든 기울기를 계산하는 것보다 더 가독성을 높혀준다.
아래는 stop_recording 블록을 사용한 예제이다.
a = tf.Variable(6.0, trainable=True)
b = tf.Variable(2.0, trainable=True)
with tf.GradientTape(persistent=True) as tape:
y1 = a ** 2
with tape.stop_recording():
print(tape.gradient(y1, a).numpy())
y2 = b ** 3
with tape.stop_recording():
print(tape.gradient(y2, b).numpy())
반면, 다음은 함수의 끝에서 모든 기울기를 사용한 예제이다.
a = tf.Variable(6.0, trainable=True)
b = tf.Variable(2.0, trainable=True)
with tf.GradientTape(persistent=True) as tape:
y1 = a ** 2
y2 = b ** 3
print(tape.gradient(y1, a).numpy())
print(tape.gradient(y2, b).numpy())
1.6 - Other Methods
GradientTape은 다음처름 편리한 몇가지 메서드도 포함한다.
- .jacobian
: tape의 컨텍스내 기록된 연산을 사용하여 야코비안을 계산. - .batch_jacobian
: 예제별로 야코비안을 계산하고 스택으로 만듦. - .reset
: tape내 기록된 모든 정보를 지움. - .watched_variables
: 생성을 위해 tape이 관찰하고 있는 변수를 반환
Advenced Uses
2.0 - Linear Regression (선형회귀)
GradientTape의 더 나은 사용을 시작하기 위해, ML : linear regression의 고전적인 'Hello World!'를 보자
우선, 필수적인 변수와 함수를 선언한다.
import numpy as np
import random
# Loss function
def loss(real_y, pred_y):
return tf.abs(real_y - pred_y)
# Training data
x_train = np.asarray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
y_train = np.asarray([i*10+5 for i in x_train]) # y = 10x+5
# Trainable variables
a = tf.Variable(random.random(), trainable=True)
b = tf.Variable(random.random(), trainable=True)
그리고, step function을 정의한다. step function은 훈련가능한 변수 a와 b를 갱신하기 우해 매 epoch마다 실행된다
def step(real_x, real_y):
with tf.GradientTape(persistent=True) as tape:
# Make prediction
pred_y = a * real_x + b
# Calculate loss
reg_loss = loss(real_y, pred_y)
# Calculate gradients
a_gradients, b_gradients = tape.gradient(reg_loss, (a, b))
# Update variables
a.assign_sub(a_gradients * 0.001)
b.assign_sub(b_gradients * 0.001)
마지막으로 step function을 100,000회쯤 호출하고 추정된 변수를 출력한다.
for _ in range(100000):
step(x_train, y_train)
print(f'y ≈ {a.numpy()}x + {b.numpy()}')
결과는 아래처럼 나타난다.
Result>>
y ≈ 9.986780166625977x + 4.990530490875244
위 결과는 목표인 $10x + 5$와 매우 가까운 값을 보여준다.
전체 소스는 아래와 같다.
import random
import numpy as np
# Loss function
def loss(real_y, pred_y):
return tf.abs(real_y - pred_y)
# Training data
x_train = np.asarray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
y_train = np.asarray([i*10+5 for i in x_train]) # y = 10x+5
# Trainable variables
a = tf.Variable(random.random(), trainable=True)
b = tf.Variable(random.random(), trainable=True)
# Step function
def step(real_x, real_y):
with tf.GradientTape(persistent=True) as tape:
# Make prediction
pred_y = a * real_x + b
# Calculate loss
reg_loss = loss(real_y, pred_y)
# Calculate gradients
a_gradients, b_gradients = tape.gradient(reg_loss, (a, b))
# Update variables
a.assign_sub(a_gradients * 0.001)
b.assign_sub(b_gradients * 0.001)
# Training loop
for _ in range(10000):
step(x_train, y_train)
print(f'y ≈ {a.numpy()}x + {b.numpy()}')
2.1 - Polynomial Regression (다항회귀)
어떠 다항식에도 도작하도록 하기 위해 이전 예제를 확장하자.
단지 사용한 변수와 최적화하는 수식을 바꾸고 설정하면 된다.
# Loss function
def loss(real_y, pred_y):
return tf.abs(real_y - pred_y)
# Training data
x_train = np.asarray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
y_train = np.asarray([6*i**2 + 8*i + 2 for i in x_train]) # y = 6x^2 + 8x + 2
# Trainable variables
a = tf.Variable(random.random(), trainable=True)
b = tf.Variable(random.random(), trainable=True)
c = tf.Variable(random.random(), trainable=True)
# Step function
def step(real_x, real_y):
with tf.GradientTape(persistent=True) as tape:
# Make prediction
pred_y = a*real_x**2 + b*real_x + c
# Calculate loss
poly_loss = loss(real_y, pred_y)
# Calculate gradients
a_gradients, b_gradients, c_gradients = tape.gradient(poly_loss, (a, b, c))
# Update variables
a.assign_sub(a_gradients * 0.001)
b.assign_sub(b_gradients * 0.001)
c.assign_sub(c_gradients * 0.001)
# Training loop
for _ in range(10000):
step(x_train, y_train)
print(f'y ≈ {a.numpy()}x^2 + {b.numpy()}x + {c.numpy()}')
위 예제는 10,000 epoch를 수행한 후 결과는 다음과 같다.
Result>>
y ≈ 6.418105602264404x^2 + 7.572245121002197x + 2.0106215476989746
2.2 - Classifying MNIST
다항회귀는 흥미롭지만, 그 실제 아이디어는 신경망을 최적화하는 것이다.
이전 예제를 조금 바꾸는 것으로 이를 수행할 수 있다.
표준 절차인 데이터 로딩, 전처리 그리고 하이퍼파라미터 설정의 순서를 따라 진행한다.
from tensorflow.keras.datasets import mnist
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.initializers import RandomNormal
# Load and pre-process training data
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = (x_train / 255).reshape((-1, 28, 28, 1))
y_train = tf.keras.utils.to_categorical(y_train, 10)
x_test = (x_test / 255).reshape((-1, 28, 28, 1))
y_test = tf.keras.utils.to_categorical(y_test, 10)
# Hyperparameters
batch_size = 128
epochs = 50
optimizer = Adam(lr=0.001)
weight_init = RandomNormal()
모델을 만든다.
from tensorflow.keras.layers import Conv2D, Flatten, Dense, Dropout, MaxPooling2D
from tensorflow.keras.models import Sequential
# Build model
model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3), activation='relu', kernel_initializer=weight_init, input_shape=(28, 28, 1)))
model.add(Conv2D(64, (3, 3), activation='relu', kernel_initializer=weight_init))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(128, activation='relu', kernel_initializer=weight_init))
model.add(Dropout(0.5))
model.add(Dense(10, activation='softmax', kernel_initializer=weight_init))
step function을 정의한다.
def step(real_x, real_y):
with tf.GradientTape() as tape:
# Make prediction
pred_y = model(real_x.reshape((-1, 28, 28, 1)))
# Calculate loss
model_loss = tf.keras.losses.categorical_crossentropy(real_y, pred_y)
# Calculate gradients
model_gradients = tape.gradient(model_loss, model.trainable_variables)
# Update model
optimizer.apply_gradients(zip(model_gradients, model.trainable_variables))
optimizer.apply_gradients(zip(gradients, variables))는 직접 변수에 계산된 기울기를 적용한다.
step function 훈련으로 훈련 loop를 설정하고 모델의 정확도를 계산할 수 있다.
# Training loop
bat_per_epoch = math.floor(len(x_train) / batch_size)
for epoch in range(epochs):
print('=', end='')
for i in range(bat_per_epoch):
n = i*batch_size
step(x_train[n:n+batch_size], y_train[n:n+batch_size])
# Calculate accuracy
model.compile(optimizer=optimizer, loss=tf.keras.losses.categorical_crossentropy, metrics=['acc']) # Compile just for evaluation
print('\n', model.evaluate(x_test, y_test, verbose=0)[1])
예제를 실행하면 0.99정도의 정확도를 보일 것이다.
위 예제처럼 GradientTape 프로그래밍 스타일과 동일하게 다수의 모델과 손실함수를 가진 복잡한 수학적 수식으로 큰 노력없이 확장할 수 있다.