Skip to content

Commit

Permalink
fix several bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
Kaijia Chen committed Aug 21, 2021
1 parent 685cf4d commit 03e85fe
Show file tree
Hide file tree
Showing 14 changed files with 148 additions and 46 deletions.
5 changes: 5 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Base image (the image name suggests Python 3.8 -- TODO confirm).
FROM rackspacedot/python38
# Copy only the dependency list first, then install it with pip.
COPY requirements.txt /tmp/
RUN pip install -r /tmp/requirements.txt
# Copy the project into the image and make it the working directory.
COPY . /home/distributed_obj_storage_system
WORKDIR /home/distributed_obj_storage_system
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# python-simple-object-storage

---
### Environment Installation
pip install -r requirements.txt
docker network create --subnet=172.32.0.0/16 obj

---
### Simple object storage system
python frontend.py --name 'obj01' --content 'helloworld'
cat /tmp/objs/obj01

---
### Distributed object storage system

python frontend.py --type uo --name obj01 --version 1.7.3 --file /home/distributed_obj_storage_system/distributed_obj_system/test
python frontend.py --type do --name obj01 --version 1.7.3
python frontend.py --type ol --name obj01 --version 1.7.3
python frontend.py --type vl --name obj01
18 changes: 13 additions & 5 deletions distributed_obj_system/backend/api/obj.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os, sys
sys.path.append(os.path.join(os.path.dirname(__file__), '../../../..'))
from utils.utils import DB, splitData, combineData, getDataServers
from utils.const import KAFKA_SERVICE, KAFKA_PORT
from utils.const import KAFKA_SERVICE, KAFKA_PORT, PORT
from kafka import KafkaProducer
import random
import pickle
Expand Down Expand Up @@ -68,10 +68,18 @@ def getObj(name, version):
data = b''
for server, idx, size in locate:
try:
data += requests.get(f"http://{server}:{PORT}/partition/{hash}-{idx}", timeout=1).content
res = requests.get(f"http://{server}:{PORT}/partition/{hash}-{idx}", timeout=1)
if res.status_code == 201:
return -1
else:
data += res.content
except Exception as e:
print('request error: %s' % e, flush=True)
data += b'X'*size

return combineData(data)
data += b'x'*size
print(data, flush=True)
try:
res = combineData(data)
return res
except:
return -2

27 changes: 19 additions & 8 deletions distributed_obj_system/backend/api/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,41 @@ def getVersion(obj_name):

@app.route("/objects/size/<obj_name>", methods=['GET'])
def getSize(obj_name):
    """Return the stored size of one version of an object as JSON.

    Query parameters:
        version: the object version to look up (required).

    Responses (all ``application/json``):
        200 ``{"size": <n>}`` when metadata for the version exists.
        400 ``{"size": 0}``   when ``version`` is missing.
        404 ``{"size": 0}``   when no metadata is stored for that version.
    """
    version = request.args.get('version')
    if not version:
        return Response(json.dumps({'size': 0}), status=400, mimetype='application/json')

    metadata = DB.getMetadata(obj_name, version)
    # getMetadata yields an empty result when the name/version is unknown;
    # indexing it blindly would raise KeyError and surface as a 500.
    if not metadata or 'size' not in metadata:
        return Response(json.dumps({'size': 0}), status=404, mimetype='application/json')

    return Response(json.dumps({'size': metadata['size']}), status=200, mimetype='application/json')

@app.route("/objects/locate/<obj_name>", methods=['GET'])
def getLocate(obj_name):
    """Return the stored metadata of one version of an object as JSON.

    Query parameters:
        version: the object version to look up (required).

    Responses (all ``application/json``):
        200 the metadata document when it exists.
        400 ``{}`` when ``version`` is missing.
        404 ``{}`` when no metadata is stored for that version.
    """
    version = request.args.get('version')
    if not version:
        return Response(json.dumps({}), status=400, mimetype='application/json')

    metadata = DB.getMetadata(obj_name, version)
    # An unknown name/version yields an empty result; report it as an error
    # instead of a 200 with an empty body that clients then fail to index.
    if not metadata:
        return Response(json.dumps({}), status=404, mimetype='application/json')

    return Response(json.dumps(metadata), status=200, mimetype='application/json')

