[redisson]RTopic如果执行removeAllListeners疑似会导致发布订阅阻塞

2024-07-18 581 views
7
redisson版本
 <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.8.2</version>
</dependency>
redis信息
# Server
redis_version:3.2.9
redis_git_sha1:00000000
redis_git_dirty:0
redis_build_id:d837dd4aae3a6933
redis_mode:standalone
os:Linux 4.9.27-moby x86_64
arch_bits:64
multiplexing_api:epoll
gcc_version:4.9.2
process_id:1
run_id:3c1a4fd32e311a07d2d9c5ece9b937580467a241
tcp_port:6379
uptime_in_seconds:7678
uptime_in_days:0
hz:10
lru_clock:13324804
executable:/data/redis-server
config_file:

# Clients
connected_clients:83
client_longest_output_list:0
client_biggest_input_buf:0
blocked_clients:0

# Memory
used_memory:2539056
used_memory_human:2.42M
used_memory_rss:4964352
used_memory_rss_human:4.73M
used_memory_peak:3540152
used_memory_peak_human:3.38M
total_system_memory:2095890432
total_system_memory_human:1.95G
used_memory_lua:37888
used_memory_lua_human:37.00K
maxmemory:0
maxmemory_human:0B
maxmemory_policy:noeviction
mem_fragmentation_ratio:1.96
mem_allocator:jemalloc-4.0.3

# Persistence
loading:0
rdb_changes_since_last_save:0
rdb_bgsave_in_progress:0
rdb_last_save_time:1540050984
rdb_last_bgsave_status:ok
rdb_last_bgsave_time_sec:0
rdb_current_bgsave_time_sec:-1
aof_enabled:0
aof_rewrite_in_progress:0
aof_rewrite_scheduled:0
aof_last_rewrite_time_sec:-1
aof_current_rewrite_time_sec:-1
aof_last_bgrewrite_status:ok
aof_last_write_status:ok

# Stats
total_connections_received:3548
total_commands_processed:8491
instantaneous_ops_per_sec:0
total_net_input_bytes:2401890
total_net_output_bytes:21135836
instantaneous_input_kbps:0.00
instantaneous_output_kbps:0.00
rejected_connections:0
sync_full:0
sync_partial_ok:0
sync_partial_err:0
expired_keys:0
evicted_keys:0
keyspace_hits:263
keyspace_misses:2
pubsub_channels:1
pubsub_patterns:0
latest_fork_usec:410
migrate_cached_sockets:0

# Replication
role:master
connected_slaves:0
master_repl_offset:0
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0

# CPU
used_cpu_sys:17.84
used_cpu_user:3.51
used_cpu_sys_children:0.00
used_cpu_user_children:0.01

# Cluster
cluster_enabled:0

# Keyspace
db0:keys=3,expires=0,avg_ttl=0
复现代码
import org.redisson.Redisson;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;

import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/**
 * Reproduction case for a publish/subscribe stall when a topic listener calls
 * {@code removeAllListeners()} on its own topic.
 *
 * <p>Flow: one listener on {@code "request-topic"} answers every request id by
 * publishing to {@code "response-topic-<id>"}. A task scheduled every 100 ms
 * generates a fresh UUID, registers a response listener for it through
 * {@link #consume(String, Consumer)}, and then publishes the id on the request topic.
 *
 * <p>Progress is traced with {@code System.out.print}/{@code println} pairs: the
 * trailing {@code "ok"} of each step is printed only after the corresponding call
 * returns, so a line without {@code "ok"} identifies the call that did not return.
 */
public class RedissonSubErrorTest {

