From c61df708836b6cfbb2fb7df575a19fa8bd825186 Mon Sep 17 00:00:00 2001 From: Christophe Haen Date: Thu, 17 Oct 2024 13:39:11 +0200 Subject: [PATCH 1/2] sweep: #7840 feat (TS): add getTransformationFilesAsJsonString --- .../DeveloperGuide/CodeTesting/index.rst | 8 +++---- .../DISET/private/Transports/BaseTransport.py | 22 ++++++++++--------- src/DIRAC/Core/Utilities/DEncode.py | 2 +- .../Client/TransformationCLI.py | 14 +++++------- .../Client/TransformationClient.py | 11 ++++++++-- .../DB/TransformationDB.py | 8 ++++--- .../Service/TransformationManagerHandler.py | 17 ++++++++++++++ 7 files changed, 54 insertions(+), 28 deletions(-) diff --git a/docs/source/DeveloperGuide/CodeTesting/index.rst b/docs/source/DeveloperGuide/CodeTesting/index.rst index 630d9f375e0..a7574cb0865 100644 --- a/docs/source/DeveloperGuide/CodeTesting/index.rst +++ b/docs/source/DeveloperGuide/CodeTesting/index.rst @@ -41,7 +41,7 @@ Testing itself could also speed up the development process rapidly tracing probl DIRAC is not different from that scenario, with the exception that service-oriented architecture paradigm, which is one of the basic concepts of the project, making the quality assurance and testing process the real challenge. However as DIRAC becomes more and more popular and now is being used by several different communities, -the main question is not: *to test or not to test?*, but rather: *how to test in an efficient way?* +the main question is not: *to test or not to test?*, but rather: *how to test in an efficient way?* [#]_. The topic of software testing is very complicated by its own nature, but depending on the testing method employed, the testing process itself can be implemented at any time in the development phase and ideally should cover many different levels of the system: @@ -89,13 +89,13 @@ This could be obtained by objects mocking technique, where all fragile component equivalents - test doubles. For that it is recommended to use mock_ module. Hence it is clear that knowledge of mock_ module API is essential. -Unit tests are typically created by the developer who will also write the code that is being tested. The tests may therefore share the same blind spots with the code: for example, a developer does not realize that certain input parameters must be checked, most likely neither the test nor the code will verify these input parameters. If the developer misinterprets the requirements specification for the module being developed, both the tests and the code will be wrong. Hence if the developer is going to prepare her own unit tests, she should pay attention and take extra care to implement proper testing suite, checking for every spot of possible failure (i.e. interactions with other components) and not trusting that someone else's code is always returning proper type and/or values. +Unit tests are typically created by the developer who will also write the code that is being tested. The tests may therefore share the same blind spots with the code: for example, a developer does not realize that certain input parameters must be checked, most likely neither the test nor the code will verify these input parameters [#]_. If the developer misinterprets the requirements specification for the module being developed, both the tests and the code will be wrong. Hence if the developer is going to prepare her own unit tests, she should pay attention and take extra care to implement proper testing suite, checking for every spot of possible failure (i.e. interactions with other components) and not trusting that someone else's code is always returning proper type and/or values. Test doubles ============ -Unit tests should run in *isolation*. Which means that they should run without having DIRAC fully installed, because, remember, they should just test the code logic. If, to run a unit test in DIRAC, you need a dirac.cfg file to be present, you are failing your goal. +Unit tests should run in *isolation*. Which means that they should run without having DIRAC fully installed, because, remember, they should just test the code logic. If, to run a unit test in DIRAC, you need a dirac.cfg file to be present, you are failing your goal [#]_. To isolate the code being tested from depended-on components it is convenient and sometimes necessary to use *test doubles*: simplified objects or procedures, that behaves and looks like the their real-intended counterparts, but are actually simplified versions @@ -371,9 +371,9 @@ Footnotes ============ .. [#] Or even better software requirements document, if any of such exists. Otherwise this is a great opportunity to prepare one. +.. [#] You may ask: *isn't it silly?* No, in fact it isn't. Validation of input parameters is one of the most important tasks during testing. .. [#] To better understand this term, think about a movie industry: if a scene movie makers are going to film is potentially dangerous and unsafe for the leading actor, his place is taken over by a stunt double. .. [#] And eventually is killing him with a gun. At least in a TV show. -.. [#] You may ask: *isn't it silly?* No, in fact it isn't. Validation of input parameters is one of the most important tasks during testing. .. _Python: http://www.python.org/ diff --git a/src/DIRAC/Core/DISET/private/Transports/BaseTransport.py b/src/DIRAC/Core/DISET/private/Transports/BaseTransport.py index 7001a703d63..d3fb1e618eb 100755 --- a/src/DIRAC/Core/DISET/private/Transports/BaseTransport.py +++ b/src/DIRAC/Core/DISET/private/Transports/BaseTransport.py @@ -42,7 +42,7 @@ class BaseTransport: def __init__(self, stServerAddress, bServerMode=False, **kwargs): self.bServerMode = bServerMode self.extraArgsDict = kwargs - self.byteStream = b"" + self.byteStream = bytearray() self.packetSize = 1048576 # 1MiB self.stServerAddress = stServerAddress self.peerCredentials = {} @@ -191,7 +191,7 @@ def receiveData(self, maxBufferSize=0, blockAfterKeepAlive=True, idleReceive=Fal maxBufferSize = max(maxBufferSize, 0) try: # Look either for message length of keep alive magic string - iSeparatorPosition = self.byteStream.find(b":", 0, 10) + iSeparatorPosition = self.byteStream.find(b":") keepAliveMagicLen = len(BaseTransport.keepAliveMagic) isKeepAlive = self.byteStream.find(BaseTransport.keepAliveMagic, 0, keepAliveMagicLen) == 0 # While not found the message length or the ka, keep receiving @@ -204,9 +204,10 @@ def receiveData(self, maxBufferSize=0, blockAfterKeepAlive=True, idleReceive=Fal if not retVal["Value"]: return S_ERROR("Peer closed connection") # New data! - self.byteStream += retVal["Value"] - # Look again for either message length of ka magic string - iSeparatorPosition = self.byteStream.find(b":", 0, 10) + self.byteStream.extend(retVal["Value"]) + + # Look again for either message length or keep alive magic string + iSeparatorPosition = self.byteStream.find(b":") isKeepAlive = self.byteStream.find(BaseTransport.keepAliveMagic, 0, keepAliveMagicLen) == 0 # Over the limit? if maxBufferSize and len(self.byteStream) > maxBufferSize and iSeparatorPosition == -1: @@ -214,7 +215,7 @@ def receiveData(self, maxBufferSize=0, blockAfterKeepAlive=True, idleReceive=Fal # Keep alive magic! if isKeepAlive: gLogger.debug("Received keep alive header") - # Remove the ka magic from the buffer and process the keep alive + # Remove the keep-alive magic from the buffer and process the keep-alive self.byteStream = self.byteStream[keepAliveMagicLen:] return self.__processKeepAlive(maxBufferSize, blockAfterKeepAlive) # From here it must be a real message! @@ -225,7 +226,7 @@ def receiveData(self, maxBufferSize=0, blockAfterKeepAlive=True, idleReceive=Fal if readSize >= pkgSize: # If we already have all the data we need data = pkgData[:pkgSize] - self.byteStream = pkgData[pkgSize:] + self.byteStream = self.byteStream[pkgSize + iSeparatorPosition + 1 :] else: # If we still need to read stuff pkgMem = BytesIO() @@ -245,11 +246,12 @@ def receiveData(self, maxBufferSize=0, blockAfterKeepAlive=True, idleReceive=Fal # Data is here! take it out from the bytestream, dencode and return if readSize == pkgSize: data = pkgMem.getvalue() - self.byteStream = b"" - else: # readSize > pkgSize: + self.byteStream = bytearray() # Reset the byteStream + else: pkgMem.seek(0, 0) data = pkgMem.read(pkgSize) - self.byteStream = pkgMem.read() + self.byteStream = bytearray(pkgMem.read()) # store the rest in bytearray + try: data = MixedEncode.decode(data)[0] except Exception as e: diff --git a/src/DIRAC/Core/Utilities/DEncode.py b/src/DIRAC/Core/Utilities/DEncode.py index 3a20b868192..bae1a2179c1 100755 --- a/src/DIRAC/Core/Utilities/DEncode.py +++ b/src/DIRAC/Core/Utilities/DEncode.py @@ -521,7 +521,7 @@ def decode(data): if not data: return data # print("DECODE FUNCTION : %s" % g_dDecodeFunctions[ sStream [ iIndex ] ]) - if not isinstance(data, bytes): + if not isinstance(data, (bytes, bytearray)): raise NotImplementedError("This should never happen") return g_dDecodeFunctions[data[0]](data, 0) diff --git a/src/DIRAC/TransformationSystem/Client/TransformationCLI.py b/src/DIRAC/TransformationSystem/Client/TransformationCLI.py index 9a8d0399d2b..1821893d53c 100644 --- a/src/DIRAC/TransformationSystem/Client/TransformationCLI.py +++ b/src/DIRAC/TransformationSystem/Client/TransformationCLI.py @@ -340,13 +340,12 @@ def do_getFiles(self, args): selectDict = {"TransformationID": res["Value"]["TransformationID"]} if status: selectDict["Status"] = status - res = self.transClient.getTransformationFiles(condDict=selectDict) + columns = ["LFN", "Status", "ErrorCount", "TargetSE", "LastUpdate"] + res = self.transClient.getTransformationFiles(condDict=selectDict, columns=columns) if not res["OK"]: print(f"Failed to get transformation files: {res['Message']}") elif res["Value"]: - self._printFormattedDictList( - res["Value"], ["LFN", "Status", "ErrorCount", "TargetSE", "LastUpdate"], "LFN", "LFN" - ) + self._printFormattedDictList(res["Value"], columns, "LFN", "LFN") else: print("No files found") @@ -367,7 +366,8 @@ def do_getFileStatus(self, args): print(f"Failed to get transformation information: {res['Message']}") else: selectDict = {"TransformationID": res["Value"]["TransformationID"]} - res = self.transClient.getTransformationFiles(condDict=selectDict) + columns = ["LFN", "Status", "ErrorCount", "TargetSE", "LastUpdate"] + res = self.transClient.getTransformationFiles(condDict=selectDict, columns=columns) if not res["OK"]: print(f"Failed to get transformation files: {res['Message']}") elif res["Value"]: @@ -376,9 +376,7 @@ def do_getFileStatus(self, args): if fileDict["LFN"] in lfns: filesList.append(fileDict) if filesList: - self._printFormattedDictList( - filesList, ["LFN", "Status", "ErrorCount", "TargetSE", "LastUpdate"], "LFN", "LFN" - ) + self._printFormattedDictList(filesList, columns, "LFN", "LFN") else: print("Could not find any LFN in", lfns, "for transformation", transName) else: diff --git a/src/DIRAC/TransformationSystem/Client/TransformationClient.py b/src/DIRAC/TransformationSystem/Client/TransformationClient.py index 7f1b7daa81a..25d2266be71 100644 --- a/src/DIRAC/TransformationSystem/Client/TransformationClient.py +++ b/src/DIRAC/TransformationSystem/Client/TransformationClient.py @@ -3,6 +3,7 @@ from DIRAC import S_OK, S_ERROR, gLogger from DIRAC.Core.Base.Client import Client, createClient from DIRAC.Core.Utilities.List import breakListIntoChunks +from DIRAC.Core.Utilities.JEncode import decode as jdecode from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.TransformationSystem.Client import TransformationStatus from DIRAC.TransformationSystem.Client import TransformationFilesStatus @@ -176,9 +177,12 @@ def getTransformationFiles( timeStamp = "LastUpdate" if "LFN" not in condDict: - res = rpcClient.getTransformationFiles( + res = rpcClient.getTransformationFilesAsJsonString( condDict, older, newer, timeStamp, orderAttribute, offset, maxfiles, columns ) + if not res["OK"]: + return res + res, _ = jdecode(res["Value"]) # TransformationDB.getTransformationFiles includes a "Records"/"ParameterNames" # that we don't want to return to the client so explicitly return S_OK with the value if not res["OK"]: @@ -204,9 +208,12 @@ def getTransformationFiles( # Apply the offset to the list of LFNs condDict["LFN"] = lfnList[offsetToApply : offsetToApply + limit] # No limit and no offset as the list is limited already - res = rpcClient.getTransformationFiles( + res = rpcClient.getTransformationFilesAsJsonString( condDict, older, newer, timeStamp, orderAttribute, None, None, columns ) + if not res["OK"]: + return res + res, _ = jdecode(res["Value"]) if not res["OK"]: gLogger.error( "Error getting files for transformation %s (offset %d), %s" diff --git a/src/DIRAC/TransformationSystem/DB/TransformationDB.py b/src/DIRAC/TransformationSystem/DB/TransformationDB.py index f311d0d23bc..b69447644e1 100755 --- a/src/DIRAC/TransformationSystem/DB/TransformationDB.py +++ b/src/DIRAC/TransformationSystem/DB/TransformationDB.py @@ -595,6 +595,7 @@ def getTransformationFiles( offset=None, connection=False, columns=None, + include_web_records=True, ): """Get files for the supplied transformations with support for the web standard structure""" connection = self.__getConnection(connection) @@ -632,11 +633,12 @@ def getTransformationFiles( return res resultList = [dict(zip(columns, row)) for row in res["Value"]] - webList = [[str(item) if not isinstance(item, int) else item for item in row] for row in res["Value"]] result = S_OK(resultList) - result["Records"] = webList - result["ParameterNames"] = columns + if include_web_records: + webList = [[str(item) if not isinstance(item, int) else item for item in row] for row in res["Value"]] + result["Records"] = webList + result["ParameterNames"] = columns return result def getFileSummary(self, lfns, connection=False): diff --git a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py index d566dde6e65..1bb5af6ca29 100644 --- a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py +++ b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py @@ -1,9 +1,11 @@ """ Service for interacting with TransformationDB """ + from DIRAC import S_OK, S_ERROR from DIRAC.Core.DISET.RequestHandler import RequestHandler from DIRAC.Core.Security.Properties import SecurityProperty from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning +from DIRAC.Core.Utilities.JEncode import encode as jencode from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.TransformationSystem.Client import TransformationFilesStatus @@ -290,6 +292,7 @@ def export_getTransformationFiles( limit=None, offset=None, columns=None, + include_web_records=True, ): if not condDict: condDict = {} @@ -303,8 +306,22 @@ def export_getTransformationFiles( offset=offset, connection=False, columns=columns, + include_web_records=include_web_records, ) + types_getTransformationFilesAsJsonString = types_getTransformationFiles + + def export_getTransformationFilesAsJsonString(self, *args, **kwargs): + """ + DEncode cannot cope with nested structures of multiple millions items. + Encode everything as a json string, that DEncode can then transmit faster. + + This will be the default as of v9.0 + """ + kwargs["include_web_records"] = False + res = self.export_getTransformationFiles(*args, **kwargs) + return S_OK(jencode(res)) + #################################################################### # # These are the methods to manipulate the TransformationTasks table From 07c836025c8891ab0320d5a0eb64eb1cda931203 Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Wed, 6 Nov 2024 14:11:02 +0100 Subject: [PATCH 2/2] fix: deprecated getTransformationFilesAsJsonString --- .../Client/TransformationClient.py | 4 +-- .../DB/TransformationDB.py | 13 +-------- .../Service/TransformationManagerHandler.py | 29 +++++++++---------- 3 files changed, 17 insertions(+), 29 deletions(-) diff --git a/src/DIRAC/TransformationSystem/Client/TransformationClient.py b/src/DIRAC/TransformationSystem/Client/TransformationClient.py index 25d2266be71..2ff3945e45f 100644 --- a/src/DIRAC/TransformationSystem/Client/TransformationClient.py +++ b/src/DIRAC/TransformationSystem/Client/TransformationClient.py @@ -177,7 +177,7 @@ def getTransformationFiles( timeStamp = "LastUpdate" if "LFN" not in condDict: - res = rpcClient.getTransformationFilesAsJsonString( + res = rpcClient.getTransformationFiles( condDict, older, newer, timeStamp, orderAttribute, offset, maxfiles, columns ) if not res["OK"]: @@ -208,7 +208,7 @@ def getTransformationFiles( # Apply the offset to the list of LFNs condDict["LFN"] = lfnList[offsetToApply : offsetToApply + limit] # No limit and no offset as the list is limited already - res = rpcClient.getTransformationFilesAsJsonString( + res = rpcClient.getTransformationFiles( condDict, older, newer, timeStamp, orderAttribute, None, None, columns ) if not res["OK"]: diff --git a/src/DIRAC/TransformationSystem/DB/TransformationDB.py b/src/DIRAC/TransformationSystem/DB/TransformationDB.py index b69447644e1..6d83a69befb 100755 --- a/src/DIRAC/TransformationSystem/DB/TransformationDB.py +++ b/src/DIRAC/TransformationSystem/DB/TransformationDB.py @@ -595,7 +595,6 @@ def getTransformationFiles( offset=None, connection=False, columns=None, - include_web_records=True, ): """Get files for the supplied transformations with support for the web standard structure""" connection = self.__getConnection(connection) @@ -634,12 +633,7 @@ def getTransformationFiles( resultList = [dict(zip(columns, row)) for row in res["Value"]] - result = S_OK(resultList) - if include_web_records: - webList = [[str(item) if not isinstance(item, int) else item for item in row] for row in res["Value"]] - result["Records"] = webList - result["ParameterNames"] = columns - return result + return S_OK(resultList) def getFileSummary(self, lfns, connection=False): """Get file status summary in all the transformations""" @@ -890,13 +884,9 @@ def getTransformationTasks( return res if condDict is None: condDict = {} - webList = [] resultList = [] for row in res["Value"]: - # Prepare the structure for the web - rList = [str(item) if not isinstance(item, int) else item for item in row] taskDict = dict(zip(self.TASKSPARAMS, row)) - webList.append(rList) if inputVector: taskDict["InputVector"] = "" taskID = taskDict["TaskID"] @@ -909,7 +899,6 @@ def getTransformationTasks( return res resultList.append(taskDict) result = S_OK(resultList) - result["Records"] = webList result["ParameterNames"] = self.TASKSPARAMS return result diff --git a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py index 1bb5af6ca29..0f0e313e728 100644 --- a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py +++ b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py @@ -1,18 +1,18 @@ """ Service for interacting with TransformationDB """ -from DIRAC import S_OK, S_ERROR +from DIRAC import S_ERROR, S_OK +from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.Core.DISET.RequestHandler import RequestHandler from DIRAC.Core.Security.Properties import SecurityProperty +from DIRAC.Core.Utilities.Decorators import deprecated from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning from DIRAC.Core.Utilities.JEncode import encode as jencode from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader -from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations +from DIRAC.RequestManagementSystem.Client.Operation import Operation +from DIRAC.RequestManagementSystem.Client.Request import Request from DIRAC.TransformationSystem.Client import TransformationFilesStatus from DIRAC.WorkloadManagementSystem.Client import JobStatus -from DIRAC.RequestManagementSystem.Client.Request import Request -from DIRAC.RequestManagementSystem.Client.Operation import Operation - TASKS_STATE_NAMES = ["TotalCreated", "Created"] + sorted( set(JobStatus.JOB_STATES) | set(Request.ALL_STATES) | set(Operation.ALL_STATES) @@ -292,11 +292,10 @@ def export_getTransformationFiles( limit=None, offset=None, columns=None, - include_web_records=True, ): if not condDict: condDict = {} - return self.transformationDB.getTransformationFiles( + result = self.transformationDB.getTransformationFiles( condDict=condDict, older=older, newer=newer, @@ -306,21 +305,21 @@ def export_getTransformationFiles( offset=offset, connection=False, columns=columns, - include_web_records=include_web_records, ) + # DEncode cannot cope with nested structures of multiple millions items. + # Encode everything as a json string, that DEncode can then transmit faster. + + return S_OK(jencode(result)) + types_getTransformationFilesAsJsonString = types_getTransformationFiles + @deprecated("Use getTransformationFiles instead") def export_getTransformationFilesAsJsonString(self, *args, **kwargs): """ - DEncode cannot cope with nested structures of multiple millions items. - Encode everything as a json string, that DEncode can then transmit faster. - - This will be the default as of v9.0 + Deprecated call -- redirect to getTransformationFiles """ - kwargs["include_web_records"] = False - res = self.export_getTransformationFiles(*args, **kwargs) - return S_OK(jencode(res)) + return self.export_getTransformationFiles(*args, **kwargs) #################################################################### #