import random
from collections import deque
from copy import deepcopy
from typing import Any

import numpy as np
import tensorflow as tf
from tensorflow.keras import Sequential, regularizers
from tensorflow.keras.layers import BatchNormalization, Dense, Dropout, Lambda
from tensorflow.keras.optimizers import Adam

from minimax.minimaxAlgo import MiniMax
from utilities import Board
from utilities.constants import WHITE, GREEN
from utilities.gameManager import GameManager


class ReinforcementLearning:
    """ Deep Q-learning agent that learns to play draughts. """

    def __init__(self, actionSpace: list, board: Board, colour: int, gameManager: GameManager) -> None:
        """
        Constructor for the ReinforcementLearning class

        :param actionSpace: The list of possible encoded actions
        :param board: The game board
        :param colour: The colour this agent plays as
        :param gameManager: The game manager running the match
        """
        self.gameManager = gameManager
        self.actionSpace = actionSpace
        self.board = board
        self.state = self.board.board
        self.colour = colour
        self.score = 0

        # Exploration/exploitation and learning hyperparameters
        self.epsilon = 1
        self.gamma = .95
        self.batchSize = 512
        self.maxSize = 32
        self.epsilonMin = .01
        self.epsilonDecay = .995
        self.learningRate = 0.0001
        self.memory = deque(maxlen=10000000)

        self.model = self.buildMainModel()
        print(self.model.summary())

    def AITrain(self, board: Board) -> tuple:
        """
        Plays and learns from one move of the draughts game

        :param board: The current game board
        :return: The running score and the resulting board (None if no move is possible)
        """
        self.board = board
        self.state = self._convertState(self.board.board)
        self.actionSpace = self.encodeMoves(self.colour, self.board)
        if len(self.actionSpace) == 0:
            # No legal moves available
            return self.score, None

        action = self._act()
        reward, nextState, done = self.board.step(action, self.colour)
        self.score += reward
        self.state = self._convertState(nextState.board)
        self._remember(deepcopy(self.board), action, reward, self.state, done)
        self._replay()

        return self.score, nextState

    def AITest(self, board: Board) -> Board:
        """
        Runs the trained AI for a single move

        :param board: The board
        :return: The new board, or None if no move can be made
        """
        actionSpace = self.encodeMoves(WHITE, board)
        if len(actionSpace) == 0:
            print("Cannot make move")
            return None

        totalMoves = len(actionSpace)
        # Pad the encoded moves so the model input always has maxSize entries
        moves = np.pad(actionSpace, (0, self.maxSize - totalMoves), 'constant', constant_values=(1, 1))
        actValues = self.model.predict(self.normalise(moves))
        val = np.argmax(actValues[0])
        # Clamp the chosen index to the number of real (unpadded) moves
        val = val if val < totalMoves else totalMoves - 1
        reward, newBoard, done = board.step(actionSpace[val], WHITE)
        return newBoard

    def buildMainModel(self) -> Sequential:
        """
        Build the model for the AI

        :return: The model
        """
        # Board model
        modelLayers = [
            Lambda(lambda x: tf.reshape(x, [-1, 32])),
            Dense(512, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
            Dropout(0.2),
            Dense(256, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
            Dropout(0.2),
            Dense(128, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
            Dropout(0.2),
            Dense(64, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
            Dropout(0.2),
            Dense(32, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
            Dropout(0.2),
            Dense(16, activation='linear', kernel_regularizer=regularizers.l2(0.01))
        ]
        boardModel = Sequential(modelLayers)

        # boardModel.add(BatchNormalization())
        boardModel.compile(optimizer=Adam(learning_rate=self.learningRate), loss='mean_squared_error')
        boardModel.build(input_shape=(None, None))

        return boardModel

    def _replay(self) -> None:
        """
        Trains the model on a random minibatch drawn from memory

        :return: None
        """
        if len(self.memory) < self.batchSize:
            # Not enough data to replay and train the model
            return

        # Get a random sample from the memory
        minibatch = random.sample(self.memory, int(self.maxSize))

        # Extract states, rewards, dones
        states = [m[0] for m in minibatch]
        rewards = [m[2] for m in minibatch]
        dones = [m[4] for m in minibatch]

        # Encoded moves
        encodedMoves = []
        for state in states:
            encodedMoves.append(self.encodeMoves(self.colour, state))

        # Calculate targets
        targets = []
        for i, moves in enumerate(encodedMoves):
            if dones[i]:
                target = rewards[i]
            else:
                # Bellman update: immediate reward plus the discounted best next Q value
                target = rewards[i] + self.gamma * self._maxNextQ()

            targets.append(target)

        # Pad each move list to a fixed length so the batch is rectangular
        encodedMoves = np.array([np.pad(m, (0, self.maxSize - len(m)), 'constant', constant_values=(1, 1))
                                 for m in encodedMoves])
        targets = np.array(targets)
        self.model.fit(self.normalise(encodedMoves), self.normalise(targets), epochs=20)

        # Decay the exploration rate
        if self.epsilon > self.epsilonMin:
            self.epsilon *= self.epsilonDecay

    def _remember(self, state: np.array, action: int, reward: float, nextState: np.array, done: bool) -> None:
        """
        Stores an experience tuple in the replay memory

        :param state: The current state
        :param action: The action taken
        :param reward: The reward for the action
        :param nextState: The next state
        :param done: Whether the game is finished
        :return: None
        """
        self.memory.append((state, action, reward, nextState, done))

    def _act(self) -> Any:
        """
        Chooses an action based on the available moves

        :return: The chosen encoded action
        """
        if np.random.rand() <= self.epsilon:
            # Explore: use the minimax algorithm to pick a move instead of a purely random one
            mm = MiniMax()
            value, newBoard = mm.AI(3, self.colour, self.gameManager)
            if newBoard is None:
                return random.choice(self.actionSpace)
            where = self._boardDiff(self.board, newBoard)
            return self._encode(where[0] + 1, where[1] + 1)

        if len(self.actionSpace) == 1:
            return self.actionSpace[0]

        # Exploit: let the model score the (padded) encoded moves
        encodedMoves = np.squeeze(self.actionSpace)
        encodedMoves = np.pad(encodedMoves, (0, self.maxSize - len(encodedMoves)), 'constant', constant_values=(1, 1))
        actValues = self.model.predict(self.normalise(encodedMoves))
        val = np.argmax(actValues[0])
        # Clamp the chosen index to the number of real (unpadded) moves
        val = val if val < len(self.actionSpace) else len(self.actionSpace) - 1
        return self.actionSpace[val]

    def resetScore(self) -> None:
        """
        Resets the score

        :return: None
        """
        self.score = 0

    def _convertState(self, board: list) -> list:
        """
        Converts the board into a 2D list of numbers

        :param board: 2D list of pieces
        :return: new 2D list of numbers
        """
        num_board = []

        for row in board:
            num_row = []
            for piece in row:
                if piece == 0:
                    num_row.append(0)
                    continue

                if piece.colour == 1:
                    num_row.append(1)
                    continue

                num_row.append(2)

            num_board.append(num_row)

        return num_board

    def _encode(self, rows: np.array, cols: np.array) -> int:
        """
        Encodes a move into an integer by concatenating the coordinates of the
        first and last squares that changed between two boards

        :param rows: Row indices of the changed squares (from _boardDiff)
        :param cols: Column indices of the changed squares (from _boardDiff)
        :return: Encoded move
        """
        start_row = rows[0]
        start_col = cols[0]

        end_row = rows[-1]
        end_col = cols[-1]

        # Concatenate into an integer, e.g. rows (3, 5) and cols (4, 6) -> 3456
        return int(str(start_row) + str(start_col) + str(end_row) + str(end_col))

    def _maxNextQ(self) -> float:
        """
        Calculates the max Q value for the next state

        :return: the max Q value
        """
        colour = WHITE if self.colour == GREEN else GREEN
        encodedMoves = self.encodeMoves(colour, self.board)
        if len(encodedMoves) == 0:
            return -1

        paddedMoves = np.array(np.pad(encodedMoves, (0, self.maxSize - len(encodedMoves)), 'constant', constant_values=(1, 1)))
        nextQValues = self.model.predict_on_batch(self.normalise(paddedMoves))
        return np.max(nextQValues)

    def encodeMoves(self, colour: int, board: Board) -> list:
        """
        Encodes all available moves for a colour into a list of encoded moves

        :param colour: Colour of the player
        :param board: The board
        :return: list of encoded moves
        """
        encodedMoves = []
        moves = board.getAllMoves(colour)
        for move in moves:
            where = self._boardDiff(board, move)
            encodedMoves.append(self._encode(where[0] + 1, where[1] + 1))
        return encodedMoves

    def _boardDiff(self, board: Board, move: Board) -> np.array:
        """
        Finds the squares that differ between the two boards

        :param board: The current board
        :param move: The new board after the move
        :return: Tuple of arrays holding the row and column indices of the changed squares
        """
        cnvState = np.array(self._convertState(board.board))
        cnvMove = np.array(self._convertState(move.board))
        diff = np.subtract(cnvMove, cnvState)
        diff = np.nonzero(diff)
        return diff

    def normalise(self, data: np.array) -> np.array:
        """
        Normalise the data

        :param data: the data to normalise
        :return: normalised data
        """
        # Encoded moves are four single-digit coordinates, so dividing by 10000 scales them into [0, 1)
        return data / 10000
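

# A minimal usage sketch (not part of the original module), assuming GameManager exposes
# the current Board via `gameManager.board` and that Board.step / getAllMoves behave as
# used above; adjust to the real utilities API if it differs.
if __name__ == "__main__":
    gameManager = GameManager()   # assumed default constructor
    board = gameManager.board     # assumed attribute holding the current Board
    agent = ReinforcementLearning(actionSpace=[], board=board, colour=GREEN, gameManager=gameManager)

    # Play a handful of training moves; AITrain returns (score, newBoard), where
    # newBoard is None when the agent has no legal move left.
    for _ in range(10):
        score, newBoard = agent.AITrain(board)
        if newBoard is None:
            break
        board = newBoard
    print("Training score:", score)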