    // Single client shared by every topic lookup, built by the no-arg Redisson.create().
    static RedissonClient redissonClient = Redisson.create();
    // Scheduler sized at twice the number of available processors; runs both the
    // one-off listener registration and the periodic request task in main().
    static ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2);

    /**
     * Looks up the shared topic named {@code "request-topic"}.
     *
     * @return the topic on which request ids are published
     */
    public static RTopic<String> getRequestTopic() {
        return redissonClient.getTopic("request-topic");
    }

    /**
     * Looks up the per-request topic named {@code "response-topic-" + msgId}.
     *
     * @param msgId request id used as the topic name suffix
     * @return the topic on which the response for {@code msgId} is published
     */
    public static RTopic<String> getResponseTopic(String msgId) {
        return redissonClient.getTopic("response-topic-" + msgId);
    }

    /**
     * Registers a listener on the response topic of {@code msgId} that forwards the
     * message to {@code consumer} and then removes all listeners from that topic.
     *
     * @param msgId    request id whose response topic is subscribed to
     * @param consumer callback that receives the response message
     */
    public static void consume(String msgId, Consumer<String> consumer) {
        RTopic<String> topic = getResponseTopic(msgId);
        topic.addListener((channel, msg) -> {
            consumer.accept(msg);

            //*********[Deleting this line resolves the issue]**********
            topic.removeAllListeners();
        });

    }

    /**
     * Starts the responder listener and the periodic requester.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Counters only decorate the log output: number of responses published and
        // number of request iterations started.
        AtomicLong responseCounter = new AtomicLong();
        AtomicLong requestCounter = new AtomicLong();

        // Responder: registered once on an executor thread. For each request id received
        // it publishes "response-<id>(<n>)" on that id's response topic.
        executorService.submit(() -> {
            try {
                getRequestTopic()
                        .addListener((channel, msg) -> {
                            System.out.print("accept [" + msg + "] publish result: ");
                            getResponseTopic(msg).publish("response-" + msg + "(" + responseCounter.incrementAndGet() + ")");
                            System.out.println("ok");
                        });
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        // Requester: first run after 100 ms, then every 100 ms.
        executorService.scheduleAtFixedRate(() -> {
            String msgId = UUID.randomUUID().toString();
            try {
                // Printed before consume(...); the matching " ok" below appears only once
                // the listener registration inside consume(...) has returned.
                System.out.print("consume [" + msgId+"] times "+requestCounter.incrementAndGet());
                consume(msgId, msg->{

                });
                System.out.println(" ok");
                System.out.print("publish request [" + msgId+"]");
                getRequestTopic().publish(msgId);
                System.out.println(" ok");
            } catch (Exception e) {
                e.printStackTrace();
            }

        }, 100, 100, TimeUnit.MILLISECONDS);
    }
}
输出
consume [b00ca1ff-f7f4-4c57-994e-66c569ccef59] times 1 ok
publish request [b00ca1ff-f7f4-4c57-994e-66c569ccef59] ok
accept [b00ca1ff-f7f4-4c57-994e-66c569ccef59] publish result: ok
consume [fd3e8c42-6544-46a2-aeb8-fb2e7fff4677] times 2 ok
publish request [fd3e8c42-6544-46a2-aeb8-fb2e7fff4677] ok
accept [fd3e8c42-6544-46a2-aeb8-fb2e7fff4677] publish result: ok
consume [58337154-1947-4429-bd46-ed16c8293cc2] times 3 ok
publish request [58337154-1947-4429-bd46-ed16c8293cc2] ok
accept [58337154-1947-4429-bd46-ed16c8293cc2] publish result: ok
consume [f89b7327-360e-4f1e-ab21-3207e5d1f979] times 4 ok
publish request [f89b7327-360e-4f1e-ab21-3207e5d1f979] ok
accept [f89b7327-360e-4f1e-ab21-3207e5d1f979] publish result: ok
consume [b81afdb8-fe94-4a70-8232-6f8eb5fde86a] times 5 ok
publish request [b81afdb8-fe94-4a70-8232-6f8eb5fde86a] ok
accept [b81afdb8-fe94-4a70-8232-6f8eb5fde86a] publish result: ok
consume [7c294875-7fc6-40c8-a4ce-2fcf9700b83c] times 6 ok
publish request [7c294875-7fc6-40c8-a4ce-2fcf9700b83c] ok
accept [7c294875-7fc6-40c8-a4ce-2fcf9700b83c] publish result: ok
consume [9d3b8e9f-7fdc-440d-84d9-84e915d7a3f6] times 7 ok
publish request [9d3b8e9f-7fdc-440d-84d9-84e915d7a3f6] ok
accept [9d3b8e9f-7fdc-440d-84d9-84e915d7a3f6] publish result: ok
consume [30cf5df0-7da0-461a-a9c9-b87aecd9a0bd] times 8 ok
publish request [30cf5df0-7da0-461a-a9c9-b87aecd9a0bd] ok
accept [30cf5df0-7da0-461a-a9c9-b87aecd9a0bd] publish result: ok
consume [4b5ff736-8728-4a39-af27-a1e6e9cfddb8] times 9 ok
publish request [4b5ff736-8728-4a39-af27-a1e6e9cfddb8] ok
accept [4b5ff736-8728-4a39-af27-a1e6e9cfddb8] publish result: ok
consume [dbbc3354-b3bd-4ab6-8557-f6821ec09948] times 10 ok
publish request [dbbc3354-b3bd-4ab6-8557-f6821ec09948] ok
accept [dbbc3354-b3bd-4ab6-8557-f6821ec09948] publish result: ok
consume [048072d4-8c28-46a2-954e-c05d6577f4db] times 11 ok
publish request [048072d4-8c28-46a2-954e-c05d6577f4db] ok
accept [048072d4-8c28-46a2-954e-c05d6577f4db] publish result: ok
consume [16d10170-58cf-40b3-bb45-187b7bb82da3] times 12 ok
publish request [16d10170-58cf-40b3-bb45-187b7bb82da3] ok
accept [16d10170-58cf-40b3-bb45-187b7bb82da3] publish result: ok
consume [1eecc1db-f479-45c7-9a43-d98b87c2cd7b] times 13 ok
publish request [1eecc1db-f479-45c7-9a43-d98b87c2cd7b] ok
accept [1eecc1db-f479-45c7-9a43-d98b87c2cd7b] publish result: ok
consume [3562cf2e-211e-4ec3-ba29-fa19f28f0159] times 14 ok
publish request [3562cf2e-211e-4ec3-ba29-fa19f28f0159] ok
accept [3562cf2e-211e-4ec3-ba29-fa19f28f0159] publish result: ok
consume [2a30a36e-4144-4619-b4d9-a9a9b2e3b22e] times 15 ok
publish request [2a30a36e-4144-4619-b4d9-a9a9b2e3b22e] ok
accept [2a30a36e-4144-4619-b4d9-a9a9b2e3b22e] publish result: ok
consume [bf85d4bb-f6d3-49e8-ad8c-22154de7f526] times 16 ok
publish request [bf85d4bb-f6d3-49e8-ad8c-22154de7f526] ok
accept [bf85d4bb-f6d3-49e8-ad8c-22154de7f526] publish result: ok
consume [fbd0e428-09bc-466e-a981-c317cc67649f] times 17 ok
publish request [fbd0e428-09bc-466e-a981-c317cc67649f] ok
accept [fbd0e428-09bc-466e-a981-c317cc67649f] publish result: ok
consume [2c38c431-c22d-463e-bbd5-601648dde4d3] times 18 ok
publish request [2c38c431-c22d-463e-bbd5-601648dde4d3] ok
accept [2c38c431-c22d-463e-bbd5-601648dde4d3] publish result: ok
consume [0bd47020-0039-4b66-9485-2a171d0f8b24] times 19 ok
publish request [0bd47020-0039-4b66-9485-2a171d0f8b24] ok
accept [0bd47020-0039-4b66-9485-2a171d0f8b24] publish result: ok
consume [5de00553-50d5-4340-ae18-76951cbc78d4] times 20 ok
publish request [5de00553-50d5-4340-ae18-76951cbc78d4] ok
accept [5de00553-50d5-4340-ae18-76951cbc78d4] publish result: ok
consume [55a7312b-f1ab-49a3-a49a-de7d18f58c02] times 21 ok
publish request [55a7312b-f1ab-49a3-a49a-de7d18f58c02] ok
accept [55a7312b-f1ab-49a3-a49a-de7d18f58c02] publish result: ok
consume [43e1361e-5806-42dc-a6fd-b0efb22c5cc1] times 22 ok
publish request [43e1361e-5806-42dc-a6fd-b0efb22c5cc1] ok
accept [43e1361e-5806-42dc-a6fd-b0efb22c5cc1] publish result: ok
consume [a749aa68-57ea-40fc-8c95-30b5bf7a1a9d] times 23 ok
publish request [a749aa68-57ea-40fc-8c95-30b5bf7a1a9d] ok
accept [a749aa68-57ea-40fc-8c95-30b5bf7a1a9d] publish result: ok
consume [df33ba6a-60ea-4e64-8e9b-69d9fbfe2b81] times 24 ok
publish request [df33ba6a-60ea-4e64-8e9b-69d9fbfe2b81] ok
accept [df33ba6a-60ea-4e64-8e9b-69d9fbfe2b81] publish result: ok
consume [9062568f-5c30-429b-8f46-b65ae7a5a711] times 25 ok
publish request [9062568f-5c30-429b-8f46-b65ae7a5a711] ok
accept [9062568f-5c30-429b-8f46-b65ae7a5a711] publish result: ok
consume [1ff12c28-1586-4086-ab61-b80ad11a2de9] times 26 ok
publish request [1ff12c28-1586-4086-ab61-b80ad11a2de9] ok
accept [1ff12c28-1586-4086-ab61-b80ad11a2de9] publish result: ok
consume [69472202-1aba-492b-ac1c-f9f4554b79a5] times 27 ok
publish request [69472202-1aba-492b-ac1c-f9f4554b79a5] ok
accept [69472202-1aba-492b-ac1c-f9f4554b79a5] publish result: ok
consume [5a75099e-1b9b-4f36-9706-690435ed8048] times 28 ok
publish request [5a75099e-1b9b-4f36-9706-690435ed8048] ok
accept [5a75099e-1b9b-4f36-9706-690435ed8048] publish result: ok
consume [043e5df4-0601-447b-9d99-bf8e388d6d91] times 29 ok
publish request [043e5df4-0601-447b-9d99-bf8e388d6d91] ok
accept [043e5df4-0601-447b-9d99-bf8e388d6d91] publish result: ok
consume [21b57a16-e865-4005-ad4e-14675f7ee729] times 30 ok
publish request [21b57a16-e865-4005-ad4e-14675f7ee729] ok
accept [21b57a16-e865-4005-ad4e-14675f7ee729] publish result: ok
consume [5514e4e6-a5c7-41e1-adac-27103e763c90] times 31 ok
publish request [5514e4e6-a5c7-41e1-adac-27103e763c90] ok
accept [5514e4e6-a5c7-41e1-adac-27103e763c90] publish result: ok
consume [56b41d0f-8b29-4045-8db9-327f4be42f65] times 32 ok
publish request [56b41d0f-8b29-4045-8db9-327f4be42f65] ok
accept [56b41d0f-8b29-4045-8db9-327f4be42f65] publish result: ok
consume [1b231538-79b4-47cd-9599-078c79ae5d17] times 33 ok
publish request [1b231538-79b4-47cd-9599-078c79ae5d17] ok
accept [1b231538-79b4-47cd-9599-078c79ae5d17] publish result: ok
consume [afcb6ad2-8daf-4b19-9850-a7c40a664bf7] times 34 ok
publish request [afcb6ad2-8daf-4b19-9850-a7c40a664bf7] ok
accept [afcb6ad2-8daf-4b19-9850-a7c40a664bf7] publish result: ok
consume [843e0d65-d5d5-41c5-bfc3-933e475faae1] times 35 ok
publish request [843e0d65-d5d5-41c5-bfc3-933e475faae1] ok
accept [843e0d65-d5d5-41c5-bfc3-933e475faae1] publish result: ok
consume [07b2d9f9-631e-4807-84a7-977a737e2e36] times 36 ok
publish request [07b2d9f9-631e-4807-84a7-977a737e2e36] ok
accept [07b2d9f9-631e-4807-84a7-977a737e2e36] publish result: ok
consume [ed5e3995-bee6-4595-8e8a-71c394b88c7d] times 37 ok
publish request [ed5e3995-bee6-4595-8e8a-71c394b88c7d] ok
accept [ed5e3995-bee6-4595-8e8a-71c394b88c7d] publish result: ok
consume [c26ee613-6baf-4b20-ad6c-d77889d64e72] times 38 ok
publish request [c26ee613-6baf-4b20-ad6c-d77889d64e72] ok
accept [c26ee613-6baf-4b20-ad6c-d77889d64e72] publish result: ok
consume [fdb01678-19af-4cc3-a280-a363223d2a2c] times 39 ok
publish request [fdb01678-19af-4cc3-a280-a363223d2a2c] ok
accept [fdb01678-19af-4cc3-a280-a363223d2a2c] publish result: ok
consume [1f6ac25b-537c-45c1-8741-35498c68a9d6] times 40 ok
publish request [1f6ac25b-537c-45c1-8741-35498c68a9d6] ok
accept [1f6ac25b-537c-45c1-8741-35498c68a9d6] publish result: ok
consume [df1cb753-137b-4e41-b2b1-f8993d9042e8] times 41 ok
publish request [df1cb753-137b-4e41-b2b1-f8993d9042e8] ok
accept [df1cb753-137b-4e41-b2b1-f8993d9042e8] publish result: ok
consume [e7e0b01c-d1cc-4d3d-ad77-194679119013] times 42 ok
publish request [e7e0b01c-d1cc-4d3d-ad77-194679119013] ok
accept [e7e0b01c-d1cc-4d3d-ad77-194679119013] publish result: ok
consume [6cc2e099-5a1b-4c1e-90fb-d75d8010d0dd] times 43 ok
publish request [6cc2e099-5a1b-4c1e-90fb-d75d8010d0dd] ok
accept [6cc2e099-5a1b-4c1e-90fb-d75d8010d0dd] publish result: ok
consume [151136a5-2204-40ec-a31b-c0942a362b06] times 44 ok
publish request [151136a5-2204-40ec-a31b-c0942a362b06] ok
accept [151136a5-2204-40ec-a31b-c0942a362b06] publish result: ok
consume [43b1a2f4-b652-4a2f-bd23-3902d00ae3bb] times 45 ok
publish request [43b1a2f4-b652-4a2f-bd23-3902d00ae3bb] ok
accept [43b1a2f4-b652-4a2f-bd23-3902d00ae3bb] publish result: ok
consume [db015885-d6b5-42bc-b953-9af535335190] times 46 ok
publish request [db015885-d6b5-42bc-b953-9af535335190] ok
accept [db015885-d6b5-42bc-b953-9af535335190] publish result: ok
consume [9aa892aa-91b3-41d3-b761-c4cb80263bc7] times 47 ok
publish request [9aa892aa-91b3-41d3-b761-c4cb80263bc7] ok
accept [9aa892aa-91b3-41d3-b761-c4cb80263bc7] publish result: ok
consume [4edc10ed-53b0-412a-98f5-995b1d4bf9f4] times 48 ok
publish request [4edc10ed-53b0-412a-98f5-995b1d4bf9f4] ok
accept [4edc10ed-53b0-412a-98f5-995b1d4bf9f4] publish result: ok
consume [1146df16-f8c1-489b-9be4-5949ad87f787] times 49 ok
publish request [1146df16-f8c1-489b-9be4-5949ad87f787] ok
accept [1146df16-f8c1-489b-9be4-5949ad87f787] publish result: ok
consume [a3e6b48a-4799-4b1f-9933-e874bb2ba6b1] times 50 ok
publish request [a3e6b48a-4799-4b1f-9933-e874bb2ba6b1] ok
accept [a3e6b48a-4799-4b1f-9933-e874bb2ba6b1] publish result: ok
consume [3b43ede5-1bce-45ee-b3c8-16829b09a982] times 51

回答

9

第51次时 无限等待

7

急,在线等

8
        topic.addListener((channel, msg) -> {
            consumer.accept(msg);

            //*********[Deleting this line resolves the issue]**********
            topic.removeAllListeners();
        });

这里应该是只需要remove自己吧?把其他线程的监听都remove掉了也就没有callback回来了。

6

@jackygurui 我是想清空掉这个topic,在监听到某个消息后 结束topic.不然PublishSubscribeService#name2PubSubConnection的数据越来越多。 现在用queue替代了. 顺便问一个问题,queue有支持类似topic订阅的功能么,比如 queue.take(msg-> )

8

@zhou-hao 在并发情况下,每个线程应该只关注自己的使用,清理自己的资源。在所有线程都完成使用以后,所有的资源也会清理完成。

Topic跟Queue有不同的特性,适用于不同的场景,两者不能简单地相互替换,需要根据场景来判断。

你可以看看BlockingQueue的用法。

7

@jackygurui 场景就是通知和回复。一个topic只有一个listener且只用一次,如果不调用removeListener好像不会清理?debug看了PublishSubscribeService#name2PubSubConnection,里面的数据会越来越多。而在调用了removeListener一定次数后就会发生阻塞,所以换成了queue。但是好像没有能实现异步出队的queue。

0

@zhou-hao 不是说不用调用removeListener,而是只用清理自己。例如:

    /**
     * Subscribes a one-shot listener to the response topic of {@code msgId}.
     *
     * <p>When a message arrives it is handed to {@code consumer}, after which the
     * listener unregisters itself from the topic.
     *
     * @param msgId    request id whose response topic is subscribed to
     * @param consumer callback that receives the response message
     */
    public static void consume(String msgId, Consumer<String> consumer) {
        final RTopic<String> responseTopic = getResponseTopic(msgId);
        // Kept as an anonymous class (not a lambda) so that `this` inside onMessage
        // refers to the listener instance being removed.
        final MessageListener<String> oneShotListener = new MessageListener<String>() {
            @Override
            public void onMessage(CharSequence channel, String msg) {
                consumer.accept(msg);
                // Remove only this listener rather than every listener on the topic.
                responseTopic.removeListener(this);
            }
        };
        responseTopic.addListener(oneShotListener);
    }

方法后缀名为Async的都是异步操作。

5

好的,谢谢!!!