Question
Centene
US
Last activity: 18 Jul 2023 7:27 EDT
Remove items in "Ready to process" from Queue Processor
We have a queue processor, 'pyProcessNotification', that is not running, and about 100k items are in the "Ready to Process" state. How can we delete or clear the queue? We don't want to trigger notifications for old items.
Pega version is 8.3.1
Thank you
Accepted Solution
Updated: 10 Jun 2021 15:29 EDT
Pegasystems Inc.
US
You can create an activity with one Java step using the attached code.
Make sure to set the TopicName field in the attached code to the name, in capital letters, of the QP whose "Ready to Process" queue you want to truncate.
Let me know if that helps!!
Updated: 10 Jun 2021 15:29 EDT
Centene
US
@Harish_Gunneri : Thanks Harish. This worked for us.
Capgemini
IN
Hi Harish,
I am facing the same issue. I tried the code you shared, but I am getting an error with the withTopicName method. Please let me know if there is any other way to clear the "Ready to Process" items from a Queue Processor.
Error Details are below:
The method withTopicName(String) is undefined for the type StreamDataSetBuilder.
Thanks in Advance.
Regards,
Raghu Kumar K
Cotiviti
IN
@Harish_Gunneri This is really helpful; it helped us clear all the records.
Verizon Data services India
IN
Could you please provide that Java code? I could not find the code in this thread.
ING Bank
NL
Hello @Harish_Gunneri, we are on version 8.6, and saving the activity with the Java code you provided results in the following compilation error:
The method withTopicName(String) is undefined for the type StreamDataSetBuilder
Compile failed.
Cognizant
GB
@VVODNALA Items queued to Queue Processors are pushed to Kafka, not to a database table.
To clear the queues, you can try removing the partition directories for your Queue Processor, which are created on the app filesystem. These partitions are located in the 'kafka-data' directory on each app node that runs the 'Stream' service. For example, for a Pega app running on Tomcat, kafka-data can be located inside the Tomcat installation directory.
Before removing the affected partitions from each node separately, ensure the app servers are stopped to prevent replication between the Stream nodes. Otherwise, the Stream service will repopulate a node with partition data as soon as you remove it from any node within the cluster.
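To see which directories would be affected before deleting anything, a small read-only check along these lines may help. This is only a sketch under assumptions: it assumes the partition directories follow Kafka's usual `<topic>-<partition number>` naming, and that the topic name is the Queue Processor name in capital letters, as mentioned earlier in this thread. The class name PartitionDirLister and the default "kafka-data" path are hypothetical; it only lists directories and never removes them.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.regex.Pattern;

// Hypothetical helper: lists (does not delete) candidate partition directories.
public class PartitionDirLister {

    // Returns directories directly under kafkaDataDir named <TOPIC>-<digits>,
    // where TOPIC is assumed to be the Queue Processor name in upper case.
    static List<Path> findPartitionDirs(Path kafkaDataDir, String queueProcessorName)
            throws IOException {
        String topic = queueProcessorName.toUpperCase(Locale.ROOT);
        Pattern partitionDir = Pattern.compile(Pattern.quote(topic) + "-\\d+");
        List<Path> matches = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(kafkaDataDir)) {
            for (Path entry : entries) {
                String name = entry.getFileName().toString();
                if (Files.isDirectory(entry) && partitionDir.matcher(name).matches()) {
                    matches.add(entry);
                }
            }
        }
        Collections.sort(matches);
        return matches;
    }

    public static void main(String[] args) throws IOException {
        // Both defaults are placeholders; pass the real path and QP name as arguments.
        Path kafkaDataDir = Paths.get(args.length > 0 ? args[0] : "kafka-data");
        String qpName = args.length > 1 ? args[1] : "pyProcessNotification";
        for (Path dir : findPartitionDirs(kafkaDataDir, qpName)) {
            System.out.println(dir);
        }
    }
}
```

Run it on each Stream node with the app server stopped, and review the printed list before removing anything by hand.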
Let me know how this goes.
Pratik Agarwal
Capgemini
IN
Please let me know what steps you performed. Did I miss anything needed to clear the "Ready to Process" items in the Queue Processor?
Thanks in Advance.
Raghu K
Pegasystems Inc.
US
@RaghuK60 Please use the attached Java snippet for the latest versions.
Pegasystems
IN
@Harish_Gunneri Hi Harish, we are getting "com.pega.fnx.stream.spi.StreamServiceException: Stream SPI error" in Pega when running this code in our environment.
Updated: 13 Apr 2022 16:34 EDT
Optum
US
Hi, for v8.2.8, here is the code:
java.util.ArrayList<String> partitionKeys = new java.util.ArrayList<>();
partitionKeys.add(".pzMessageContent.pzInsKey");
com.pega.dsm.dnode.api.dataset.DataSet dataset = com.pega.dsm.dnode.api.dataset.stream.StreamDataSet.builder()
    .withClassName("System-Message-QueueProcessor")
    .withTopicName("<QP_NAME IN CAPITAL LETTER>")
    .withPartitionKeys(partitionKeys)
    .withIdempotentSave(true)
    .withOperation(com.pega.dsm.dnode.api.dataset.operation.SaveOperation.NAME)
    .withOperation(com.pega.dsm.dnode.impl.dataset.kafka.KafkaGetTopicsOperation.NAME)
    .withOperation(com.pega.dsm.dnode.api.dataset.operation.BrowseOperation.NAME)
    .withOperation(com.pega.dsm.dnode.impl.dataset.kafka.KafkaDropOperation.NAME)
    .withOperation(com.pega.dsm.dnode.impl.dataset.kafka.KafkaTruncateOperation.NAME)
    .withSerde(com.pega.dsm.kafka.api.serde.ClipboardPageStreamSerde.create())
    .build(tools);
com.pega.dsm.dnode.api.dataset.operation.Operation operation =
    dataset.getOperationByName(com.pega.dsm.dnode.impl.dataset.kafka.KafkaTruncateOperation.NAME);
((com.pega.dsm.dnode.impl.dataset.kafka.KafkaTruncateOperation) operation).truncate().await(tools);
Ai4Process Ltd
GB
@Harish_Gunneri While running the activity with the above Java code, I am getting this error:
Java Exception: java.lang.RuntimeException: java.util.concurrent.ExecutionException: com.pega.fnx.stream.spi.StreamServiceException: Unknown stream ProcessFailedEvents
Could you please let me know the solution for this?
Tekclan Software Solutions
IN
Please go through the article below; I hope it helps.