

If you have other non-serializable attributes in the 'ProcessActions' class, make sure they are either transient (marked with decorator) or moved outside the class if not needed.Self.logger = create_logger() # Create the logger object inside the class

This way, it will not be passed as an argument during serialization.ĭef _init_(self, config, rule, session=spark_session): Move the 'logger' object creation inside the 'ProcessActions' class and make it a class attribute.
#PYSPARK ADD DAYS TO DATE CODE#
To resolve this issue, you can modify your code in the following ways: In particular, objects that rely on underlying system resources like locks cannot be serialized. Spark's serialization mechanism uses Pickle, and not all objects can be pickled. In this case, the issue is likely caused by trying to pass a logger object ('logger') to the 'ProcessActions' class, which Spark attempts to serialize and distribute to the worker nodes. The error you're encountering, _pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.lock' object, occurs when trying to serialize an object that contains a non-serializable attribute. (See the valid date parts in the table below) value is an integer number to be added to the datepart of the inputdate. I face this problem also using df.foreach instead of df.foreachPartition The DATEADD () function accepts three arguments: datepart is the part of date to which the DATEADD () function will add the value. I have also tried to pass a function to the foreachBatch() instead of a class and its method but the result is the same. I tried removing every argument passed to the ProcessAction() in the foreachBatch(), but it breaks the same way. My code before was like this: def process(self, df:, _: int) -> None: I was using df.collect() instead of df.foreachPartition, but I noticed it is not a best practice because it limits the parallelism as it demand work to a single node driver. _pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.lock' object foreachBatch(ProcessActions(config, rule, logger, session=spark_session).process)ĭef process(self, df:, _: int) -> None:īut whenever df.foreachPartition is called, I get the error : Here's what I'm doing: streaming_query = (į'', I'm not understanding why my code raise an error.
