Skip to content

Commit

Permalink
Merge branch 'master' of github.com:allwefantasy/streamingpro
Browse files Browse the repository at this point in the history
  • Loading branch information
allwefantasy committed Jul 2, 2018
2 parents 910541c + 90f66a9 commit 083cb23
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@ class SQLModelExplainInPlace extends SQLAlg with Functions {
import df.sparkSession.sqlContext.implicits._
val ExternalCommandRunnerDataframe = Seq("").toDF("model_expalin_inplace_field")
ExternalCommandRunnerDataframe.rdd.map(f => {
val fs = FileSystem.get(new Configuration())
val paramMap = new util.HashMap[String, Object]()
paramMap.put("systemParam", systemParam.asJava)
paramMap.put("modelPath", params.getOrElse("modelPath", ""))
val modelPath = params.getOrElse("modelPath", "")
val tempModelPath = s"/tmp/${UUID.randomUUID().toString}.pkl"
fs.copyToLocalFile(new Path(modelPath), new Path(tempModelPath))
paramMap.put("modelPath", tempModelPath)
val tempModelLocalPath = s"${SQLPythonFunc.getLocalBasePath}/${UUID.randomUUID().toString}/0"
FileUtils.forceMkdir(new File(tempModelLocalPath))

Expand All @@ -48,12 +52,11 @@ class SQLModelExplainInPlace extends SQLAlg with Functions {
paramMap,
MapType(StringType, MapType(StringType, StringType)),
pythonScript.fileContent,
pythonScript.fileName, modelPath = null, kafkaParam = null,
pythonScript.fileName, modelPath = null, kafkaParam = Map[String, String](),
validateData = null
)
res.foreach(f => f)
// Save the model to HDFS
val fs = FileSystem.get(new Configuration())
val modelHDFSPath = SQLPythonFunc.getAlgModelPath(path) + "/0"
fs.delete(new Path(modelHDFSPath), true)
fs.copyFromLocalFile(new Path(tempModelLocalPath),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ def save_attributes(model_file, attributes_file):
with open(model_file, 'rb') as fr:
model = pickle.load(fr)
attribute_dict_numpy = {k: repr(v.tolist()).replace("\n", "").replace(" ", "") for (k, v) in model.__dict__.items() if (type(v) == np.ndarray)}
attribute_dict_normal = {k: repr(v) for (k, v) in model.__dict__.items() if (type(v) != np.ndarray)}
attribute_dict_normal = {k: repr(v).replace("\n", "").replace(" ", "") for (k, v) in model.__dict__.items() if (type(v) != np.ndarray)}
attribute_dict = dict(attribute_dict_numpy, **attribute_dict_normal)
attribute_json = json.dumps(attribute_dict)
attribute_json = json.dumps(attribute_dict).replace("\n", "").replace(" ", "")
with open(attributes_file, 'w') as f:
f.write(attribute_json)
params_file = os.path.join(os.getcwd(), "python_temp.pickle")
Expand Down

0 comments on commit 083cb23

Please sign in to comment.