# 97 lines, 3.5 KiB, Python
import random
|
|
from collections import deque
|
|
|
|
import numpy as np
|
|
import tensorflow as tf
|
|
from tensorflow.python.keras import Sequential, regularizers
|
|
from tensorflow.python.keras.layers import Dense
|
|
|
|
|
|
class ReinforcementLearning:
    """Epsilon-greedy deep Q-learning agent with an experience-replay buffer.

    The agent owns a small fully connected Keras network that maps a state
    vector of length ``state_space`` to one Q-value per action
    (``action_space`` outputs). Transitions are stored in a bounded deque and
    the network is trained on random minibatches drawn from it.
    """

    def __init__(self, action_space, state_space, env):
        """Store the problem dimensions, hyper-parameters and build the model.

        Args:
            action_space: Number of discrete actions (used as ``randrange``
                bound and as the width of the network output).
            state_space: Length of the flat state vector fed to the network.
            env: Environment object. ``AI`` calls ``env.reset()`` and
                ``env.step(action)``; ``step`` is unpacked as
                ``(reward, next_state, done)``.
        """
        self.action_space = action_space
        self.state_space = state_space
        self.env = env
        self.epsilon = 1            # current exploration probability
        self.gamma = .95            # discount factor for future rewards
        self.batch_size = 64        # transitions sampled per replay step
        self.epsilon_min = .01      # decay stops once epsilon drops to this
        self.epsilon_decay = .995   # multiplicative decay per replay step
        # NOTE: kept for reference; the model is compiled with the 'nadam'
        # string, so this value is not passed to the optimizer.
        self.learning_rate = 0.001
        self.memory = deque(maxlen=100000)
        self.model = self._buildModel()

    def AI(self, episode):
        """Run ``episode`` training episodes against ``self.env``.

        Each episode lasts until the environment reports ``done`` or 1000
        steps have been taken. Every step is stored in the replay memory and
        followed by one ``replay()`` call.

        Args:
            episode: Number of episodes to play.

        Returns:
            A list with the accumulated reward of every episode, in order.
        """
        loss = []
        max_steps = 1000

        for e in range(episode):
            state = self.env.reset()
            state = np.reshape(state, (1, self.state_space))
            score = 0
            for _ in range(max_steps):
                action = self.act(state)
                # The environment is expected to return the reward first.
                reward, next_state, done = self.env.step(action)
                score += reward
                next_state = np.reshape(next_state, (1, self.state_space))
                self.remember(state, action, reward, next_state, done)
                state = next_state
                self.replay()
                if done:
                    print("episode: {}/{}, score: {}".format(e, episode, score))
                    break
            # Recorded whether the episode ended by `done` or by the step cap.
            loss.append(score)

        return loss

    def _buildModel(self):
        """Build and compile the Q-network.

        Returns:
            A compiled ``Sequential`` model taking ``state_space`` inputs and
            producing ``action_space`` linear outputs (one Q-value each).
        """
        board_model = Sequential()

        # The input width follows the agent's state vector length.
        board_model.add(
            Dense(64, activation='relu', input_dim=self.state_space))

        # use regularizers, to prevent fitting noisy labels
        board_model.add(Dense(32, activation='relu',
                              kernel_regularizer=regularizers.l2(0.01)))
        board_model.add(Dense(16, activation='relu',
                              kernel_regularizer=regularizers.l2(0.01)))
        board_model.add(Dense(8, activation='relu',
                              kernel_regularizer=regularizers.l2(0.01)))

        # One unsquashed output per action: replay() writes targets into
        # column `action` and act() takes the argmax over these columns.
        board_model.add(Dense(self.action_space, activation='linear',
                              kernel_regularizer=regularizers.l2(0.01)))
        # Q-values are unbounded regression targets, so use squared error
        # rather than a cross-entropy loss.
        board_model.compile(optimizer='nadam', loss='mse')

        return board_model

    def remember(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def replay(self):
        """Train the model on one random minibatch from the replay memory.

        Does nothing until at least ``batch_size`` transitions are stored.
        After fitting, ``epsilon`` is multiplied by ``epsilon_decay`` as long
        as it is still above ``epsilon_min``.
        """
        if len(self.memory) < self.batch_size:
            return

        minibatch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*minibatch)

        # Explicit reshape (instead of squeeze) keeps the feature axis even
        # when state_space == 1.
        batch_shape = (self.batch_size, self.state_space)
        states = np.reshape(np.array(states), batch_shape)
        next_states = np.reshape(np.array(next_states), batch_shape)
        actions = np.array(actions, dtype=int)
        rewards = np.array(rewards, dtype=np.float32)
        # 1.0 for transitions that continue, 0.0 for terminal ones.
        not_done = 1.0 - np.array(dones, dtype=np.float32)

        next_q = np.asarray(self.model.predict_on_batch(next_states))
        targets = rewards + self.gamma * np.amax(next_q, axis=1) * not_done

        # Copy into a writable array; only the taken action's Q-value is
        # replaced so the other outputs contribute no error.
        targets_full = np.array(self.model.predict_on_batch(states))
        targets_full[np.arange(self.batch_size), actions] = targets

        self.model.fit(states, targets_full, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def act(self, state):
        """Pick an action for ``state`` using an epsilon-greedy policy.

        Args:
            state: Array of shape ``(1, state_space)`` as produced in ``AI``.

        Returns:
            A random action index with probability ``epsilon``, otherwise the
            index of the largest predicted Q-value.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_space)
        act_values = self.model.predict(state, verbose=0)
        return np.argmax(act_values[0])
|