当前位置: 首页 > news >正文

kafka 代码使用

目录

简单使用

Producer消息发送方

 发送消息到指定分区上

未指定分区

同步发送

 异步发消息

ACK参数设置

消息发送缓冲区

​编辑Consumer消息消费者

简单实现 

 提交offset

长轮询poll消息

心跳 -- 健康检查

指定分区和偏移量、时间消费

新消费组消费offset规则

整合springboot

依赖

配置文件

Producer生产者

Consumer消费者

设置消费组、多个topic、指定分区、指定偏移量消费及消费个数

简单使用

依赖

最好是根据kafka的版本,引入对应的依赖版本

 其中kafka_2.13-2.8.1这个目录就是自带了版本号。

前面为scala版本,后面为kafka版本。
样例说明:2.13为scala版本,2.8.1为kafka版本。

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.1</version>
        </dependency>

Producer消息发送方

public class MySimpleProducer {
    private final static String TOPIC_NAME = "my-replicated-topic";

    public static void main(String[] args) throws Exception {
        //1.设置参数
        Properties props = new Properties();
        //设置kafka集群
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
//                "172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
        //把发送的key从字符串序列化为字节数组
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,  StringSerializer.class.getName());
        //把发送消息value从字符串序列化为字节数组
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,  StringSerializer.class.getName());
        //2.创建⽣产消息的客户端,传⼊参数
        Producer<String, String> producer = new KafkaProducer<>(props);
        //3.创建消息
        //key:作⽤是决定了往哪个分区上发,value:具体要发送的消息内容
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<>(TOPIC_NAME, "mykeyvalue", "hellokafka");
        //4.发送消息,得到消息发送的元数据并输出
        RecordMetadata metadata = producer.send(producerRecord).get();
        System.out.println("同步⽅式发送消息结果:" + "topic-" +
                metadata.topic()
                + "|partition-" + metadata.partition()
                + "|offset-" + metadata.offset());
    }
}

ProducerRecord可以不带key的构造方法,但是推荐使用代用key的,如果存在分区,kafka会根据key的经过特定的hash算法,基础这个消息应该落到哪个分区。

结果如下: partition表示分区,offset表示偏移量. 所以这个消息发送到了0 分区,偏移量为1

 发送消息到指定分区上

ProducerRecord<String, String> producerRecord = new
ProducerRecord<String, String>(TOPIC_NAME , 0, order.getOrderId().toString(), JSON.toJSONString(order));

第二个参数执行分区,有特殊要求时才会用,一般不使用

未指定分区

则会通过业务keyhash运算,算出消息往哪个分区上发

//未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNum
ProducerRecord<String, String> producerRecord = new
ProducerRecord<String, String>(TOPIC_NAME, order.getOrderId().toString(), JSON.toJSONString(order));

同步发送

生产者同步发消息,在收到 kafka ack 告知发送成功之前一直处于阻塞状态
//等待消息发送成功的同步阻塞⽅法
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步⽅式发送消息结果:"
+ "topic-" + metadata.topic() 
+ "|partition-" + metadata.partition() 
+ "|offset-" + metadata.offset());

 异步发消息

生产者发消息,发送完后不用等待 broker 给回复,直接执行下面的业务逻辑。可以提供callback,让 broker 异步的调用 callback ,告知生产者,消息发送的结果。
//要发送5条消息
 Order order = new Order((long) i, i);
 //指定发送分区
ProducerRecord<String, String> producerRecord = new
ProducerRecord<String, String>(TOPIC_NAME,
order.getOrderId().toString(), JSON.toJSONString(order));
 //异步回调方式发送消息
producer.send(producerRecord, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
         if (exception != null) {
             System.err.println("发送消息失败:" + exception.getStackTrace());
         }
         if (metadata != null) {
         System.out.println("异步方式发送消息结果:" 
                    + "topic-" + metadata.topic() 
                    + "|partition-" + metadata.partition() 
                    + "|offset-" + metadata.offset());
                }
         }
 });

ACK参数设置

