@@ -104,8 +104,8 @@ func dirname(filepath string) string {
    return path.Dir(filepath)
}

-func basename(filepath string) string {
-   return path.Base(filepath)
+func basename(fullpath string) string {
+   return filepath.Base(fullpath)
}

//End of mapping functions
@@ -130,11 +130,19 @@ func padZeros(thisInt string, maxval int) string {
}

type clusterConfig struct {
-   Prefix string `json: "prefix"`
-   Port string `json: "port`
+   Prefix string `json:"prefix"`
+   Port string `json:"port"`
}
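+//A matching clusterSetting.config would look like {"prefix":"AOB","port":"8080"} (illustrative values)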

//End of utilities functions
+//System constants
+const clusterServices = "../cluster/"
+const delChunkScript = "SystemAOB/functions/arozdfs/delChunk.php?chunkuuid="
+const requestScript = "SystemAOB/functions/arozdfs/request.php?chunkuuid="
+const uploadScript = "SystemAOB/functions/arozdfs/upload.php"
+const clusterSetting = "clusterSetting.config"
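+//These endpoint paths are appended to "http://" + <node ip> + ":" + config.Port + "/" + config.Prefix + "/" when chunk URLs are built (see startUploadProc and removeFile below)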
+
+var config clusterConfig

func init() {
    //Check if the required directory exists. If not, create it.
@@ -153,6 +161,11 @@ func init() {
    if !file_exists("remoteDisks.config") {
        file_put_contents("remoteDisks.config", "")
    }
+
+   //Load config from clusterSetting.config
+   jsonFile, _ := os.Open(clusterSetting)
+   byteValue, _ := ioutil.ReadAll(jsonFile)
+   json.Unmarshal(byteValue, &config)
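+   //Read/parse errors are discarded here; if clusterSetting.config is missing or invalid, config keeps its zero-value (empty) fields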
}

func main() {
@@ -176,8 +189,10 @@ func main() {
        startDownloadProc()
    case "open":
        openChunkedFile()
+   case "remove":
+       removeFile()
    case "debug":
-       fmt.Println(padZeros("1", 32)) //Debug function. Change this line for unit testing
+       fmt.Println(config.Port + "/" + config.Prefix) //Debug function. Change this line for unit testing
    default:
        showNotFound()
    }
@@ -221,11 +236,7 @@ func startDownloadProc() {
    datachunks := explode("\n", fc)
    var filelist []string
    var locations []string
-   //Load config from clusterSetting.config
-   jsonFile, _ := os.Open("clusterSetting.config")
-   byteValue, _ := ioutil.ReadAll(jsonFile)
-   var config clusterConfig
-   json.Unmarshal(byteValue, &config)
+
    for i := 0; i < len(datachunks); i++ {
        tmp := explode(",", datachunks[i])
        filechunk := tmp[0]
@@ -236,14 +247,14 @@ func startDownloadProc() {
        fullip := "http://" + thisip + clusterConfig
        locations = append(locations, fullip)
    }
-   fmt.Println(filelist)
-   fmt.Println(locations)
+   //fmt.Println(filelist)
+   //fmt.Println(locations)

    //Start the request process
    for j := 0; j < len(filelist); j++ {
        //Multithreading download for each fileitem
        filename := filelist[j]
-       targetURL := locations[j] + "SystemAOB/functions/arozdfs/request.php?chunkuuid=" + string(filename)
+       targetURL := locations[j] + requestScript + string(filename)
        go downloadFileChunkWithOutput(storepath+filename, targetURL, filename)
    }

@@ -321,9 +332,9 @@ func startSlicingProc() {
        os.Exit(0)
    }
    if storepath != "" && infile != "" {
-       fmt.Println(storepath + " " + infile + " " + strconv.Itoa(slice) + " " + fileUUID)
+       //fmt.Println(storepath + " " + infile + " " + strconv.Itoa(slice) + " " + fileUUID)
        splitFileChunks(infile, "chunks/"+storepath, fileUUID, slice)
-       fmt.Println(fileUUID)
+       //fmt.Println(fileUUID)
    } else {
        fmt.Println("ERROR. Undefined storepath or infile.")
    }
@@ -351,7 +362,7 @@ func splitFileChunks(rawfile string, outputdir string, outfilename string, chunk
    // calculate total number of parts the file will be chunked into

    totalPartsNum := uint64(math.Ceil(float64(fileSize) / float64(fileChunk)))
-   fmt.Printf("Splitting to %d pieces.\n", totalPartsNum)
+   fmt.Printf("[Info] Splitting to %d pieces.\n", totalPartsNum)
    for i := uint64(0); i < totalPartsNum; i++ {
        partSize := int(math.Min(fileChunk, float64(fileSize-int64(i*uint64(fileChunk)))))
        partBuffer := make([]byte, partSize)
@@ -364,7 +375,7 @@ func splitFileChunks(rawfile string, outputdir string, outfilename string, chunk
        }
        // write/save buffer to disk
        ioutil.WriteFile(fileName, partBuffer, os.ModeAppend)
-       fmt.Println("Split to : ", fileName)
+       fmt.Println("[Export] ", fileName)
    }
    return true
}
@@ -394,7 +405,7 @@ func openChunkedFile() {
        }
    }
    if storepath != "" && uuid != "" && outfile != "" {
-       fmt.Println(storepath + " " + uuid + " " + outfile)
+       //fmt.Println(storepath + " " + uuid + " " + outfile)
        if joinFileChunks(storepath+uuid, outfile) {
            //Do checksum here

@@ -433,7 +444,7 @@ func joinFileChunks(fileuuid string, outfilename string) bool {

func startUploadProc() {
    push := "remoteDisks.config"
-   storepath := "tmp/"
+   storepath := "chunks/"
    uuid := ""
    vdir := ""
    for i, arg := range os.Args {
@@ -445,7 +456,7 @@ func startUploadProc() {
            storepath = os.Args[i+1]
            //Check if the storepath is end with /. if not, append it into the pathname
            if storepath[len(storepath)-1:] != "/" {
                storepath = storepath + "/"
            }
+           //Root the chunk folder under ./chunks regardless of the trailing slash
+           storepath = "chunks/" + storepath
        } else if arg == "-vdir" {
            vdir = os.Args[i+1]
@@ -460,7 +471,7 @@ func startUploadProc() {
        fmt.Println("ERROR. Undefined uuid or vdir.")
        os.Exit(0)
    }
-   if !file_exists("clusterSetting.config") {
+   if !file_exists(clusterSetting) {
        fmt.Println("ERROR. clusterSetting configuration not found")
        os.Exit(0)
    }
@@ -472,12 +483,8 @@ func startUploadProc() {
    //Starting the uuid to ip conversion process

    var ipList []string
-   //Read cluster setting from clusterSetting.config
-   jsonFile, _ := os.Open("clusterSetting.config")
-   byteValue, _ := ioutil.ReadAll(jsonFile)
-   var config clusterConfig
    var uuiddata []string
-   json.Unmarshal(byteValue, &config)
+   var uploadUUIDList []string
    //Read cluster uuid list from remoteDisks.config
    if file_exists(push) {
        clusteruuids := file_get_contents(push)
@@ -489,7 +496,9 @@ func startUploadProc() {
        uuiddata = explode("\n", clusteruuids)
        //Generate iplist and ready for posting file chunks
        for i := 0; i < len(uuiddata); i++ {
-           thisip := resolveUUID(uuiddata[i])
+           thisuuid := uuiddata[i%len(uuiddata)]
+           uploadUUIDList = append(uploadUUIDList, thisuuid)
+           thisip := resolveUUID(thisuuid)
            clusterConfig := ":" + string(config.Port) + "/" + string(config.Prefix) + "/"
            fullip := "http://" + thisip + clusterConfig
            ipList = append(ipList, fullip)
@@ -498,7 +507,6 @@ func startUploadProc() {
        fmt.Println("ERROR. remoteDisks not found or it is empty! ")
        os.Exit(0)
    }
-   fmt.Println(ipList)

    //Handshake with clusters, create auth token if needed
    if !createToken(ipList) {
@@ -509,32 +517,40 @@ func startUploadProc() {
    //Ready to push. Create index file.
    file_put_contents("index/"+vdir+string(".index"), "")
    fileList, _ := filepath.Glob(storepath + uuid + "_*")
-   var pushResultTarget []string
-   var pushResultFilename []string
-   var failed []string
-   var failedTarget []string
+   //Make a directory for storing the result of the upload
+   if !file_exists(storepath + ".upload/") {
+       mkdir(storepath + ".upload/")
+   }
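+   //Distribute the chunks round-robin across the cluster nodes (i % len(ipList)); each chunk is pushed concurrently in its own goroutine below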
    for i := 0; i < len(fileList); i++ {
        uploadIP := (ipList[i%len(ipList)])
-       r := pushFileChunk(uuid, uploadIP, fileList[i])
-       if trim(r) == "DONE" {
-           //This upload process is doing fine. Append to the result list
-           pushResultTarget = append(pushResultTarget, uuiddata[i%len(ipList)])
-           pushResultFilename = append(pushResultFilename, filepath.Base(fileList[i]))
-           fmt.Println("[OK] " + fileList[i] + " uploaded.")
-       } else {
-           failed = append(failed, fileList[i])
-           failedTarget = append(failedTarget, uuid)
-       }
+       uploadUUID := (uploadUUIDList[i%len(ipList)])
+       go SendPostRequest(uploadIP+uploadScript, fileList[i], "file", storepath+".upload/"+basename(fileList[i])+".done", uploadUUID)
+   }
+   //Retry for error chunks. Not implemented yet
+
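+   //Completion tracking: every successful SendPostRequest call writes a .done marker into storepath/.upload/,
+   //and the loop below polls for these markers every 500 ms. Note that a failed upload never produces a marker,
+   //so this wait would spin forever until the retry step above is implemented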
+   //Wait for all upload processes to end
+   uploadFinishIndicators, _ := filepath.Glob(storepath + ".upload/" + uuid + "_*.done")
+   for len(uploadFinishIndicators) < len(fileList) {
+       time.Sleep(time.Duration(500) * time.Millisecond)
+       uploadFinishIndicators, _ = filepath.Glob(storepath + ".upload/" + uuid + "_*.done")
    }

-   for j := 0; j < len(pushResultTarget); j++ {
+   //Write the upload results to the index file
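+   //Each index line pairs a chunk with the node that stores it, in the form "<chunkname>,<node uuid>";
+   //the node uuid is read back from the chunk's .done marker written by SendPostRequest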
+   for j := 0; j < len(uploadFinishIndicators); j++ {
        f, _ := os.OpenFile("index/"+vdir+string(".index"), os.O_APPEND|os.O_WRONLY, 0600)
        defer f.Close()
-       f.WriteString(pushResultFilename[j])
+       f.WriteString(str_replace(".done", "", basename(uploadFinishIndicators[j])))
        f.WriteString(",")
-       f.WriteString(pushResultTarget[j])
+       f.WriteString(file_get_contents(uploadFinishIndicators[j]))
        f.WriteString("\n")
    }
+
+   //Clean up all indicators
+   for k := 0; k < len(uploadFinishIndicators); k++ {
+       os.Remove(uploadFinishIndicators[k])
+   }
+   os.Remove(storepath + ".upload/")
+
    fmt.Println("[OK] All chunks uploaded.")
}

@@ -543,18 +559,13 @@ func createToken(ipList []string) bool {
    return true
}

-func pushFileChunk(uuid string, targetEndpoint string, filename string) string {
-   response := string(SendPostRequest(targetEndpoint+"SystemAOB/functions/arozdfs/upload.php", filename, "file"))
-   return response
-}
-
func resolveUUID(uuid string) string {
    tmp := []byte(uuid)
    uuid = string(bytes.Trim(tmp, "\xef\xbb\xbf"))
    uuid = strings.Trim(strings.Trim(uuid, "\n"), "\r")
-   if file_exists("../cluster/mappers/") {
-       if file_exists("../cluster/mappers/" + uuid + ".inf") {
-           return file_get_contents("../cluster/mappers/" + uuid + ".inf")
+   if file_exists(clusterServices + "mappers/") {
+       if file_exists(clusterServices + "mappers/" + uuid + ".inf") {
+           return file_get_contents(clusterServices + "mappers/" + uuid + ".inf")
        } else {
            fmt.Println("ERROR. UUID not found. Please perform a scan first before using arozdfs functions")
        }
@@ -565,7 +576,7 @@ func resolveUUID(uuid string) string {
    return uuid
}

-func SendPostRequest(url string, filename string, fieldname string) []byte {
+func SendPostRequest(url string, filename string, fieldname string, resultName string, targetUUID string) []byte {
    file, err := os.Open(filename)

    if err != nil {
@@ -605,9 +616,73 @@ func SendPostRequest(url string, filename string, fieldname string) []byte {
        panic(err)
    }

+   //Upload succeeded. Create a .done file to indicate this file is done uploading
+   file_put_contents(resultName, string(targetUUID))
+   fmt.Println("[OK] " + str_replace(".done", "", basename(resultName)) + " uploaded.")
    return content
}

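+//removeFile deletes every chunk listed in index/<fileindex>.index from its hosting node, then removes the index file itself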
+func removeFile() {
+   fileindex := ""
+   if len(os.Args) == 3 {
+       fileindex = os.Args[2]
+   }
+   if fileindex == "" {
+       fmt.Println("ERROR. Undefined file index. Usage: arozdfs remove file.ext (Root as ./index)")
+       os.Exit(0)
+   }
+
+   indexFileRealpath := "index/" + fileindex + ".index"
+   if !file_exists(indexFileRealpath) {
+       fmt.Println("ERROR. fileindex not found in " + indexFileRealpath)
+       os.Exit(0)
+   }
+
+   //Everything checked. Go ahead and load the list into variables
+   var filelist []string
+   var targetUUIDs []string
+   fc := strings.Trim(str_replace("\r\n", "\n", file_get_contents(indexFileRealpath)), "\n")
+   datachunks := explode("\n", fc)
+   for i := 0; i < len(datachunks); i++ {
+       thisChunk := datachunks[i]
+       thisChunk = strings.Trim(strings.Trim(thisChunk, "\n"), "\r")
+       chunkdata := explode(",", thisChunk)
+       filelist = append(filelist, chunkdata[0])
+       targetUUIDs = append(targetUUIDs, "http://"+resolveUUID(chunkdata[1])+":"+config.Port+"/"+config.Prefix+"/")
+   }
+
+   //fmt.Println(filelist)
+   //fmt.Println(targetUUIDs)
+
+   //Remove the chunks on each endpoint
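+   //Each node's delChunk endpoint is expected to reply with the plain text "DONE" on success; any other reply marks the chunk as failed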
+   failed := len(filelist)
+   var failedChunk []string
+   for j := 0; j < len(filelist); j++ {
+       targetEndpoint := targetUUIDs[j] + delChunkScript + filelist[j]
+       resp, err := http.Get(targetEndpoint)
+       if err != nil {
+           //Connection failed; skip this chunk (reading resp.Body here would panic on the nil response)
+           fmt.Println("ERROR. Unable to connect to endpoint: " + targetEndpoint + ". Continue with the rest of the endpoints.")
+           failedChunk = append(failedChunk, filelist[j])
+           continue
+       }
+       body, _ := ioutil.ReadAll(resp.Body)
+       fmt.Println("[REPLY] " + string(body) + " for " + filelist[j])
+       if trim(string(body)) == "DONE" {
+           failed--
+       } else {
+           failedChunk = append(failedChunk, filelist[j])
+       }
+       resp.Body.Close()
+   }
+   if failed == 0 {
+       fmt.Println("[OK] All file chunks have been removed from the clusters")
+       os.Remove(indexFileRealpath)
+   } else {
+       fmt.Println("[WARNING] Unable to remove at least one chunk from the clusters. Index file is not removed.")
+       fmt.Println(failedChunk)
+   }
+}
+
func showHelp() {
    fmt.Println(`[arozdfs - Distributed File Storage Management Tool for ArOZ Online Cloud System]
This is a command line tool built for the ArOZ Online distributed cloud platform file chunking and redundant data storage.
@@ -623,7 +698,7 @@ func showHelp() {
-storepath <pathname> (Optional) --> Relative path for storing the sliced chunk files, default ./{file-uuid}

upload
- -storepath <pathname> --> The location where the file chunks are stored
+ -storepath <pathname> --> The location where the file chunks are stored; the root starts at ./chunks, and leaving this empty is not recommended
-uuid <file uuid> --> uuid of the file to be uploaded
-vdir <file.index> --> where the file.index should be stored. (Use for file / folder navigation)
-push <remoteDisks.config> (Optional) --> push to a list of clusters and sync file index to other clusters, default ./remoteDisks.config