package main import ( "bytes" "encoding/json" "fmt" "io" "io/ioutil" "math" "mime/multipart" "net/http" "os" "path" "path/filepath" "strconv" "strings" "time" "github.com/FossoresLP/go-uuid-v4" ) func check(e error) { if e != nil { panic(e) } } //ArOZ PHP-Golang Bridge //The following sections remap the PHP functions to golang for the ease of development func file_exists(filepath string) bool { if _, err := os.Stat(filepath); !os.IsNotExist(err) { return true } return false } func mkdir(filepath string) { os.MkdirAll(filepath, os.ModePerm) } func file_put_contents(file string, data string) bool { f, err := os.Create(file) check(err) _, err = f.WriteString(data) defer f.Close() if err != nil { return false } return true } func file_get_contents(file string) string { b, err := ioutil.ReadFile(file) check(err) return string(b) } func strtolower(word string) string { return strings.ToLower(word) } func strtoupper(word string) string { return strings.ToUpper(word) } func trim(word string) string { return strings.Trim(word, " ") } func strlen(word string) int { return len(word) } func count(array []string) int { return len(array) } func explode(key string, word string) []string { return strings.Split(word, key) } func implode(key string, array []string) string { return strings.Join(array[:], key) } func str_replace(target string, result string, word string) string { return strings.Replace(word, target, result, -1) } func in_array(a string, list []string) bool { for _, b := range list { if b == a { return true } } return false } func strpos(word string, target string) int { return strings.Index(word, target) } func dirname(filepath string) string { return path.Dir(filepath) } func basename(fullpath string) string { return filepath.Base(fullpath) } //End of mapping functions //Utilities functions func genUUIDv4() string { uuid, err := uuid.NewString() check(err) return uuid } func padZeros(thisInt string, maxval int) string { targetLength := len(strconv.Itoa(maxval)) result := thisInt if len(thisInt) < targetLength { padzeros := targetLength - len(thisInt) for i := 0; i < padzeros; i++ { result = "0" + result } } return result } type clusterConfig struct { Prefix string `json:"prefix"` Port string `json:"port"` } //End of utilities functions //System constants const clusterServices = "../cluster/" const delChunkScript = "SystemAOB/functions/arozdfs/delChunk.php?chunkuuid=" const requestScript = "SystemAOB/functions/arozdfs/request.php?chunkuuid=" const uploadScript = "SystemAOB/functions/arozdfs/upload.php" const clusterSetting = "clusterSetting.config" var config clusterConfig func init() { //Check if the required directory exists. If not, create it. if !file_exists("chunks/") { mkdir("chunks/") } if !file_exists("uploads/") { mkdir("uploads/") } if !file_exists("index/") { mkdir("index/") } if !file_exists("tmp/") { mkdir("tmp/") } if !file_exists("remoteDisks.config") { file_put_contents("remoteDisks.config", "") } //Load config from clusterSetting.config jsonFile, _ := os.Open(clusterSetting) byteValue, _ := ioutil.ReadAll(jsonFile) json.Unmarshal(byteValue, &config) } func main() { //arozdfs implementation in Golang //Refer to the help section for the usable commands and parameters if len(os.Args) == 1 { fmt.Println("ERROR. Undefined function group or operations. Type 'arozdfs help' for usage instructions. ") return } //For each argument, start the processing switch functgroup := os.Args[1]; functgroup { case "help": showHelp() case "slice": startSlicingProc() case "upload": startUploadProc() case "download": startDownloadProc() case "open": openChunkedFile() case "remove": removeFile() case "debug": fmt.Println(config.Port + "/" + config.Prefix) //Debug function. Change this line for unit testing default: showNotFound() } /* //Examples for using the Go-PHP bridge functions file_put_contents("Hello World.txt", "This is the content of the file.") fmt.Println(file_get_contents("Hello World.txt")) array := explode(",", "Apple,Orange,Pizza") fmt.Println(array) newstring := implode(",", array) fmt.Println(newstring) fmt.Println(in_array("Pizza", array)) fmt.Println(strpos(newstring, "Pizza")) fmt.Println(strtoupper("hello world")) fmt.Println(str_replace("Pizza", "Ramen", newstring)) */ } func startDownloadProc() { vdir := "" storepath := "tmp/" for i, arg := range os.Args { if strpos(arg, "-") == 0 { //This is a parameter defining keyword if arg == "-vdir" { vdir = os.Args[i+1] } else if arg == "-storepath" { storepath = os.Args[i+1] //Check if the storepath is end with /. if not, append it into the pathname if storepath[len(storepath)-1:] != "/" { storepath = storepath + "/" } } } } if vdir != "" { //Go ahead the download process and get the content of the file fc := strings.Trim(str_replace("\r\n", "\n", file_get_contents("index/"+vdir+".index")), "\n") datachunks := explode("\n", fc) var filelist []string var locations []string for i := 0; i < len(datachunks); i++ { tmp := explode(",", datachunks[i]) filechunk := tmp[0] locationUUID := tmp[1] filelist = append(filelist, filechunk) thisip := resolveUUID(locationUUID) clusterConfig := ":" + string(config.Port) + "/" + string(config.Prefix) + "/" fullip := "http://" + thisip + clusterConfig locations = append(locations, fullip) } //fmt.Println(filelist) //fmt.Println(locations) //Start the request process for j := 0; j < len(filelist); j++ { //Multithreading download for each fileitem filename := filelist[j] targetURL := locations[j] + requestScript + string(filename) go downloadFileChunkWithOutput(storepath+filename, targetURL, filename) } fileUUID := explode("_", filelist[0])[0] //Getting the first part of a file with uuid, e.g. {uuid}_0 --> get only the {uuid} part //Wait for the go routine to finish downloadFinishIndicators, _ := filepath.Glob(storepath + fileUUID + "_*.done") for len(downloadFinishIndicators) < len(filelist) { time.Sleep(time.Duration(500) * time.Millisecond) downloadFinishIndicators, _ = filepath.Glob(storepath + fileUUID + "_*.done") } //Clear up all indicators for k := 0; k < len(downloadFinishIndicators); k++ { os.Remove(downloadFinishIndicators[k]) } fmt.Println("[OK] All chunks downloaded") } else { fmt.Println("ERROR. vdir cannot be empty") os.Exit(0) } } func downloadFileChunkWithOutput(filepath string, url string, filename string) { if DownloadFile(filepath, url) { fmt.Println("[OK] " + filename) file_put_contents(filepath+".done", "") } } func DownloadFile(filepath string, url string) bool { // Get the data resp, err := http.Get(url) if err != nil { return false } defer resp.Body.Close() // Create the file out, err := os.Create(filepath) if err != nil { return false } defer out.Close() // Write the body to file _, err = io.Copy(out, resp.Body) return true } func startSlicingProc() { infile := "" slice := 64 //Default 64MB per file chunk fileUUID := genUUIDv4() storepath := fileUUID + "/" for i, arg := range os.Args { if strpos(arg, "-") == 0 { //This is a parameter defining keyword if arg == "-infile" { infile = os.Args[i+1] } else if arg == "-storepath" { storepath = os.Args[i+1] //Check if the storepath is end with /. if not, append it into the pathname if storepath[len(storepath)-1:] != "/" { storepath = storepath + "/" } } else if arg == "-slice" { sliceSize, err := strconv.Atoi(os.Args[i+1]) check(err) slice = sliceSize } } } if slice <= 0 { fmt.Println("ERROR. slice size cannot be smaller or equal to 0") os.Exit(0) } if storepath != "" && infile != "" { //fmt.Println(storepath + " " + infile + " " + strconv.Itoa(slice) + " " + fileUUID) splitFileChunks(infile, "chunks/"+storepath, fileUUID, slice) fmt.Println(fileUUID) } else { fmt.Println("ERROR. Undefined storepath or infile.") } } func splitFileChunks(rawfile string, outputdir string, outfilename string, chunksize int) bool { if !file_exists(outputdir) { mkdir(outputdir) } fileToBeChunked := rawfile file, err := os.Open(fileToBeChunked) if err != nil { return false } defer file.Close() fileInfo, _ := file.Stat() var fileSize int64 = fileInfo.Size() var fileChunk = float64(chunksize * 1024 * 1024) // chunksize in MB // calculate total number of parts the file will be chunked into totalPartsNum := uint64(math.Ceil(float64(fileSize) / float64(fileChunk))) //fmt.Printf("[Info] Splitting to %d pieces.\n", totalPartsNum) for i := uint64(0); i < totalPartsNum; i++ { partSize := int(math.Min(fileChunk, float64(fileSize-int64(i*uint64(fileChunk))))) partBuffer := make([]byte, partSize) file.Read(partBuffer) // write to disk fileName := outputdir + outfilename + "_" + padZeros(strconv.FormatUint(i, 10), int(totalPartsNum)) _, err := os.Create(fileName) if err != nil { return false } // write/save buffer to disk ioutil.WriteFile(fileName, partBuffer, os.ModeAppend) //fmt.Println("[Export] ", fileName) } return true } func openChunkedFile() { storepath := "tmp/" uuid := "" outfile := "" removeAfterMerge := 0 for i, arg := range os.Args { if strpos(arg, "-") == 0 { //This is a parameter defining keyword if arg == "-uuid" { uuid = os.Args[i+1] } else if arg == "-storepath" { storepath = os.Args[i+1] //Check if the storepath is end with /. if not, append it into the pathname if storepath[len(storepath)-1:] != "/" { storepath = storepath + "/" } } else if arg == "-outfile" { outfile = os.Args[i+1] } else if arg == "-c" { //Remove the file chunks after the merging process removeAfterMerge = 1 } } } if storepath != "" && uuid != "" && outfile != "" { //fmt.Println(storepath + " " + uuid + " " + outfile) if joinFileChunks(storepath+uuid, outfile) { //Do checksum here //Remove all files if -c is used if removeAfterMerge == 1 { matches, _ := filepath.Glob(storepath + uuid + "_*") for j := 0; j < len(matches); j++ { os.Remove(matches[j]) } } } else { fmt.Println("ERROR. Unable to merge file chunks.") } } else { fmt.Println("ERROR. Undefined storepath, outfile or uuid.") } } func joinFileChunks(fileuuid string, outfilename string) bool { matches, _ := filepath.Glob(fileuuid + "_*") if len(matches) == 0 { fmt.Println("ERROR. No filechunk file for this uuid.") return false } outfile, err := os.Create(outfilename) if err != nil { return false } //For each file chunk, merge them into the output file for j := 0; j < len(matches); j++ { b, _ := ioutil.ReadFile(matches[j]) outfile.Write(b) } return true } func startUploadProc() { push := "remoteDisks.config" storepath := "chunks/" uuid := "" vdir := "" for i, arg := range os.Args { if strpos(arg, "-") == 0 { //This is a parameter defining keyword if arg == "-uuid" { uuid = os.Args[i+1] } else if arg == "-storepath" { storepath = os.Args[i+1] //Check if the storepath is end with /. if not, append it into the pathname if storepath[len(storepath)-1:] != "/" { storepath = "chunks/" + storepath + "/" } } else if arg == "-vdir" { vdir = os.Args[i+1] } else if arg == "-push" { //Remove the file chunks after the merging process push = os.Args[i+1] } } } //Check if the input data are valid if uuid == "" || vdir == "" { fmt.Println("ERROR. Undefined uuid or vdir.") os.Exit(0) } if !file_exists(clusterSetting) { fmt.Println("ERROR. clusterSetting configuration not found") os.Exit(0) } if file_exists("index/" + vdir + string(".index")) { fmt.Println("ERROR. Given file already exists in vdir. Please use remove before uploading a new file on the same vdir location.") os.Exit(0) } //Starting the uuid to ip conversion process var ipList []string var uuiddata []string var uploadUUIDList []string //Read cluster uuid list from remoteDisks.config if file_exists(push) { clusteruuids := file_get_contents(push) if trim(clusteruuids) == "" { fmt.Println("ERROR. remoteDisks not found or it is empty! ") return } clusteruuids = trim(strings.Trim(clusteruuids, "\n")) uuiddata = explode("\n", clusteruuids) //Generate iplist and ready for posting file chunks for i := 0; i < len(uuiddata); i++ { thisuuid := uuiddata[i%len(uuiddata)] uploadUUIDList = append(uploadUUIDList, thisuuid) thisip := resolveUUID(thisuuid) clusterConfig := ":" + string(config.Port) + "/" + string(config.Prefix) + "/" fullip := "http://" + thisip + clusterConfig ipList = append(ipList, fullip) } } else { fmt.Println("ERROR. remoteDisks not found or it is empty! ") return } //Handshake with clusters, create auth token if needed if !createToken(ipList) { fmt.Println("ERROR. Problem occured while trying to create token for one of the cluster's host. Upload process terminated.") return } //Ready to push. Create index file. file_put_contents("index/"+vdir+string(".index"), "") fileList, _ := filepath.Glob(storepath + uuid + "_*") //Make a directory for storing the result of the upload if !file_exists(storepath + ".upload/") { mkdir(storepath + ".upload/") } for i := 0; i < len(fileList); i++ { uploadIP := (ipList[i%len(ipList)]) uploadUUID := (uploadUUIDList[i%len(ipList)]) go SendPostRequest(uploadIP+uploadScript, fileList[i], "file", storepath+".upload/"+basename(fileList[i])+".done", uploadUUID) } //Retry for error chunks. Not implemented yet //Wait for all upload process to end uploadFinishIndicators, _ := filepath.Glob(storepath + ".upload/" + uuid + "_*.done") for len(uploadFinishIndicators) < len(fileList) { time.Sleep(time.Duration(500) * time.Millisecond) uploadFinishIndicators, _ = filepath.Glob(storepath + ".upload/" + uuid + "_*.done") } //Write the upload results to index file f, _ := os.OpenFile("index/"+vdir+string(".index"), os.O_APPEND|os.O_WRONLY, 0600) for j := 0; j < len(uploadFinishIndicators); j++ { f.WriteString(str_replace(".done", "", basename(uploadFinishIndicators[j]))) f.WriteString(",") f.WriteString(file_get_contents(uploadFinishIndicators[j])) f.WriteString("\n") } f.Close() //Clear up all indicators for k := 0; k < len(uploadFinishIndicators); k++ { os.Remove(uploadFinishIndicators[k]) } os.Remove(storepath + ".upload/") fmt.Println("[OK] All chunks uploaded.") } func createToken(ipList []string) bool { //Not implemented return true } func resolveUUID(uuid string) string { tmp := []byte(uuid) uuid = string(bytes.Trim(tmp, "\xef\xbb\xbf")) uuid = strings.Trim(strings.Trim(uuid, "\n"), "\r") if file_exists(clusterServices + "mappers/") { if file_exists(clusterServices + "/mappers/" + uuid + ".inf") { return file_get_contents(clusterServices + "/mappers/" + uuid + ".inf") } else { fmt.Println("ERROR. UUID not found. Please perform a scan first before using arozdfs functions") } } else { fmt.Println("ERROR. Unable to resolve UUID to IP: cluster services not found. Continuing with UUID as IP address.") } return uuid } func SendPostRequest(url string, filename string, fieldname string, resultName string, targetUUID string) []byte { file, err := os.Open(filename) if err != nil { panic(err) } defer file.Close() body := &bytes.Buffer{} writer := multipart.NewWriter(body) part, err := writer.CreateFormFile(fieldname, filepath.Base(file.Name())) if err != nil { panic(err) } io.Copy(part, file) writer.Close() request, err := http.NewRequest("POST", url, body) if err != nil { panic(err) } request.Header.Add("Content-Type", writer.FormDataContentType()) client := &http.Client{} response, err := client.Do(request) if err != nil { panic(err) } defer response.Body.Close() content, err := ioutil.ReadAll(response.Body) if err != nil { panic(err) } //Upload succeed. Create a .done file to indicate this file is done uploading file_put_contents(resultName, string(targetUUID)) fmt.Println("[OK] " + str_replace(".done", "", basename(resultName)) + " uploaded.") return content } func removeFile() { fileindex := "" if len(os.Args) == 3 { fileindex = os.Args[2] } if fileindex == "" { fmt.Println("ERROR. undefined file index. Usage: arozdfs file.ext (Root as ./index)") os.Exit(0) } indexFileRealpath := "index/" + fileindex + ".index" if !file_exists(indexFileRealpath) { fmt.Println("ERROR. fileindex not found in " + indexFileRealpath) os.Exit(0) } //Everything checked and go ahead to load the list into variables var filelist []string var targetUUIDs []string fc := strings.Trim(str_replace("\r\n", "\n", file_get_contents(indexFileRealpath)), "\n") datachunks := explode("\n", fc) for i := 0; i < len(datachunks); i++ { thisChunk := datachunks[i] thisChunk = strings.Trim(strings.Trim(thisChunk, "\n"), "\r") chunkdata := explode(",", thisChunk) filelist = append(filelist, chunkdata[0]) targetUUIDs = append(targetUUIDs, "http://"+resolveUUID(chunkdata[1])+":"+config.Port+"/"+config.Prefix+"/") } //fmt.Println(filelist) //fmt.Println(targetUUIDs) //Remove the chunks on each endpoints failed := len(filelist) var failedChunk []string for j := 0; j < len(filelist); j++ { targetEndpoint := targetUUIDs[j] + delChunkScript + filelist[j] resp, err := http.Get(targetEndpoint) if err != nil { // handle error fmt.Println("ERROR. Unable to connect to endpoint: " + targetEndpoint + ". Continue with the rest of the endpoints.") } body, _ := ioutil.ReadAll(resp.Body) fmt.Println("[REPLY] " + string(body) + " for " + filelist[j]) if trim(string(body)) == "DONE" { failed-- } else { failedChunk = append(failedChunk, filelist[j]) } resp.Body.Close() } if failed == 0 { fmt.Println("[OK] All file chunks has been removed from the clusters") os.Remove(indexFileRealpath) } else { fmt.Println("[WARNING] Unable to remove at least one chunks from cluster. Index file is not removed.") fmt.Println(failedChunk) } } func showHelp() { fmt.Println(`[arozdfs - Distributed File Storage Management Tool for ArOZ Online Cloud System] This is a command line tool build for the ArOZ Online distributed cloud platform file chunking and redundant data storage. Please refer to the ArOZ Online Documentaion for more information. `) fmt.Println(`Supported commands: help --> show all the help information [Uploading to arozdfs commands] slice -infile --> declare the input file -slice --> declare the slicing filesize -storepath (Optional) --> Relative path for storing the sliced chunk files, default ./{file-uuid} upload -storepath --> The location where the file chunks are stored, root start at ./chunks, not recommend for leaving this empty -uuid --> uuid of the file to be uploaded -vdir --> where the file.index should be stored. (Use for file / folder navigation) -push (Optional) --> push to a list of clusters and sync file index to other clusters, default ./remoteDisks.config [Download from arozdfs commands] download -vdir --> file.index location -storepath (Optional) --> define a special directory for caching the downloaded data chunks, default ./tmp open -uuid --> the uuid which the file is stored -outfile --> filepath for the exported and merged file -storepath (Optional)--> the file chunks tmp folder, default ./tmp -c (Optional) --> remove all stored file chunks after merging the file chunks. [File Operations] remove --> remove all chunks related to this file index rename --> rename all records related to this file move --> move the file to a new path in index directory [System checking commands] checkfile --> check if a file contains all chunks which has at least two copies of each chunks rebuild --> Check all files on the system and fix all chunks which has corrupted migrate --> Move all chunks from this host to other servers in the list.`) } func showNotFound() { fmt.Println("ERROR. Command not found: " + os.Args[1]) }