在同步发送的前提下,生产者在获得集群返回的ack之前会一直阻塞。
那么集群什么时候返回ack呢?ack有3个配置
  • ack = 0 :消息到达kafka-cluster,kafka-cluster不需要任何的broker收到消息,就立即返回ack给生产者,最容易丢消息的,效率是最高的
  • ack = 1(默认):多副本之间的leader已经收到消息,并把消息写入到本地log中,才返回ack给生产者,性能和安全性是最均衡的
  • ack = -1/all. 搭配kafka的min.insync.replicas配置,如果min.insync.replicas=2(默认为1,推荐配置大于等于2),此时就需要leader和一个follower同步完后,才会返回ack给生产者(此时集群中有2broker已完成数据的接收),这种方式最安全,但性能最差。

 关于ack以及重试(没有收到ack,会重新发送消息)的配置:

//设置ack的值 = 1 
props.put(ProducerConfig.ACKS_CONFIG, "1");
 /*
    发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造
    成消息重复发送,比如网络抖动,所以需要在接收者那边做好消息接收的幂等性处理
 */
//设置重试次数
props.put(ProducerConfig.RETRIES_CONFIG, 3);
//重试间隔设置
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);

消息发送缓冲区

  • kafka默认会创建一个消息缓冲区,用来存放要发送的消息,缓冲区是32m
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
  • kafka本地线程会去缓冲区中一次拉16k的数据,发送到broker
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  • 如果线程拉不到16k的数据,间隔10ms也会将已拉倒的数据发到broker
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

Consumer消息消费者

简单实现 

package com.lb;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MySimplerConsumer {
    private final static String TOPIC_NAME = "my-replicated-topic";
    private final static String CONSUMER_GROUP_NAME = "testGroup";

    public static void main(String[] args) {
        Properties props = new Properties();
        //设置kafka集群
//        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
//                "172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");

        //设置分组名称
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);

        //设置key和value的反序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //1.创建一个消费者客户端
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        //2.订阅一个主题列表
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            /*
             * 3.poll()是拉取消息的长轮询
             */
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                //4.打印消息
                System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
                        record.offset(), record.key(), record.value());
            }
        }
    }
}

 提交offset

1)提交内容

消费者无论是自动提交还是手动提交,都需要把

所属的消费组+消费的主题+消费分区+消费的偏移量

这些信息提交给集群的_consumer_offsets主题里面

2) 自动提交

消费者poll消息下来以后就会自动提交offset

// 是否主动提交offset,默认就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 主动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

注意:自动提交会丢失消息。因为消费者会在消费前提前offset,有可能提交完后还没有消费时就挂了

3) 手动提交

需要把手动提交的配置设置成false

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

手动提交又分成了两种:

  • 手动同步提交

在消费完消息后调用同步提交的方法,当集群返回ack前一直阻塞,返回ack后表示提交offset成功,执行之后的逻辑

while (true) {
 /*
 * poll() API 是拉取消息的长轮询
 */
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("收到消息:partition = %d,offset = %d, key= %s, value =%s%n",
        record.partition(),record.offset(), record.key(), record.value());
        }
 //所有的消息已消费完
     if (records.count() > 0) {//有消息
         // 手动同步提交offset,当前线程会阻塞直到offset提交成功
         // 一般使用同步提交
         consumer.commitSync();//=======阻塞=== 提交成功
         }
     }
 }

一般使用同步提交,记得给consumer.commitSync();进行异常捕获,添加异常处理

  • 手动异步提交

在消息消费完后提交,不需要等到集群的ack,直接执行后续逻辑,可以设置一个回调方法,够集群调用

while (true) {
	/*
	 * poll() API 是拉取消息的⻓轮询
	 */
	ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
	for (ConsumerRecord<String, String> record : records) {
		System.out.printf("收到消息:partition = %d,offset = %d, key= % s, value = %s % n ",
				record.partition(), record.offset(), record.key(), record.value());
	}
	//所有的消息已消费完
	if (records.count() > 0) {
		// ⼿动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后⾯的程序逻辑
		consumer.commitAsync(new OffsetCommitCallback() {
			@Override
			public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
				if (exception != null) {
					System.err.println("Commit failed for " + offsets);
					System.err.println("Commit failed exception: " +
							exception.getStackTrace());
				}
			}
		});
	}
}