@app.route("/objects/<obj_name>", methods=['GET', 'POST'])
def handler(obj_name):
reason = 'Unknown'
if request.method == 'GET':
version = request.args.get('version')
start = request.args.get('start')
start = start if start else 0
start = int(start) if start else 0
if version:
data = getObj(obj_name, version)
if data == -1:
return Response('Object not found', status=400)
if data == -2:
return Response('Data server is broken', status=400)
data = data[start:]
def gen():
chunk = 10
for i in range(0,len(data),chunk):
yield data[i:i+chunk]
yield bytes(data[i:i+chunk])
return Response(gen(), mimetype='text/xml')
else:
reason = 'version not found'
Expand Down
6 changes: 4 additions & 2 deletions distributed_obj_system/backend/data/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ def heatbeat():

@app.route("/partition/<comp_hash>", methods=['GET'])
def partition(comp_hash):
    """Serve the raw bytes of one stored partition file.

    Args:
        comp_hash: file name of the partition inside ``DIR_PATH``.

    Returns:
        A 200 response whose body is the file content, or an empty 201
        response when no such file exists.
    """
    path = os.path.join(DIR_PATH, comp_hash)
    if not os.path.isfile(path):
        # NOTE: 201 is kept on purpose -- getObj in the API server tests for
        # status 201 to detect a missing partition.
        return Response(status=201)

    # Context manager guarantees the handle is closed even if read() fails.
    with open(path, 'rb') as f:
        res = f.read()
    print(str(res), flush=True)
    return Response(res, status=200)

def SaveComp(name, data):
f = open(os.path.join(DIR_PATH, name), 'wb')
Expand Down
12 changes: 12 additions & 0 deletions distributed_obj_system/build_containers/start.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Resolve the repository root (two directories above this script) once, as an
# absolute path, so the bind mount works from any working directory and with
# paths that contain spaces.
ROOT="$(cd "$(dirname "$0")/../.." && pwd)"
MOUNT="$ROOT:/home/distributed_obj_storage_system"

# start data service
for v in 100 101 102 103 104 105 106
do
  docker run -d --name "data$v" --net=obj --ip "172.32.1.$v" -v "$MOUNT" objsystem:latest python distributed_obj_system/backend/data/run.py
done

# start api service
for v in 50
do
  docker run -d --name "api$v" --net=obj --ip "172.32.1.$v" -v "$MOUNT" objsystem:latest python distributed_obj_system/backend/api/run.py
done

# start a detached frontend container with a TTY, working in distributed_obj_system
docker run -dt --name frontend --net=obj -v "$MOUNT" -w /home/distributed_obj_storage_system/distributed_obj_system objsystem:latest
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ do
docker rm "api$v" -f
done

docker rm frontend -f
2 changes: 2 additions & 0 deletions distributed_obj_system/build_middleware/start.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Start the kafka container detached on the "obj" network at 172.32.1.10,
# publishing ports 2181 and 9092 and advertising 172.32.1.10:9092.
docker run --name kafka --net=obj --ip 172.32.1.10 -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=172.32.1.10 --env ADVERTISED_PORT=9092 -d spotify/kafka
# Start the redis container detached on the "obj" network at 172.32.1.11.
docker run --name redis --net=obj --ip 172.32.1.11 -d redis
2 changes: 2 additions & 0 deletions distributed_obj_system/build_middleware/stop.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Force-remove the kafka and redis containers.
docker rm kafka -f
docker rm redis -f
70 changes: 59 additions & 11 deletions distributed_obj_system/frontend.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os, sys
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from utils.utils import sha256, compress
from utils.utils import sha256, compress, decompress
import requests
import argparse

Expand All @@ -10,11 +10,12 @@

