created working reinforcement learning model
@@ -1,96 +1,252 @@
import random
from collections import deque
from typing import Any
from copy import deepcopy

import numpy as np
import tensorflow as tf
from keras.engine.input_layer import InputLayer
from keras.layers import BatchNormalization
from tensorflow.python.keras import Sequential, regularizers, Input
from tensorflow.python.keras.layers import Dense, Lambda, Dropout
from tensorflow.python.keras.optimizer_v2.adam import Adam

from minimax.minimaxAlgo import MiniMax
from utilities import Board
from utilities.constants import WHITE, GREEN
from utilities.gameManager import GameManager

class ReinforcementLearning():

    def __init__(self, actionSpace: list, board: Board, colour: int, gameManager: GameManager) -> None:
        """
        Constructor for the ReinforcementLearning class
        :param actionSpace: the list of possible encoded moves
        :param board: the game board
        :param colour: the colour this agent plays as
        :param gameManager: the game manager running the match
        """
        self.gameManager = gameManager
        self.actionSpace = actionSpace
        self.board = board
        self.state = self.board.board
        self.colour = colour
        self.score = 0
        self.epsilon = 1  # exploration rate, decayed towards epsilonMin after each replay
        self.gamma = .95  # discount factor for future rewards
        self.batchSize = 256  # minimum amount of memory before replaying
        self.maxSize = 32  # fixed width that move vectors are padded to
        self.epsilonMin = .01
        self.epsilonDecay = .995
        self.learningRate = 0.001
        self.memory = deque(maxlen=10000000)  # replay buffer of past transitions
        self.model = self._buildMainModel()

    def AI(self, board: Board) -> tuple:
        """
        Plays and learns from one move of the draughts game
        :param board: the current game board
        :return: the running score and the board after the chosen move
        """
        self.board = board
        self.state = self._convertState(self.board.board)
        self.actionSpace = self._encodeMoves(self.colour, self.board)
        if len(self.actionSpace) == 0:
            # No legal moves, so there is nothing to play or learn from
            return self.score, None

        action = self._act()
        reward, nextState, done = self.board.step(action, self.colour)
        self.score += reward
        self.state = self._convertState(nextState.board)
        self._remember(deepcopy(self.board), action, reward, self.state, done)
        self._replay()

        return self.score, nextState
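
    # A minimal sketch of how a caller might drive this one-step AI; the game
    # loop and the gameOver/board attributes on GameManager are assumptions,
    # not part of this class:
    #
    #   agent = ReinforcementLearning([], board, GREEN, gameManager)
    #   while not gameManager.gameOver():
    #       score, nextBoard = agent.AI(gameManager.board)
    #       if nextBoard is None:
    #           break  # no legal moves left for this colour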

    def _buildMainModel(self) -> Sequential:
        """
        Build the value model for the AI
        :return: the compiled model
        """
        # Board model: reshape whatever comes in to a 32-wide vector (one
        # value per playable square), then funnel it down to a single output
        modelLayers = [
            Lambda(lambda x: tf.reshape(x, [-1, 32])),
            Dense(256, activation='relu'),
            Dropout(0.2),
            Dense(128, activation='relu'),
            Dropout(0.2),
            Dense(64, activation='relu'),
            Dropout(0.2),
            # use regularizers on the later layers, to prevent fitting noisy labels
            Dense(32, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
            Dropout(0.2),
            Dense(16, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
            Dropout(0.2),
            # output isn't squashed, because squashing might lose information
            Dense(1, activation='linear', kernel_regularizer=regularizers.l2(0.01))
        ]
        boardModel = Sequential(modelLayers)

        # boardModel.add(BatchNormalization())
        boardModel.compile(optimizer=Adam(learning_rate=0.0001), loss='mean_squared_error')
        boardModel.build(input_shape=(None, None))

        boardModel.summary()

        return boardModel
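
    # The Lambda reshape above means the model scores 32-wide vectors; in this
    # class those are padded, normalised lists of encoded moves. A hypothetical
    # call, with made-up move values for illustration:
    #
    #   moves = np.pad([2345.0, 4567.0], (0, 30), 'constant', constant_values=(1, 1))
    #   q = boardModel.predict(moves.reshape(1, 32))  # -> array of shape (1, 1)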

    def _replay(self) -> None:
        """
        Trains the model on a random sample of remembered transitions
        :return: None (void)
        """
        if len(self.memory) < self.batchSize:
            # Not enough data to replay and train the model
            return

        # Get a random sample from the memory
        minibatch = random.sample(self.memory, int(self.maxSize))

        # Extract states, rewards and dones
        states = [m[0] for m in minibatch]
        rewards = [m[2] for m in minibatch]
        dones = [m[4] for m in minibatch]

        # Encode the legal moves from each remembered state
        encodedMoves = []
        for state in states:
            encodedMoves.append(self._encodeMoves(self.colour, state))

        # Calculate the targets: the reward alone for terminal states,
        # otherwise the reward plus the discounted best next value
        targets = []
        for i, moves in enumerate(encodedMoves):
            if dones[i]:
                target = rewards[i]
            else:
                target = rewards[i] + self.gamma * self._maxNextQ()

            targets.append(target)

        # Pad every move list to a fixed width so they stack into one array
        encodedMoves = np.array([np.pad(m, (0, self.maxSize - len(m)), 'constant', constant_values=(1, 1))
                                 for m in encodedMoves])
        targets = np.array(targets)
        self.model.fit(self._normalise(encodedMoves), self._normalise(targets), epochs=20)
        if self.epsilon > self.epsilonMin:
            self.epsilon *= self.epsilonDecay
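
    # Worked example of the target above (hypothetical numbers): with
    # gamma = 0.95, a non-terminal move that earned reward 2 while _maxNextQ
    # predicts 10 gets target 2 + 0.95 * 10 = 11.5; a terminal move with
    # reward 2 gets target 2.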

    def _remember(self, state: np.array, action: int, reward: float, nextState: np.array, done: bool) -> None:
        """
        Remembers what it has learnt
        :param state: the current state
        :param action: the action taken
        :param reward: the reward for the action
        :param nextState: the next state
        :param done: whether the game is finished
        :return: None (void)
        """
        self.memory.append((state, action, reward, nextState, done))

    def _act(self) -> Any:
        """
        Chooses an action based on the available moves
        :return: the action
        """
        if np.random.rand() <= self.epsilon:
            # Explore: ask the minimax algorithm for a move and encode it
            mm = MiniMax()
            value, newBoard = mm.AI(3, self.colour, self.gameManager)
            if newBoard is None:
                return random.choice(self.actionSpace)
            where = self._boardDiff(self.board, newBoard)
            return self._encode(where[0]+1, where[1]+1)

        if len(self.actionSpace) == 1:
            return self.actionSpace[0]

        # Exploit: score the padded, normalised moves and pick the best one
        encodedMoves = np.squeeze(self.actionSpace)
        encodedMoves = np.pad(encodedMoves, (0, self.maxSize - len(encodedMoves)), 'constant', constant_values=(1, 1))
        act_values = self.model.predict(self._normalise(encodedMoves))
        return self.actionSpace[np.argmax(act_values[0])]
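
    # Design note: using minimax in the exploration branch biases "random"
    # play towards plausible moves instead of uniformly random ones, at the
    # cost of a depth-3 search on each exploratory step.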

    def resetScore(self) -> None:
        """
        Resets the running score to zero
        """
        self.score = 0

    def _convertState(self, board: list) -> list:
        """
        Converts the board into a 2D list of numbers
        :param board: 2D list of pieces
        :return: new 2D list of numbers (0 for empty, 1 and 2 for the two colours)
        """
        num_board = []

        for row in board:
            num_row = []
            for piece in row:
                if piece == 0:
                    num_row.append(0)
                    continue

                if piece.colour == 1:
                    num_row.append(1)
                    continue

                num_row.append(2)

            num_board.append(num_row)

        return num_board
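
    # For example, a row containing an empty square, a colour-1 piece and a
    # colour-2 piece converts to [0, 1, 2], so an 8x8 board becomes an 8x8
    # list of small integers the model can consume.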

    def _encode(self, start: tuple, end: tuple) -> int:
        """
        Encodes the move into an integer
        :param start: the changed row indices, as produced by _boardDiff
        :param end: the changed column indices, as produced by _boardDiff
        :return: encoded move
        """
        start_row = start[0]
        start_col = end[0]

        end_row = start[-1]
        end_col = end[-1]

        # Concatenate into integer
        return int(str(start_row) + str(start_col) + str(end_row) + str(end_col))
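
    # Worked example (hypothetical indices): if a move changes the squares at
    # rows (2, 4) and columns (3, 5), the callers pass them in offset by one,
    # giving start = (3, 5) and end = (4, 6), which encodes to
    # int("3" + "4" + "5" + "6") == 3456.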

    def _maxNextQ(self) -> float:
        """
        Estimates the best value available from the opponent's replies
        :return: the maximum predicted Q-value, or -1 if there are none
        """
        colour = WHITE if self.colour == GREEN else GREEN
        encodedMoves = self._encodeMoves(colour, self.board)
        if len(encodedMoves) == 0:
            return -1

        paddedMoves = np.array(np.pad(encodedMoves, (0, self.maxSize - len(encodedMoves)), 'constant', constant_values=(1, 1)))
        nextQValues = self.model.predict_on_batch(self._normalise(paddedMoves))
        return np.max(nextQValues)
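
    # Note: the "next" value here is estimated from the opponent's available
    # replies on the current board, standing in for the next-state maximum in
    # the usual Q-learning target.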

    def _encodeMoves(self, colour: int, board: Board) -> list:
        """
        Encodes the moves into a list of encoded moves
        :param colour: colour of the player
        :param board: the board
        :return: list of encoded moves
        """
        encodedMoves = []
        moves = board.getAllMoves(colour)
        for move in moves:
            where = self._boardDiff(board, move)
            encodedMoves.append(self._encode(where[0]+1, where[1]+1))
        return encodedMoves

    def _boardDiff(self, board: Board, move: Board) -> tuple:
        """
        Finds which squares changed between the current board and a move
        :param board: the board before the move
        :param move: the board after the move
        :return: tuple of arrays holding the changed row and column indices
        """
        cnvState = np.array(self._convertState(board.board))
        cnvMove = np.array(self._convertState(move.board))
        diff = np.subtract(cnvMove, cnvState)
        diff = np.nonzero(diff)
        return diff
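
    # For instance, a move that empties the square at (2, 3) and fills the one
    # at (4, 5) differs in exactly those two cells, so np.nonzero returns
    # (array([2, 4]), array([3, 5])): row indices first, column indices second.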

    def _normalise(self, data: np.array) -> np.array:
        """
        Normalise the data in place, dividing each entry by its L2 norm
        :param data: the data to normalise
        :return: the normalised data
        """
        for i in range(len(data)):
            data[i] = data[i] / np.linalg.norm(data[i])
        return data