长轮询poll消息

  • 默认情况下,消费者一次会poll500条消息
//一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
  • 代码中设置了长轮询的时间是1000毫秒
while (true) {
 /*
 * poll() API 是拉取消息的长轮询
 */
 ConsumerRecords<String, String> records =
        consumer.poll(Duration.ofMillis(1000));
 for (ConsumerRecord<String, String> record : records) {
    System.out.printf("收到消息:partition = %d,offset = %d, key = %s,value = %s%n",                             
    record.partition(),record.offset(), record.key(), record.value());
 }

poll数量 & 长轮询 的意思是:

  1. 如果一次poll到500条消息,就直接返回执行for循环的逻辑
  2. 如果一次poll没有拿到500条消息。且时间仍在1秒内,那么长轮询继续poll,直到消息数量达到500条,就返回执行for循环的逻辑
  3. 如果多次poll都没有达到500条,且1秒时间已经到了,就返回执行for循环的逻辑
  • 如果两个poll的时间间隔超过30s,集群会认为该消费者的消费能力过弱,将该消费者提出消费组,触发rebalance机制,rebalance机制会造成性能开销,则可以通过减少poll条数的设置,来满足时间限制
//一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
//如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能⼒过弱,将其踢出消费组。将分区分配给其他消费者。-rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

心跳 -- 健康检查

消费者每隔1s向kafka集群发送心跳,集群发现如果超过10s没有收到消费者的心跳,将被提出消费组,触发该分区消费组的rebalance机制,将该分区交给消费组里的其他消费者进行消费

//consumer给broker发送⼼跳的间隔时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
//kafka如果超过10秒没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

指定分区和偏移量、时间消费

  • 指定分区消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
  • 从头消费 -- offset = 0 开始消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
  • 指定时间消费

根据时间,去所有的partition中确定该时间的对应的offset,然后去所有partition中找到该offset之后开始消费消息

//获取主题的所有分区
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
//从1小时前开始消费
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
	map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
}
//根绝主题以及所有分区,以及提供的时间点 -- 获取到分区对应时间点的 offset偏移量
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
	//获得分区
	TopicPartition key = entry.getKey();
	//获得时间对应分区偏移量的对象
	OffsetAndTimestamp value = entry.getValue();
	if (key == null || value == null) continue;
	Long offset = value.offset();
	System.out.println("partition-" + key.partition() +
			"|offset-" + offset);
	//根据消费⾥的timestamp确定offset
	if (value != null) {
		//设置分区
		consumer.assign(Arrays.asList(key));
		//根绝分区以及偏移量 获取之后的所有消息
		consumer.seek(key, offset);
	}
}

新消费组消费offset规则

新消费组中的消费者在启动以后,默认会从当前分区最后一条消息的offset+1开始消费(消费新的消息)。可以通过以下的设置,让新的消费者第一次从头开始消费。之后开始消费新的消息(最后消费的位置的偏移量 + 1)

  • Latest: 默认的,消费新消息
  • earliest:第一次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量 + 1)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

整合springboot

依赖

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

配置文件

spring:
  kafka:
    #设置kafka集群
    #bootstrap-servers: 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094
    bootstrap-servers: 192.168.99.100:9092
    #设置生成者
    producer:
      # 1:leader接收到消息落盘后就返回ack
      acks: 1
      #设置重试次数 大于0的值
      retries: 3
      #去缓冲区中一次拉16k的数据,发送到broker
      batch-size: 16384
      #缓冲区大小32M
      buffer-memory: 33554432
      #指定消息key和value的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #设置消费者
    consumer:
      #默认消费组
      group-id: testGroup
      #关闭自动提交
      enable-auto-commit: false
      #新消费组从头开始消费
      auto-offset-reset: earliest
      #设置消息key和value的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #一次poll最大拿500条消息
      max-poll-records: 500
      properties:
        # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
        session:
          timeout:
            ms: 120000
        # 消费消息超时时间
        request:
          timeout:
            ms=180000:
    listener:
      #RECORD 当每⼀条记录被消费者监听器(ListenerConsumer)处理之后提交
      #BATCH 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
      #TIME 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间⼤于TIME时提交
      #COUNT 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量⼤于等于COUNT时提交
      #COUNT_TIME TIME | COUNT 有⼀个条件满⾜时提交
      #MANUAL 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调⽤Acknowledgment.acknowledge()后提交
      #MANUAL_IMMEDIATE ⼿动调⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤这种
      ack-mode: manual_immediate