def getArgParser():
parser = argparse.ArgumentParser(description='A frontend for object storage system.', formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('--server', default='localhost', type=str, help='Server address for object storage system.')
parser.add_argument('--server', default='172.32.1.50', type=str, help='Server address for object storage system.')
parser.add_argument('--port', default=PORT, type=int, help='Server port for object storage system.')
parser.add_argument('--type', required=True, type=str, help='uo-> upload an object. \n'
'vl-> get version list by object name. \n'
'do-> download an object\n')
'do-> download an object\n'
'ol-> object locations')
parser.add_argument('--version', required=False, type=str, help='Object version')
parser.add_argument('--name', required=True, type=str, help='Object name to upload')
parser.add_argument('--content', required=False, type=str, help='Object content to upload')
Expand All @@ -26,7 +27,15 @@ def getArgParser():
def uploadObject(args):
content = args.content
name = args.name
file = args.file

if not name:
print('--name is required')
if not content and not file:
print('--name or --file is required')

if file:
content = open(file, 'r').read()
content = compress(content.encode()) # content type will be byte after compress
hash = sha256(content)
size = len(content)
Expand All @@ -42,20 +51,58 @@ def uploadObject(args):
print(i, chunk, size, res.status_code)

def downloadObject(args):
    """Download one version of an object, decompress it and print it.

    Args:
        args: parsed CLI arguments; ``args.name`` and ``args.version`` are
            both required for a download.

    The object size is fetched first, then the content is requested until
    that many bytes have been received. Each request passes ``start`` equal
    to the number of bytes already held, so a retry resumes instead of
    duplicating data. Problems are reported on stdout; nothing is returned.
    """
    name = args.name
    version = args.version
    if not name:
        print('--name is required')
        return
    if not version:
        print('--version is required')
        return

    size_res = requests.get(f"{server_url}/objects/size/{name}?version={version}")
    if size_res.status_code != 200:
        # Do not treat an error payload as a valid size.
        print(f'Something wrong: {size_res.text}')
        return
    size = size_res.json()['size']
    print(f'Size of Object: {size} bytes')

    content = b''
    max_retries = 5  # bound the retry loop instead of spinning forever
    failures = 0
    while len(content) < size:
        start = len(content)
        try:
            res = requests.get(f"{server_url}/objects/{name}?version={version}&start={start}")
        except KeyboardInterrupt:
            sys.exit()
        except Exception as e:
            failures += 1
            if failures > max_retries:
                print(f'Something wrong: {e}')
                return
            print('Something wrong, downloading again...', e)
            continue

        if res.status_code != 200:
            # An error body must never be appended to the object bytes.
            print(f'Something wrong: {res.text}')
            return
        if not res.content:
            # No progress is possible; avoid an endless loop.
            print('Something wrong: server returned no data')
            return
        content += res.content

    try:
        print(f'Get content: {decompress(content).decode()}')
    except Exception as e:
        print(f'Something wrong: {e}')

def getVersionList(args):
    """Print the list of stored versions for ``args.name``.

    Args:
        args: parsed CLI arguments; ``args.name`` is required.

    Prints the decoded JSON body on HTTP 200, otherwise the raw response
    text. Returns nothing.
    """
    name = args.name

    if not name:
        print('--name is required')
        # Stop here: without a name there is nothing to query.
        return

    res = requests.get(f"{server_url}/versions/{name}")
    if res.status_code == 200:
        print(f'Version list: {res.json()}')
    else:
        print(f'Something wrong: {res.text}')

def getObjLocation(args):
    """Print the servers that hold the pieces of one object version.

    Args:
        args: parsed CLI arguments; ``args.name`` and ``args.version`` are
            both required.

    On HTTP 200 with a ``locate`` list in the JSON body, prints the first
    element of every ``locate`` entry. Any other response is reported with
    its raw text. Returns nothing.
    """
    name = args.name
    version = args.version

    if not name:
        print('--name is required')
        return
    if not version:
        # Without this return the request would be sent with version=None.
        print('--version is required')
        return

    res = requests.get(f"{server_url}/objects/locate/{name}?version={version}")
    locations = res.json().get("locate") if res.status_code == 200 else None
    if locations is not None:
        print(f'Location list: {[locate[0] for locate in locations]}')
    else:
        print(f'Something wrong: {res.text}')


