diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..807b41e --- /dev/null +++ b/Dockerfile @@ -0,0 +1,5 @@ +FROM rackspacedot/python38 +COPY requirements.txt /tmp/ +RUN pip install -r /tmp/requirements.txt +COPY . /home/distributed_obj_storage_system +WORKDIR /home/distributed_obj_storage_system diff --git a/README.md b/README.md new file mode 100644 index 0000000..c6527f1 --- /dev/null +++ b/README.md @@ -0,0 +1,19 @@ +# python-simple-object-storage + +--- +### Environment Installation +pip install -r requirements.txt +docker network create --subnet=172.32.0.0/16 obj + +--- +### Simple object storage system +python frontend.py --name 'obj01' --content 'helloworld' +cat /tmp/objs/obj01 + +--- +### distributed object storage system + +python frontend.py --type uo --name obj01 --version 1.7.3 --file /home/distributed\_obj\_storage\_system/distributed\_obj\_system/test +python frontend.py --type do --name obj01 --version 1.7.3 +python frontend.py --type ol --name obj01 --version 1.7.3 +python frontend.py --type vl --name obj01 diff --git a/distributed_obj_system/backend/api/obj.py b/distributed_obj_system/backend/api/obj.py index fb32f53..fd44078 100644 --- a/distributed_obj_system/backend/api/obj.py +++ b/distributed_obj_system/backend/api/obj.py @@ -1,7 +1,7 @@ import os, sys sys.path.append(os.path.join(os.path.dirname(__file__), '../../../..')) from utils.utils import DB, splitData, combineData, getDataServers -from utils.const import KAFKA_SERVICE, KAFKA_PORT +from utils.const import KAFKA_SERVICE, KAFKA_PORT, PORT from kafka import KafkaProducer import random import pickle @@ -68,10 +68,18 @@ def getObj(name, version): data = b'' for server, idx, size in locate: try: - data += requests.get(f"http://{server}:{PORT}/partition/{hash}-{idx}", timeout=1).content + res = requests.get(f"http://{server}:{PORT}/partition/{hash}-{idx}", timeout=1) + if res.status_code == 201: + return -1 + else: + data += res.content except Exception as e: print('request error: %s' % e, flush=True) - data += b'X'*size - - return combineData(data) + data += b'x'*size + print(data, flush=True) + try: + res = combineData(data) + return res + except: + return -2 diff --git a/distributed_obj_system/backend/api/run.py b/distributed_obj_system/backend/api/run.py index 016b51a..5574151 100644 --- a/distributed_obj_system/backend/api/run.py +++ b/distributed_obj_system/backend/api/run.py @@ -22,15 +22,22 @@ def getVersion(obj_name): @app.route("/objects/size/", methods=['GET']) def getSize(obj_name): - if request.method == 'GET': - version = request.args.get('version') - if version: - metadata = DB.getMetadata(name, version) - dataSize = metadata['size'] - return Response({'size': dataSize}, status=200, mimetype='application/json') + version = request.args.get('version') + if version: + metadata = DB.getMetadata(obj_name, version) + dataSize = metadata['size'] + return Response(json.dumps({'size': dataSize}), status=200, mimetype='application/json') return Response(json.dumps({'size': 0}), status=400, mimetype='application/json') +@app.route("/objects/locate/", methods=['GET']) +def getLocate(obj_name): + version = request.args.get('version') + if version: + metadata = DB.getMetadata(obj_name, version) + return Response(json.dumps(metadata), status=200, mimetype='application/json') + + return Response(json.dumps({}), status=400, mimetype='application/json') @app.route("/objects/", methods=['GET', 'POST']) def handler(obj_name): @@ -38,14 +45,18 @@ def handler(obj_name): if request.method == 'GET': version = request.args.get('version') start = request.args.get('start') - start = start if start else 0 + start = int(start) if start else 0 if version: data = getObj(obj_name, version) + if data == -1: + return Response('Object not found', status=400) + if data == -2: + return Response('Data server is broken', status=400) data = data[start:] def gen(): chunk = 10 for i in range(0,len(data),chunk): - yield data[i:i+chunk] + yield bytes(data[i:i+chunk]) return Response(gen(), mimetype='text/xml') else: reason = 'version not found' diff --git a/distributed_obj_system/backend/data/run.py b/distributed_obj_system/backend/data/run.py index 1c4ea69..abf6537 100644 --- a/distributed_obj_system/backend/data/run.py +++ b/distributed_obj_system/backend/data/run.py @@ -19,11 +19,13 @@ def heatbeat(): @app.route("/partition/", methods=['GET']) def partition(comp_hash): + if not os.path.isfile(os.path.join(DIR_PATH, comp_hash)): + return Response(status=201) + t = open(os.path.join(DIR_PATH, comp_hash), 'rb') res = t.read() t.close() - print(str(res), flush=True) - return res + return Response(res, status=200) def SaveComp(name, data): f = open(os.path.join(DIR_PATH, name), 'wb') diff --git a/distributed_obj_system/build_containers/start.sh b/distributed_obj_system/build_containers/start.sh new file mode 100755 index 0000000..5faba7b --- /dev/null +++ b/distributed_obj_system/build_containers/start.sh @@ -0,0 +1,12 @@ +# start data service +for v in 100 101 102 103 104 105 106 +do + docker run -d --name "data$v" --net=obj --ip "172.32.1.$v" -v $(cd "$(dirname "$0")"; pwd)../../../:/home/distributed_obj_storage_system objsystem:latest python distributed_obj_system/backend/data/run.py +done + +for v in 50 +do + docker run -d --name "api$v" --net=obj --ip "172.32.1.$v" -v $(cd "$(dirname "$0")"; pwd)../../../:/home/distributed_obj_storage_system objsystem:latest python distributed_obj_system/backend/api/run.py +done + +docker run -dt --name frontend --net=obj -v $(cd "$(dirname "$0")"; pwd)../../../:/home/distributed_obj_storage_system -w /home/distributed_obj_storage_system/distributed_obj_system objsystem:latest diff --git a/distributed_obj_system/stop.sh b/distributed_obj_system/build_containers/stop.sh similarity index 86% rename from distributed_obj_system/stop.sh rename to distributed_obj_system/build_containers/stop.sh index 807f95e..a3de650 100755 --- a/distributed_obj_system/stop.sh +++ b/distributed_obj_system/build_containers/stop.sh @@ -9,3 +9,4 @@ do docker rm "api$v" -f done +docker rm frontend -f diff --git a/distributed_obj_system/build_middleware/start.sh b/distributed_obj_system/build_middleware/start.sh new file mode 100755 index 0000000..975c145 --- /dev/null +++ b/distributed_obj_system/build_middleware/start.sh @@ -0,0 +1,2 @@ +docker run --name kafka --net=obj --ip 172.32.1.10 -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=172.32.1.10 --env ADVERTISED_PORT=9092 -d spotify/kafka +docker run --name redis --net=obj --ip 172.32.1.11 -d redis diff --git a/distributed_obj_system/build_middleware/stop.sh b/distributed_obj_system/build_middleware/stop.sh new file mode 100755 index 0000000..feb1a3b --- /dev/null +++ b/distributed_obj_system/build_middleware/stop.sh @@ -0,0 +1,2 @@ +docker rm kafka -f +docker rm redis -f diff --git a/distributed_obj_system/frontend.py b/distributed_obj_system/frontend.py index b59413f..fb294a5 100644 --- a/distributed_obj_system/frontend.py +++ b/distributed_obj_system/frontend.py @@ -1,6 +1,6 @@ import os, sys sys.path.append(os.path.join(os.path.dirname(__file__), '..')) -from utils.utils import sha256, compress +from utils.utils import sha256, compress, decompress import requests import argparse @@ -10,11 +10,12 @@ def getArgParser(): parser = argparse.ArgumentParser(description='A frontend for object storage system.', formatter_class=argparse.RawTextHelpFormatter) - parser.add_argument('--server', default='localhost', type=str, help='Server address for object storage system.') + parser.add_argument('--server', default='172.32.1.50', type=str, help='Server address for object storage system.') parser.add_argument('--port', default=PORT, type=int, help='Server port for object storage system.') parser.add_argument('--type', required=True, type=str, help='uo-> upload an object. \n' 'vl-> get version list by object name. \n' - 'do-> download an object\n') + 'do-> download an object\n' + 'ol-> object locations') parser.add_argument('--version', required=False, type=str, help='Object version') parser.add_argument('--name', required=True, type=str, help='Object name to upload') parser.add_argument('--content', required=False, type=str, help='Object content to upload') @@ -26,7 +27,15 @@ def getArgParser(): def uploadObject(args): content = args.content name = args.name + file = args.file + if not name: + print('--name is required') + if not content and not file: + print('--name or --file is required') + + if file: + content = open(file, 'r').read() content = compress(content.encode()) # content type will be byte after compress hash = sha256(content) size = len(content) @@ -42,20 +51,58 @@ def uploadObject(args): print(i, chunk, size, res.status_code) def downloadObject(args): - size = requests.get(f"{server_url}/objects/size/{name}").json()['size'] + name = args.name + version = args.version + if not name: + print('--name is required') + return + if not version: + print('--version is required') + return + size = requests.get(f"{server_url}/objects/size/{name}?version={version}").json()['size'] print(f'Size of Object: {size} bytes') - content = '' + content = b'' + start = 0 while len(content)