Producer生产者

 private final static String TOPIC_NAME = "my-replicated-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * send方法默认是异步的
     *
     * @return
     */
    @RequestMapping("/send")
    public String send() {
        kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a message");
        return "send success";
    }

    /**
     * 同步发送    future.get()方法阻塞等到结果返回
     */
    @RequestMapping("/send3")
    public String send_syn() {

        try {
            SendResult<String, String> sendResult =
                    kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a message").get();
            ProducerRecord<String, String> record = sendResult.getProducerRecord();
            RecordMetadata metadata = sendResult.getRecordMetadata();
            System.out.println("消息发送成功");
            System.out.printf("消息key = %s ,value = %s%n", record.key(), record.value());
            System.out.printf("主题 = %s, partition = %d, offset = %d%n", metadata.topic(), metadata.partition(), metadata.offset());
            return "send success";
        } catch (Exception e) {
            //发生异常时
            e.printStackTrace();
            return "send fail";
        }
    }

 异步发送消息添加回调监听

package com.lb.listener;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;

@Component
public class ProduceListener implements ProducerListener {
    private static final Logger log = LoggerFactory.getLogger(ProduceListener.class);

    @Override
    public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
        log.info("Message send success : " + producerRecord.toString());
        log.info("Message send success topic:{} , partition:{} , offset: {}",
                recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
    }

    @Override
    public void onError(ProducerRecord producerRecord, Exception exception) {
        log.info("Message send success : " + producerRecord.toString());
    }
}

Consumer消费者

package com.lb.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class MyConsumer {

    /**
     * groupId = MyGroup1 消费 my-replicated-topic 主题的消息
     * 这里的groupId是自己设定消费组,如果不设定则使用yml中默认的消费组
     * @param record 说明此方法是消费一条消息的逻辑
     * @param ack
     */
    @KafkaListener(topics = "my-replicated-topic", groupId = "MyGroup1")
    public void listen_MyReplicatedTopic_MyGroup1_01(
            ConsumerRecord<String,String> record, Acknowledgment ack){
        String value = record.value();
        System.out.printf(
                "groupId = MyGroup1 接受到 %s 的一条消息,内容是:" +
                        "partition = %d,offset = %d, key = %s, value = %s%n",
                record.topic(),record.partition(),record.offset(), record.key(), record.value());
        //手动提交offset
        ack.acknowledge();
    }

    /**
     * @param records 消费多条消息的逻辑,只是方法参数不一样,2种都支持
     * @param ack
     */
/*    @KafkaListener(topics = "my-replicated-topic", groupId = "MyGroup1")
    public void listen_MyReplicatedTopic_MyGroup1_01(
            ConsumerRecords<String,String> records, Acknowledgment ack){
        for (ConsumerRecord<String, String> record : records) {
            String value = record.value();
            System.out.printf(
                    "groupId = MyGroup1 接受到 %s 的一条消息,内容是:" +
                            "partition = %d,offset = %d, key = %s, value = %s%n",
                    record.topic(),record.partition(),record.offset(), record.key(), record.value());
        }
        //手动提交offset
        ack.acknowledge();
    }*/
}

2个Consumer的区别在于

1.一个是消费一条消息的逻辑,一个是消费一组消息的逻辑

2.ack.acknowledge();如果配合配置文件中的spring.kafka.listener.ack-mode.如果配置值为manual_immediate则表示第一个方法消费一条信息就给kafka发送这条消息的确认接受的ack,第二个方法时消费完一组消息,一起提交ack。而如果配置值为MANUAL,则表示即使在代码中是每条消息都发送的了确认ack,依然是每一批poll都消费完了,一起确认。

