Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 64 additions & 2 deletions daemon/namenode/namenode.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package namenode

import (
"bufio"
"io/ioutil"
"log"
"net"
"net/rpc"
"os"
"strconv"
"time"

Expand All @@ -12,6 +15,8 @@ import (
"github.com/rounakdatta/GoDFS/util"
)

const NameNodeState = "NN.STATE"

func removeElementFromSlice(elements []string, index int) []string {
return append(elements[:index], elements[index+1:]...)
}
Expand Down Expand Up @@ -61,7 +66,7 @@ func discoverDataNodes(nameNodeInstance *namenode.Service, listOfDataNodes *[]st
return nil
}

func InitializeNameNodeUtil(serverPort int, blockSize int, replicationFactor int, listOfDataNodes []string) {
func InitializeNameNodeUtil(serverPort int, blockSize int, replicationFactor int, isPrimary bool, listOfDataNodes []string) {
nameNodeInstance := namenode.NewService(uint64(blockSize), uint64(replicationFactor), uint16(serverPort))
err := discoverDataNodes(nameNodeInstance, &listOfDataNodes)
util.Check(err)
Expand All @@ -71,7 +76,14 @@ func InitializeNameNodeUtil(serverPort int, blockSize int, replicationFactor int
log.Printf("List of DataNode(s) in service is %q\n", listOfDataNodes)
log.Printf("NameNode port is %d\n", serverPort)

go heartbeatToDataNodes(listOfDataNodes, nameNodeInstance)
if isPrimary {
log.Printf("Starting processes for primary NameNode\n")
go heartbeatToDataNodes(listOfDataNodes, nameNodeInstance)
go scheduledFlushToDisk(nameNodeInstance)
} else {
log.Printf("Starting processes for secondary NameNode\n")
go getServiceStateFromPrimary(nameNodeInstance, listOfDataNodes)
}

err = rpc.Register(nameNodeInstance)
util.Check(err)
Expand Down Expand Up @@ -114,3 +126,53 @@ func heartbeatToDataNodes(listOfDataNodes []string, nameNode *namenode.Service)
}
}
}

func scheduledFlushToDisk(namenode *namenode.Service) {
for range time.Tick(time.Second * 10) {
flushServiceStateToDisk(namenode)
}
}

func flushServiceStateToDisk(namenode *namenode.Service) {
state, err := SerializeNameNodeImage(namenode)
util.Check(err)

fileWriteHandler, err := os.Create(NameNodeState)
util.Check(err)

fileWriter := bufio.NewWriter(fileWriteHandler)
_, err = fileWriter.WriteString(state)
util.Check(err)
fileWriter.Flush()
fileWriteHandler.Close()
log.Println("NameNodeService state flushed to disk")
}

func getServiceStateFromPrimary(namenodeInstance *namenode.Service, listOfDataNodes []string) {
primaryNameNodeHost := "localhost"
const primaryNameNodePort = "9000"

for range time.Tick(time.Second * 2) {
primaryNameNodeClient, connectionErr := rpc.Dial("tcp", primaryNameNodeHost+":"+primaryNameNodePort)

if connectionErr != nil {
log.Printf("Unable to connect to NameNode on %s, starting recovery\n", primaryNameNodePort)
dataBytes, err := ioutil.ReadFile(NameNodeState)
util.Check(err)

namenodeInstance, err = DeserializeNameNodeImage(string(dataBytes))
util.Check(err)
go heartbeatToDataNodes(listOfDataNodes, namenodeInstance)
go flushServiceStateToDisk(namenodeInstance)
return
}

var response *namenode.Service
stateFetchErr := primaryNameNodeClient.Call("Service.GetState", true, &response)
if stateFetchErr != nil {
log.Println("Error fetching state from primary NameNode")
}

flushServiceStateToDisk(response)
}
}
41 changes: 41 additions & 0 deletions daemon/namenode/serde.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package namenode

import (
"bytes"
"encoding/base64"
"encoding/gob"

"github.com/rounakdatta/GoDFS/namenode"
)

func SerializeNameNodeImage(image *namenode.Service) (string, error) {
buf := bytes.Buffer{}
e := gob.NewEncoder(&buf)
err := e.Encode(image)

if err != nil {
return "", err
}

return base64.StdEncoding.EncodeToString(buf.Bytes()), nil
}

func DeserializeNameNodeImage(serializedImage string) (*namenode.Service, error) {
image := namenode.Service{}

data, err := base64.StdEncoding.DecodeString(serializedImage)
if err != nil {
return nil, err
}

buf := bytes.Buffer{}
buf.Write(data)
d := gob.NewDecoder(&buf)
err = d.Decode(&image)

if err != nil {
return nil, err
}

return &image, nil
}
10 changes: 6 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package main

import (
"flag"
"github.com/rounakdatta/GoDFS/daemon/client"
"github.com/rounakdatta/GoDFS/daemon/datanode"
"github.com/rounakdatta/GoDFS/daemon/namenode"
"log"
"os"
"strings"

"github.com/rounakdatta/GoDFS/daemon/client"
"github.com/rounakdatta/GoDFS/daemon/datanode"
"github.com/rounakdatta/GoDFS/daemon/namenode"
)

func main() {
Expand All @@ -22,6 +23,7 @@ func main() {
nameNodeListPtr := nameNodeCommand.String("datanodes", "", "Comma-separated list of DataNodes to connect to")
nameNodeBlockSizePtr := nameNodeCommand.Int("block-size", 32, "Block size to store")
nameNodeReplicationFactorPtr := nameNodeCommand.Int("replication-factor", 1, "Replication factor of the system")
nameNodeIsPrimary := nameNodeCommand.Bool("is-primary", true, "If this NameNode is primary")

clientNameNodePortPtr := clientCommand.String("namenode", "localhost:9000", "NameNode communication port")
clientOperationPtr := clientCommand.String("operation", "", "Operation to perform")
Expand All @@ -46,7 +48,7 @@ func main() {
} else {
listOfDataNodes = []string{}
}
namenode.InitializeNameNodeUtil(*nameNodePortPtr, *nameNodeBlockSizePtr, *nameNodeReplicationFactorPtr, listOfDataNodes)
namenode.InitializeNameNodeUtil(*nameNodePortPtr, *nameNodeBlockSizePtr, *nameNodeReplicationFactorPtr, *nameNodeIsPrimary, listOfDataNodes)

case "client":
_ = clientCommand.Parse(os.Args[2:])
Expand Down
14 changes: 11 additions & 3 deletions namenode/namenode.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package namenode

import (
"github.com/google/uuid"
"github.com/rounakdatta/GoDFS/datanode"
"github.com/rounakdatta/GoDFS/util"
"log"
"math"
"math/rand"
"net/rpc"
"strings"

"github.com/google/uuid"
"github.com/rounakdatta/GoDFS/datanode"
"github.com/rounakdatta/GoDFS/util"
)

type NameNodeMetaData struct {
Expand Down Expand Up @@ -67,6 +68,13 @@ func selectRandomNumbers(availableItems []uint64, count uint64) (randomNumberSet
return
}

func (nameNode *Service) GetState(request bool, reply *Service) error {
if request {
reply = nameNode
}
return nil
}

func (nameNode *Service) GetBlockSize(request bool, reply *uint64) error {
if request {
*reply = nameNode.BlockSize
Expand Down