ca를 전송하려고합니다. 10GB의 json 데이터 (제 경우에는 트윗)를 arangodb의 콜렉션에 저장합니다. 나는 또한 그것을 위해 JOBLIB 사용하려고 해요 :데이터를 arangodb로 전송하려고 할 때 데이터가 섞이게됩니다.
from ArangoConn import ArangoConn
import Userdata as U
import encodings
from joblib import Parallel,delayed
import json
from glob import glob
import time
def progress(total, prog, start, stri = ""):
if(prog == 0):
print("")
prog = 1;
perc = prog/total
diff = time.time() - start
rem = (diff/prog) * (total - prog)
bar = ""
for i in range(0,int(perc*20)):
bar = bar + "|"
for i in range(int(perc*20),20):
bar = bar + " "
print("\r"+"progress: " + "[" + bar + "] " + str(prog) + " of " +
str(total) + ": {0:.1f}% ".format(perc * 100) + "- " +
time.strftime("%H:%M:%S", time.gmtime(rem)) + " " + stri, end="")
def processfile(filepath):
file = open(filepath,encoding='utf-8')
s = file.read()
file.close()
data = json.loads(s)
Parallel(n_jobs=12, verbose=0, backend="threading"
(map(delayed(ArangoConn.createDocFromObject), data))
files = glob(U.path+'/*.json')
i = 1
j = len(files)
starttime = time.time()
for f in files:
progress(j,i,starttime,f)
i = i+1
processfile(f)
그리고 좀 그런 식으로 작동
from pyArango.connection import Connection
import Userdata as U
import time
class ArangoConn:
def __init__(self,server,user,pw,db,collectionname):
self.server = server
self.user = user
self.pw = pw
self.db = db
self.collectionname = collectionname
self.connection = None
self.dbHandle = self.connect()
if not self.dbHandle.hasCollection(name=self.collectionname):
coll = self.dbHandle.createCollection(name=collectionname)
else:
coll = self.dbHandle.collections[collectionname]
self.collection = coll
def db_createDocFromObject(self, obj):
data = obj.__dict__()
doc = self.collection.createDocument()
for key,value in data.items():
doc[key] = value
doc._key= str(int(round(time.time() * 1000)))
doc.save()
def connect(self):
self.connection = Connection(arangoURL=self.server + ":8529",
username=self.user, password=self.pw)
if not self.connection.hasDatabase(self.db):
db = self.connection.createDatabase(name=self.db)
else:
db = self.connection.databases.get(self.db)
return db
def disconnect(self):
self.connection.disconnectSession()
def getAllData(self):
docs = []
for doc in self.collection.fetchAll():
docs.append(self.doc_to_result(doc))
return docs
def addData(self,obj):
self.db_createDocFromObject(obj)
def search(self,collection,search,prop):
docs = []
aql = """FOR q IN """+collection+""" FILTER q."""+prop+""" LIKE
"%"""+search+"""%" RETURN q"""
results = self.dbHandle.AQLQuery(aql, rawResults=False, batchSize=1)
for doc in results:
docs.append(self.doc_to_result(doc))
return docs
def doc_to_result(self,arangodoc):
modstore = arangodoc.getStore()
modstore["_key"] = arangodoc._key
return modstore
def db_createDocFromJson(self,json):
for d in json:
doc = self.collection.createDocument()
for key,value in d.items():
doc[key] = value
doc._key = str(int(round(time.time() * 1000)))
doc.save()
@staticmethod
def createDocFromObject(obj):
c = ArangoConn(U.url, U.user, U.pw, U.db, U.collection)
data = obj
doc = c.collection.createDocument()
for key, value in data.items():
doc[key] = value
doc._key = doc["id"]
doc.save()
c.connection.disconnectSession()
합니다. 내 문제는 데이터베이스에있는 데이터가 어떻게 든 섞여 있다는 것입니다.
당신이 스크린 샷 "ID"와 "ID_STR"에서 볼 수 있듯이
는 동일하지 않습니다 - 그들은해야한다.이 나는 트윗 ID로 키를 설정할 수 있도록 몇 가지 포인트에서 databese의 기본 키 때문에 스레딩의 을 "충돌"할 수 있다고 생각 : 내가 지금까지 조사 어떤
.여러 스레드없이 시도했습니다. 스레딩은 내가 데이터베이스에 보내는 데이터를보고 문제
에게 될 것 같지 않습니다 ... 모든
그러나 잘 될 것 같다 최대한 빨리이 DB와 통신으로 데이터가 섞인다.
필자의 교수는 pyarango의 어떤 것이 threadsafe가 아니며 데이터를 엉망으로 만들 것이라고 생각했지만 스레딩이 문제가 아닌 것처럼 생각합니다.
이 동작이 어디에서 왔는지 아이디어가 없습니다 ... 아이디어가 있으십니까?
전환은 arangodb에서 발생합니다. 나는 그것을 보내기 전에 바로 데이터를 확인했기 때문에 거기에 확신 할 수 있습니다. 그래서 제가 할 수있는 일은 문자열로 저장하는 것입니다. –
Ok, 데이터를 저장하고 ArangoDB에서 다시 검색하는 데 사용하는 ArangoDB API를 알고 있습니까? 이렇게하면 변환/절단이 있는지, 어디에서 찾을 수 있는지 알 수 있습니다. 감사! – stj
현재 Arangodb 3.1.28을 사용하고 있지만 도움이된다면 업그레이드해도 문제가 없습니다. 파이썬 측면에서 나는 pyarango (http://bioinfo.iric.ca/~daoudat/pyArango/)를 사용하고 있습니다. 어떤 마법 버전인지 모르겠습니다. D –