设置消费组、多个topic、指定分区、指定偏移量消费及消费个数

    @KafkaListener(groupId = "testGroup",
            topicPartitions = {
                    @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
                    @TopicPartition(topic = "topic2", partitions = "0",
                            partitionOffsets = @PartitionOffset(partition = "1",
                                    initialOffset = "100"))
            },
            concurrency = "3")//concurrency就是同组下的消费者个数,就是并发消费数, 建议⼩于等于分区总数
    public void listenGroup(ConsumerRecord<String, String> record,
                            Acknowledgment ack) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        //⼿动提交offset
        ack.acknowledge();
    }

注解含义:

groupId: 消费组名称

 topicPartitions :主题和分区信息集合

 TopicPartition:一个topic以及分区信息

topic = "topic1", partitions = {"0", "1"} : 消费主题为topic1,分区0和1的消息

topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1",initialOffset = "100")

:消费主题topic2,分区0的所有消息,分区1,offset=100以后的消息

concurrency = "3" :表明创建同一个消费组下消费者个个数,不要大于分区数,否则会存在空闲的消费者。

相关文章:

  • 两个单链表相交的一系列问题
  • 连接查询
  • ArrayList源码解析
  • iptables防火墙 (SNAT、DNAT)
  • idea2021.3.3 创建maven-scala项目并解决遇到的问题:通过add frameworks support找到不到scala插件
  • 模板模式【Java设计模式】
  • openGL 材质
  • windows 10 局域网设置固定 IP 地址
  • Python之猜数字游戏
  • Python 程序的输出 | 第十套(异常处理)
  • 【Android入门】8、Service 后台线程、多线程、IntentService
  • 面向医学图像语义分割-MedISeg
  • puzzle(017.9)HueBots
  • SIM卡被锁怎么办
  • 腾讯云服务器有那么多的型号标准型,计算型,内存型等等,到底该如何选择?
  • 03Go 类型总结
  • 77. Combinations
  • Android组件 - 收藏集 - 掘金
  • centos安装java运行环境jdk+tomcat
  •  D - 粉碎叛乱F - 其他起义
  • dva中组件的懒加载
  • hadoop集群管理系统搭建规划说明
  • iOS帅气加载动画、通知视图、红包助手、引导页、导航栏、朋友圈、小游戏等效果源码...
  • Java程序员幽默爆笑锦集
  • java中具有继承关系的类及其对象初始化顺序
  • python 学习笔记 - Queue Pipes,进程间通讯
  • Vim Clutch | 面向脚踏板编程……
  • 动态魔术使用DBMS_SQL
  • 关于字符编码你应该知道的事情
  • 什么是Javascript函数节流?
  • 腾讯大梁:DevOps最后一棒,有效构建海量运营的持续反馈能力
  • 微信小程序填坑清单
  • 小而合理的前端理论:rscss和rsjs
  • 硬币翻转问题,区间操作
  • 测评:对于写作的人来说,Markdown是你最好的朋友 ...
  • 移动端高清、多屏适配方案
  • 曾刷新两项世界纪录,腾讯优图人脸检测算法 DSFD 正式开源 ...
  • ​LeetCode解法汇总2670. 找出不同元素数目差数组
  • #!/usr/bin/python与#!/usr/bin/env python的区别
  • #我与Java虚拟机的故事#连载13:有这本书就够了
  • (06)金属布线——为半导体注入生命的连接
  • (9)STL算法之逆转旋转
  • (C语言)字符分类函数
  • (done) 两个矩阵 “相似” 是什么意思?
  • (Forward) Music Player: From UI Proposal to Code
  • (附源码)ssm高校运动会管理系统 毕业设计 020419
  • (附源码)计算机毕业设计SSM基于java的云顶博客系统
  • (转)Android中使用ormlite实现持久化(一)--HelloOrmLite
  • (转)eclipse内存溢出设置 -Xms212m -Xmx804m -XX:PermSize=250M -XX:MaxPermSize=356m
  • (转)详解PHP处理密码的几种方式
  • .【机器学习】隐马尔可夫模型(Hidden Markov Model,HMM)
  • .babyk勒索病毒解析:恶意更新如何威胁您的数据安全
  • .net core 连接数据库,通过数据库生成Modell
  • .NET/C# 使用反射调用含 ref 或 out 参数的方法
  • .NET国产化改造探索(一)、VMware安装银河麒麟