def run():
args = getArgParser()
Expand All @@ -67,7 +114,8 @@ def run():
funcMap = {
'uo': uploadObject,
'vl': getVersionList,
'do': downloadObject
'do': downloadObject,
'ol': getObjLocation,
}

if __name__ == '__main__':
Expand Down
11 changes: 0 additions & 11 deletions distributed_obj_system/start.sh

This file was deleted.

2 changes: 1 addition & 1 deletion simple_obj_system/backend/run.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os, sys
sys.path.append(os.path.join(os.path.dirname(__file__), '../..'))
from utils.const import SERVER, PORT
from utils.const import PORT
from flask import Flask, request, make_response, jsonify
from markupsafe import escape

Expand Down
4 changes: 2 additions & 2 deletions simple_obj_system/frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import requests
import argparse

from utils.const import SERVER, PORT
from utils.const import PORT

def getArgParser():
parser = argparse.ArgumentParser(description='A frontend for object storage system.')
parser.add_argument('--server', default=SERVER, type=str, help='Server address for object storage system.')
parser.add_argument('--server', default='localhost', type=str, help='Server address for object storage system.')
parser.add_argument('--port', default=PORT, type=int, help='Server port for object storage system.')
parser.add_argument('--name', required=True, type=str, help='Object name to upload')
parser.add_argument('--content', required=True, type=str, help='Object content to upload')
Expand Down
15 changes: 9 additions & 6 deletions utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import datetime
from .const import REDIS_SERVER, REDIS_PORT
from reedsolo import RSCodec, ReedSolomonError
rsc = RSCodec(6)
RAID = 6
rsc = RSCodec(RAID)

import socket
MyIP = socket.gethostbyname(socket.gethostname())
Expand All @@ -29,18 +29,16 @@ def __exit__(self, type, value, traceback):

def splitData(data):
    """Encode *data* with ``rsc`` and cut the result into ``RAID`` pieces.

    The pieces are contiguous slices of the encoded payload. When its length
    is not a multiple of ``RAID``, the first ``len % RAID`` pieces are one
    byte longer than the rest.
    """
    encoded = rsc.encode(data)
    print(encoded,flush=True)
    base, extra = divmod(len(encoded), RAID)

    pieces = []
    offset = 0
    for idx in range(RAID):
        # The leading `extra` pieces absorb the remainder, one byte each.
        width = base + 1 if idx < extra else base
        pieces.append(encoded[offset:offset + width])
        offset += width
    return pieces

# Decode an encoded payload with the module-level codec `rsc` and return
# element [0] of the decode result.
# NOTE(review): this diff hunk appears to show two variants back to back -- a
# try/except form that logs the error and returns None, and a bare return that
# lets the decode exception propagate. Confirm which one is the live code.
def combineData(data):
try:
return rsc.decode(data)[0]
except Exception as e:
print('Error occurs when combine data: %s' % e, flush=True)
return None
return rsc.decode(data)[0]


def sha256(data):
    """Return the hexadecimal SHA-256 digest of the bytes in *data*."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
Expand All @@ -55,6 +53,10 @@ def sha256File(path):
def compress(data):
    """Return the bytes in *data* compressed with ``zlib.compress``."""
    packed = zlib.compress(data)
    return packed

def decompress(data):
    """Return the bytes obtained by zlib-decompressing *data*.

    Raises:
        zlib.error: if *data* is not a valid zlib stream.
    """
    # The former debug print of the raw compressed payload was dropped; it
    # flooded stdout with binary data on every download.
    return zlib.decompress(data)

heartbeatMap = {}
def recordHeartbeat(url):
heartbeatMap[url] = datetime.datetime.now()
Expand All @@ -79,6 +81,7 @@ def getMetadata(cls, name, version):
for ver, data in cls.r.hgetall(name).items():
if ver.decode() == version:
return json.loads(data.decode())
return {}

@classmethod
def getVersionList(cls, name):
Expand Down

0 comments on commit 03e85fe

Please sign in to comment.